使用 Go 调用 Kafka 实现生产者与消费者

go调用kafka做生产消费者,第1张

// 生产者代码
// producer.go
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"strconv"
)

// 基于sarama第三方库开发的kafka client

// main connects a synchronous sarama producer to the Kafka broker and sends
// ten numbered test messages to the "web_log" topic, printing the partition
// and offset assigned to each one. Any connection or send failure panics.
func main() {
	config := sarama.NewConfig()
	// Wait until the leader and all in-sync replicas have acknowledged the write.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back on the success channel;
	// the sync producer relies on this to return from SendMessage.
	config.Producer.Return.Successes = true

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"ip:9082"}, config)
	if err != nil {
		// The producer was never created, so report a creation failure rather
		// than a "closed" producer.
		panic(fmt.Sprintf("create producer failed, err %v", err))
	}
	defer client.Close()

	// Send the messages.
	for i := 0; i < 10; i++ {
		// Build a fresh message per send instead of mutating a shared one.
		msg := &sarama.ProducerMessage{
			Topic: "web_log",
			Value: sarama.StringEncoder("this is a test log" + strconv.Itoa(i)),
		}
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			panic(fmt.Sprintf("send msg failed, err %v", err))
		}
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
	}
}
// 消费者代码
// consumer.go
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
)

// main consumes every partition of the "web_log" topic starting from the
// oldest available offset and prints each received message. It blocks until
// all partition consumers have shut down.
func main() {
	// Must be the same topic the producer writes to; it is used both for
	// listing the partitions and for consuming them.
	const topic = "web_log"

	// Create the consumer.
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V2_1_1_0

	client, err := sarama.NewClient([]string{"ip:9082"}, config)
	if err != nil {
		panic(err)
	}
	// Register Close only after the error check: client is nil on failure.
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic(err)
	}
	// Closed exactly once, via this defer.
	defer consumer.Close()

	// Look up the partitions of the topic, e.g. [0 1 2].
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Println("faild to get the list of partitions", err)
		return
	}
	fmt.Println(partitionList)

	var wg sync.WaitGroup
	// Range over the partition IDs themselves, not the slice indices.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		defer pc.AsyncClose()

		wg.Add(2)
		// Print every message delivered on this partition.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s \n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
		// Consumer.Return.Errors is enabled, so the error channel has to be
		// drained or the partition consumer can block.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for consumeErr := range pc.Errors() {
				fmt.Printf("consume error: %v\n", consumeErr)
			}
		}(pc)
	}
	wg.Wait()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990925.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存