golang使用kafka

golang使用kafka,第1张

kafka 安装及基础概念介绍可以参考:kafka 安装、配置、启动_王安的博客-CSDN博客_kafka安装启动

本文主要介绍confluent-kafka-go的使用方法。confluent-kafka-go,简单易用,并且表现稳定,是kafka官网推荐的golang package。
https://github.com/confluentinc/confluent-kafka-go
 

一. 下载go client
go get -v github.com/confluentinc/confluent-kafka-go
二 example 2.1 创建topic 
import (
	"context"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
)

const Topic = "testTopic"
const Broker = "127.0.0.1:9192"
const NumPartition = 2
const ReplicationFactor = 1
const ConsumerGroup1 = "consumerTest1"



func main() {

	fmt.Println("Kafka Demo RUN:")
	//创建topic
	KafkaCreateTopic()
	//创建生产者
	go KafkaProducer()
	//创建消费者
	go KafkaConsumer("group1")
	go KafkaConsumer("group1")
	go KafkaConsumer("group2")

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		}
	}
}



/*
创建topic
*/
func KafkaCreateTopic() {

	// 创建一个新的AdminClient。
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": Broker})
	if err != nil {
		fmt.Printf("Failed to create Admin client: %s\n", err)
		os.Exit(1)
	}

	// Contexts 用于中止或限制时间量
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	//在集群上创建主题。
	//设置管理员选项以等待操作完成(或最多60秒)
	maxDur, err := time.ParseDuration("60s")
	if err != nil {
		panic("ParseDuration(60s)")
	}
	results, err := a.CreateTopics(
		ctx,
		//通过提供的TopicSpecification结构,可以同时创建多个主题
		[]kafka.TopicSpecification{{
			Topic:             Topic,
			NumPartitions:     NumPartition,
			ReplicationFactor: ReplicationFactor}},
		// Admin options
		kafka.SetAdminOperationTimeout(maxDur))
	if err != nil {
		fmt.Printf("Failed to create topic: %v\n", err)
		os.Exit(1)
	}

	// Print results
	for _, result := range results {
		fmt.Printf("%s\n", result)
	}

	a.Close()
}
2.2 Producer

/*
消息生产者
*/
func KafkaProducer() {

	topic := Topic
	broker := Broker
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Producer %v\n", p)

	// Optional delivery channel, if not specified the Producer object's
	// .Events channel is used.
	deliveryChan := make(chan kafka.Event)

	//每5s向kafka发送一条消息
	n := 0
	for {
		n++
		value := strconv.Itoa(n) + " Hello Go!"
		err = p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
			Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
		}, deliveryChan)

		e := <-deliveryChan
		m := e.(*kafka.Message)

		fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
		if m.TopicPartition.Error != nil {
			fmt.Printf("生产者:Delivery failed: %v\n", m.TopicPartition.Error)
		} else {
			fmt.Printf("生产者:Delivered message to topic %s [%d] at offset %v\n",
				*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}

		time.Sleep(5 * time.Second)
	}

	close(deliveryChan)
}


2.3 Consumer
func KafkaConsumer(consumerGroup string) {

	broker := Broker
	group := consumerGroup
	topics := Topic
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		// Avoid connecting to IPv6 brokers:
		// This is needed for the ErrAllBrokersDown show-case below
		// when using localhost brokers on OSX, since the OSX resolver
		// will return the IPv6 addresses first.
		// You typically don't need to specify this configuration property.
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics([]string{topics}, nil)

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:

				fmt.Printf("%%消费者consumerGroup%s Message on %s:\n%s\n",
					group, e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				// Errors should generally be considered
				// informational, the client will try to
				// automatically recover.
				// But in this example we choose to terminate
				// the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

 更多example...

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/989753.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存