kafka 安装及基础概念介绍可以参考:kafka 安装、配置、启动_王安的博客-CSDN博客_kafka安装启动
本文主要介绍confluent-kafka-go的使用方法。confluent-kafka-go,简单易用,并且表现稳定,是kafka官网推荐的golang package。
https://github.com/confluentinc/confluent-kafka-go
go get -v github.com/confluentinc/confluent-kafka-go二 example 2.1 创建topic
import ( "context" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "strconv" "syscall" "time" ) const Topic = "testTopic" const Broker = "127.0.0.1:9192" const NumPartition = 2 const ReplicationFactor = 1 const ConsumerGroup1 = "consumerTest1" func main() { fmt.Println("Kafka Demo RUN:") //创建topic KafkaCreateTopic() //创建生产者 go KafkaProducer() //创建消费者 go KafkaConsumer("group1") go KafkaConsumer("group1") go KafkaConsumer("group2") sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) run := true for run { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminatingn", sig) run = false } } } func KafkaCreateTopic() { // 创建一个新的AdminClient。 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": Broker}) if err != nil { fmt.Printf("Failed to create Admin client: %sn", err) os.Exit(1) } // Contexts 用于中止或限制时间量 ctx, cancel := context.WithCancel(context.Background()) defer cancel() //在集群上创建主题。 //设置管理员选项以等待操作完成(或最多60秒) maxDur, err := time.ParseDuration("60s") if err != nil { panic("ParseDuration(60s)") } results, err := a.CreateTopics( ctx, //通过提供的TopicSpecification结构,可以同时创建多个主题 []kafka.TopicSpecification{{ Topic: Topic, NumPartitions: NumPartition, ReplicationFactor: ReplicationFactor}}, // Admin options kafka.SetAdminOperationTimeout(maxDur)) if err != nil { fmt.Printf("Failed to create topic: %vn", err) os.Exit(1) } // Print results for _, result := range results { fmt.Printf("%sn", result) } a.Close() }2.2 Producer
func KafkaProducer() { topic := Topic broker := Broker p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { fmt.Printf("Failed to create producer: %sn", err) os.Exit(1) } fmt.Printf("Created Producer %vn", p) // Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) //每5s向kafka发送一条消息 n := 0 for { n++ value := strconv.Itoa(n) + " Hello Go!" err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value), Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, }, deliveryChan) e := <-deliveryChan m := e.(*kafka.Message) fmt.Printf("生产者:Delivery failed: %vn", m.TopicPartition.Error) if m.TopicPartition.Error != nil { fmt.Printf("生产者:Delivery failed: %vn", m.TopicPartition.Error) } else { fmt.Printf("生产者:Delivered message to topic %s [%d] at offset %vn", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } time.Sleep(5 * time.Second) } close(deliveryChan) }2.3 Consumer
func KafkaConsumer(consumerGroup string) { broker := Broker group := consumerGroup topics := Topic sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, // Avoid connecting to IPv6 brokers: // This is needed for the ErrAllBrokersDown show-case below // when using localhost brokers on OSX, since the OSX resolver // will return the IPv6 addresses first. // You typically don't need to specify this configuration property. "broker.address.family": "v4", "group.id": group, "session.timeout.ms": 6000, "auto.offset.reset": "earliest"}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %sn", err) os.Exit(1) } fmt.Printf("Created Consumer %vn", c) err = c.SubscribeTopics([]string{topics}, nil) run := true for run { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminatingn", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: fmt.Printf("%%消费者consumerGroup%s Message on %s:n%sn", group, e.TopicPartition, string(e.Value)) if e.Headers != nil { fmt.Printf("%% Headers: %vn", e.Headers) } case kafka.Error: // Errors should generally be considered // informational, the client will try to // automatically recover. // But in this example we choose to terminate // the application if all brokers are down. fmt.Fprintf(os.Stderr, "%% Error: %v: %vn", e.Code(), e) if e.Code() == kafka.ErrAllBrokersDown { run = false } default: fmt.Printf("Ignored %vn", e) } } } fmt.Printf("Closing consumern") c.Close() }
更多example...
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)