国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Golang > 正文

golang如何使用sarama訪問kafka

2020-04-01 18:50:41
字體:
來源:轉載
供稿:網友

下面一個客戶端代碼例子訪問kafka服務器,來發送和接受消息。

使用方式

1、命令行參數

$ ./kafkaclient -hUsage of ./client: -ca string  CA Certificate (default "ca.pem") -cert string  Client Certificate (default "cert.pem") -command string  consumer|producer (default "consumer") -host string  Common separated kafka hosts (default "localhost:9093") -key string  Client Key (default "key.pem") -partition int  Kafka topic partition -tls  TLS enable -topic string  Kafka topic (default "test--topic")

2、作為producer啟動

$ ./kafkaclient -command producer / -host kafka1:9092,kafka2:9092## TLS-enabled$ ./kafkaclient -command producer / -tls -cert client.pem -key client.key -ca ca.pem / -host kafka1:9093,kafka2:9093

producer發送消息給kafka:

> aaa2018/12/15 07:11:21 Produced message: [aaa]> bbb2018/12/15 07:11:30 Produced message: [bbb]> quit

3、作為consumer啟動

$ ./kafkaclient -command consumer / -host kafka1:9092,kafka2:9092## TLS-enabled$ ./kafkaclient -command consumer / -tls -cert client.pem -key client.key -ca ca.pem / -host kafka1:9093,kafka2:9093

consumer從kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代碼如下

這個代碼使用到了Shopify/sarama庫,請自行下載使用。

$ cat kafkaclient.gopackage mainimport ( "flag" "fmt" "log" "os" "io/ioutil" "bufio" "strings" "crypto/tls" "crypto/x509" "github.com/Shopify/sarama")var ( command  string tlsEnable bool hosts  string topic  string partition int clientcert string clientkey string cacert  string)func main() { flag.StringVar(&command, "command",  "consumer",   "consumer|producer") flag.BoolVar(&tlsEnable, "tls",   false,    "TLS enable") flag.StringVar(&hosts,  "host",   "localhost:9093", "Common separated kafka hosts") flag.StringVar(&topic,  "topic",  "test--topic",  "Kafka topic") flag.IntVar(&partition,  "partition", 0,     "Kafka topic partition") flag.StringVar(&clientcert, "cert",   "cert.pem",   "Client Certificate") flag.StringVar(&clientkey, "key",   "key.pem",   "Client Key") flag.StringVar(&cacert,  "ca",   "ca.pem",   "CA Certificate") flag.Parse() config := sarama.NewConfig() if tlsEnable {  //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)  if err != nil {   log.Fatal(err)  }  config.Net.TLS.Enable = true  config.Net.TLS.Config = tlsConfig } client, err := sarama.NewClient(strings.Split(hosts, ","), config) if err != nil {  log.Fatalf("unable to create kafka client: %q", err) } if command == "consumer" {  consumer, err := sarama.NewConsumerFromClient(client)  if err != nil {   log.Fatal(err)  }  defer consumer.Close()  loopConsumer(consumer, topic, partition) } else {  producer, err := sarama.NewAsyncProducerFromClient(client)  if err != nil {   log.Fatal(err)  }  defer producer.Close()  loopProducer(producer, topic, partition) }}func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil {  return nil, err } // load ca cert pool cacert, err := ioutil.ReadFile(cacertfile) if err != nil {  return nil, err } cacertpool := x509.NewCertPool() cacertpool.AppendCertsFromPEM(cacert) // generate tlcconfig tlsConfig := tls.Config{} tlsConfig.RootCAs = cacertpool tlsConfig.Certificates = []tls.Certificate{clientcert} tlsConfig.BuildNameToCertificate() // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert: return &tlsConfig, err}func loopProducer(producer sarama.AsyncProducer, topic string, partition int) { scanner := bufio.NewScanner(os.Stdin) fmt.Print("> ") for scanner.Scan() {  text := scanner.Text()  if text == "" {  } else if text == "exit" || text == "quit" {   break  } else {   producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}   log.Printf("Produced message: [%s]/n",text)  }  fmt.Print("> ") }}func loopConsumer(consumer sarama.Consumer, topic string, partition int) { partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil {  log.Println(err)  return } defer partitionConsumer.Close() for {  msg := <-partitionConsumer.Messages()  log.Printf("Consumed message: [%s], offset: [%d]/n", msg.Value, msg.Offset) }}

編譯:

$ go build kafkaclient.go

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 涟源市| 莱州市| 宁津县| 黄石市| 连山| 泽州县| 吴堡县| 门头沟区| 平原县| 固安县| 广水市| 米脂县| 抚宁县| 寿光市| 广饶县| 慈利县| 永年县| 淳安县| 深泽县| 西宁市| 小金县| 牡丹江市| 日喀则市| 孟州市| 长垣县| 青冈县| 茶陵县| 西青区| 镇坪县| 兰州市| 昭平县| 瓮安县| 通化县| 玛纳斯县| 崇阳县| 枞阳县| 临汾市| 明溪县| 罗源县| 正定县| 南昌市|