// Command producer publishes a few sample messages to Kafka through a
// synchronous sarama producer.
package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/Shopify/sarama"
)

const (
	// BROKER is the address of the Kafka broker to connect to (placeholder value).
	BROKER = "ip:port"
	// TOPIC is the topic that receives the first, stand-alone message.
	TOPIC = "xx"
)

// sendMsg publishes msg through client. On success it prints the partition
// and offset reported by SendMessage and returns nil; otherwise it returns
// the error from SendMessage unchanged.
func sendMsg(client sarama.SyncProducer, msg *sarama.ProducerMessage) error {
	partID, offset, err := client.SendMessage(msg)
	if err != nil {
		return err
	}
	fmt.Printf("pid:%v offset:%v\n", partID, offset)
	return nil
}

// newMsg returns a ProducerMessage for topic whose key and value are the
// string-encoded key and content.
func newMsg(topic, key, content string) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(content),
	}
}

// main is the entry point of a small Kafka client built on the sarama
// library. It exits with a non-zero status when run reports an error.
func main() {
	if err := run(); err != nil {
		// The error already carries the "[...] err: ..." prefix.
		log.Fatal(err)
	}
}

// run creates the producer, sends one message to TOPIC and then a series of
// words, and closes the producer before returning. Keeping this logic out of
// main lets the deferred Close run even on failure: log.Fatal exits the
// process without running deferred calls.
func run() error {
	// Client configuration.
	config := sarama.NewConfig()
	// Require acknowledgement from the leader and its followers.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Report successfully delivered messages on the success channel.
	config.Producer.Return.Successes = true

	// Connect to Kafka. Per the original author's note: use port 8000 from
	// outside the cluster and port 9092 from inside it.
	c, err := sarama.NewSyncProducer([]string{BROKER}, config)
	if err != nil {
		return fmt.Errorf("[create client failed] err: %w", err)
	}
	defer func() {
		if cerr := c.Close(); cerr != nil {
			log.Printf("[close client failed] err: %v", cerr)
		}
	}()

	// Build and send a single message; a failure here aborts the program.
	msg := newMsg(TOPIC, "ttt", `{"name":"Tim", "age":"18"}`)
	if err := sendMsg(c, msg); err != nil {
		return fmt.Errorf("[send msg failed] err: %w", err)
	}

	// Send several messages in a loop, keyed by their index. A failed send
	// is reported and the loop moves on to the next word.
	// NOTE(review): these go to the literal topic "yy", not TOPIC — confirm
	// this is intended.
	words := []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"}
	for k, word := range words {
		if err := sendMsg(c, newMsg("yy", strconv.Itoa(k), word)); err != nil {
			fmt.Printf("[send msg failed] err: %v\n", err)
		}
	}
	return nil
}
consumer
// Command consumer reads messages from every partition of a Kafka topic
// through the sarama library and prints them to standard output.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

const (
	// BROKER is the address of the Kafka broker to connect to.
	BROKER = "service-epoch-kafka.epoch-kafka.svc.manager.ucbj.kuber.thc:8000"
	// TOPIC is the topic whose partitions are consumed.
	TOPIC = "yangkaiyue-test"
)

// main runs the consumer and exits with a non-zero status when run reports
// an error.
func main() {
	if err := run(); err != nil {
		// The error already carries the "[...] err: ..." prefix.
		log.Fatal(err)
	}
}

// run starts one partition consumer per partition of TOPIC, prints every
// message received, and blocks until the process gets SIGINT or SIGTERM.
// Before returning it closes all partition consumers, waits for the printing
// goroutines to finish, and closes the consumer.
func run() error {
	// A nil config is passed through to sarama unchanged, as in the original.
	consumer, err := sarama.NewConsumer([]string{BROKER}, nil)
	if err != nil {
		return fmt.Errorf("[new consumer failed] err: %w", err)
	}

	var (
		wg  sync.WaitGroup
		pcs []sarama.PartitionConsumer
	)
	// Orderly shutdown on every return path: close the partition consumers
	// first, let the goroutines drain Messages() until it is closed, then
	// close the parent consumer.
	defer func() {
		for _, pc := range pcs {
			pc.AsyncClose()
		}
		wg.Wait()
		if cerr := consumer.Close(); cerr != nil {
			log.Printf("[close consumer failed] err: %v", cerr)
		}
	}()

	// Look up all partitions of the topic.
	partitionList, err := consumer.Partitions(TOPIC)
	if err != nil {
		return fmt.Errorf("[get partition list failed] err: %w", err)
	}

	// Create one partition consumer per partition. The partition ID is the
	// slice element, not the slice index.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(TOPIC, partition, sarama.OffsetNewest)
		if err != nil {
			// Kept as in the original: report on stdout and stop without a
			// failure exit status.
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return nil
		}
		pcs = append(pcs, pc)

		// Consume each partition asynchronously.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n",
					msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}

	// Block until the process is asked to stop.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	<-sigs
	return nil
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)