MongoDB
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> MongoDB

Chìm Kafka Stream vào MongoDB bằng cách sử dụng PySpark Structured Streaming

Tôi đã tìm thấy một giải pháp. ..). Mã của tôi trông giống như thế này trong tệp testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Tệp nhật ký của mongodb trong mac ở đâu

  2. kết quả tổng hợp mgo $ unwind thành Loại phần tử không xác định (0x2E)

  3. ngoại lệ:Kích thước BSONObj:-286331154 (0xEEEEEEEE) không hợp lệ. Kích thước phải từ 0 đến 16793600 (16MB)

  4. Hợp nhất hai trường mảng trong mongoDB

  5. lấy giá trị bằng tên khóa mongodb node.js trình điều khiển