如下所示:
安裝kafka支持庫pip install kafka-python
from kafka import KafkaProducerimport json ''' 生產(chǎn)者demo 向test_lyl2主題中循環(huán)寫入10條json數(shù)據(jù) 注意事項:要寫入json數(shù)據(jù)需加上value_serializer參數(shù),如下代碼'''producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'] )for i in range(10): data={ "name":"李四", "age":23, "gender":"男", "id":i } producer.send('test_lyl2', data)producer.close()from kafka import KafkaConsumerimport json ''' 消費者demo 消費test_lyl2主題中的數(shù)據(jù) 注意事項:如需以json格式讀取數(shù)據(jù)需加上value_deserializer參數(shù)''' consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1", bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'], auto_offset_reset='earliest',value_deserializer=json.loads )for message in consumer: print(message.value)以上這篇對python操作kafka寫入json數(shù)據(jù)的簡單demo分享就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持武林站長站。
新聞熱點
疑難解答