//生产者代码
//product.go
package main
import (
"fmt"
"github.com/Shopify/sarama"
"strconv"
)
// 基于sarama第三方库开发的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
//msg.Value = sarama.StringEncoder("this is a test log")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"ip:9082"}, config)
if err != nil {
panic(fmt.Sprintf("producer closed, err %v" ,err))
}
defer client.Close()
// 发送消息
i := 0
for {
msg.Value = sarama.StringEncoder("this is a test log"+ strconv.Itoa(i))
pid, offset, err := client.SendMessage(msg)
if err != nil {
panic(fmt.Sprintf("send msg failed, err %v" ,err))
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
i++
if i == 10 {
break
}
}
}
//消费者
//consumer
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
func main() {
var wg sync.WaitGroup
//创建消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_1_1_0
client, err := sarama.NewClient([]string{"ip:9082"}, config)
defer client.Close()
if err != nil {
panic(err)
}
consumer, err := sarama.NewConsumerFromClient(client)
defer consumer.Close()
if err != nil {
panic(err)
}
//设置分区
partitionList, err := consumer.Partitions("liangtian_topic")
if err != nil {
fmt.Println("faild to get the list of partitions", err)
}
//[0 1 2]
fmt.Println(partitionList)
//循环读取分区
for partition := range partitionList {
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetOldest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s \n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)