如下所示:
from kafka import KafkaClientfrom kafka.producer import SimpleProducer
def send_data_2_kafka(datas): ''' 向kafka解析隊(duì)列發(fā)送數(shù)據(jù) ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curcount = len(datas)/PARTNUM for i in range(0, PARTNUM): start = i*curcount if i != PARTNUM - 1: end = (i+1)*curcount curdata = datas[start:end] producer.send_messages(TOPICNAME, *curdata) else: curdata = datas[start:] producer.send_messages(TOPICNAME, *curdata) producer.stop() client.close()其中PARTNUM為topic的partition的數(shù)目,這樣保證批量發(fā)送的數(shù)據(jù)均勻的落在kafka的partition中。
以上這篇kafka-python批量發(fā)送數(shù)據(jù)的實(shí)例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持VEVB武林網(wǎng)。
新聞熱點(diǎn)
疑難解答
圖片精選