如下所示:
from kafka import KafkaClientfrom kafka.producer import SimpleProducer
def send_data_2_kafka(datas): ''' 向kafka解析隊列發送數據 ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curcount = len(datas)/PARTNUM for i in range(0, PARTNUM): start = i*curcount if i != PARTNUM - 1: end = (i+1)*curcount curdata = datas[start:end] producer.send_messages(TOPICNAME, *curdata) else: curdata = datas[start:] producer.send_messages(TOPICNAME, *curdata) producer.stop() client.close()其中PARTNUM為topic的partition的數目,這樣保證批量發送的數據均勻的落在kafka的partition中。
以上這篇kafka-python批量發送數據的實例就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持武林站長站。
新聞熱點
疑難解答