package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"strconv"
"time"
)
// go get github.com/Shopify/sarama
var (
// product is the package-level synchronous producer. NewProduct assigns it;
// main sends messages through it and closes it on exit.
product sarama.SyncProducer
)
// BookInfo is the payload that main marshals to JSON and sends as the value
// of each Kafka message.
type BookInfo struct {
// Title is encoded under the JSON key "title".
Title string `json:"title"`
// Price is encoded under the JSON key "price".
Price float32 `json:"price"`
}
// main creates the producer for the broker at localhost:9092 and publishes
// 5000 JSON-encoded BookInfo messages to the "audit" topic, pausing two
// seconds after each one and printing the partition and offset returned by
// SendMessage. It stops at the first marshal or send error.
func main() {
	if err := NewProduct([]string{"localhost:9092"}); err != nil {
		fmt.Println(err, "NewProduct")
		return
	}
	// Report a failing Close instead of silently dropping its error.
	defer func() {
		if err := product.Close(); err != nil {
			fmt.Println("Close:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		data, err := json.Marshal(&BookInfo{
			Title: "t" + strconv.Itoa(i),
			Price: float32(i),
		})
		if err != nil {
			fmt.Println("Marshal", err)
			return
		}

		partition, offset, err := product.SendMessage(&sarama.ProducerMessage{
			//Key: sarama.StringEncoder("audit"),
			Topic: "audit",
			Value: sarama.ByteEncoder(data),
			//Partition: 1,
		})
		if err != nil {
			fmt.Println("SendMessage:", err)
			return
		}
		fmt.Printf("partition=%d , offset=%d, i=%d\n", partition, offset, i)

		// Throttle the demo: one message every two seconds.
		time.Sleep(2 * time.Second)
	}
}
// NewProduct creates a synchronous producer for the brokers in addrs and
// stores it in the package-level product variable.
//
// The producer is built with sarama.NewSyncProducer so that it owns its
// underlying client: closing product also releases the client connection,
// and nothing is left open when construction fails. product is assigned only
// on success.
//
// It returns the error reported by sarama, or nil.
func NewProduct(addrs []string) error {
	config := sarama.NewConfig()
	// For an async producer it is not recommended to enable both
	// Return.Errors and Return.Successes; Errors alone is usually enough.
	// A sync producer needs both, because SendMessage reports success or
	// failure synchronously.
	config.Producer.Return.Successes = true // delivered messages are returned on the success channel
	//config.Producer.Partitioner = NewHashPartitioner
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	producer, err := sarama.NewSyncProducer(addrs, config)
	if err != nil {
		return err
	}
	product = producer
	return nil
}
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
)
// main starts a consumer-group reader for the "audit" topic on the broker at
// localhost:9092, blocks until SIGINT or SIGTERM is received, and then runs
// the shutdown function returned by Connect.
func main() {
	shutdown := NewKafka([]string{"localhost:9092"}, []string{"audit"}).Connect()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Block here until one of the registered signals is delivered.
	<-signals
	fmt.Println("terminating: via signal")

	shutdown()
}
// Kafka holds the settings of a consumer-group reader and is itself passed to
// Consume as the handler (it provides Setup, Cleanup and ConsumeClaim).
type Kafka struct {
// brokers are the addresses handed to sarama.NewClient.
brokers []string
// topics are the topics passed to Consume.
topics []string
// startOffset is not referenced by any code visible in this file.
startOffset int64
// version is the Kafka version string parsed by Connect.
version string
// ready is closed by the shutdown function returned from Connect and is
// checked by ConsumeClaim to stop processing.
ready chan struct{}
// group is the consumer group ID.
group string
// channelBufferSize is copied into the sarama config's ChannelBufferSize.
channelBufferSize int
// assignor is set by NewKafka.
// NOTE(review): Connect does not read it; the rebalance strategy there is hard-coded.
assignor string
}
// NewKafka returns a Kafka reader for the given brokers and topics with fixed
// defaults: group "grp1", channel buffer size 1000, version "2.8.0",
// assignor "range", and a freshly made unbuffered ready channel.
func NewKafka(brokers []string, topics []string) *Kafka {
	k := new(Kafka)
	k.brokers = brokers
	k.topics = topics
	k.group = "grp1"
	k.channelBufferSize = 1000
	k.ready = make(chan struct{})
	k.version = "2.8.0"
	k.assignor = "range"
	return k
}
// Connect creates a sarama client and a consumer group for k.group, then
// consumes k.topics in a background goroutine with k as the handler.
//
// The process is terminated through log.Fatal/log.Fatalf when the version
// string cannot be parsed, the client cannot be created, the topic list
// cannot be fetched, or the consumer group cannot be created.
//
// The returned function shuts the reader down: it cancels the consume
// context, closes k.ready, waits for the background goroutine to finish, and
// closes the consumer group followed by the underlying client. Call it at
// most once; a second call would close k.ready again and panic.
func (k *Kafka) Connect() func() {
	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		log.Fatalf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	// Partition assignment strategy. Round-robin is always used here;
	// NOTE(review): k.assignor is not consulted. Alternatives are
	// sarama.BalanceStrategySticky and sarama.BalanceStrategyRange.
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.ChannelBufferSize = k.channelBufferSize // channel length

	// Create the client.
	client, err := sarama.NewClient(k.brokers, config)
	if err != nil {
		log.Fatal(err)
	}

	// List all topics known to the client.
	topics, err := client.Topics()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("topics: ", topics)

	// Build the consumer group on top of the client.
	group, err := sarama.NewConsumerGroupFromClient(k.group, client)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			fmt.Println("ff1")
			// Consume is called again after it returns without error, so
			// the loop keeps the reader running until ctx is cancelled.
			if err := group.Consume(ctx, k.topics, k); err != nil {
				// A failing Setup surfaces its error here.
				fmt.Printf("Error from consumer: %v\n", err)
				return
			}
			// Check if the context was cancelled, signaling that the
			// consumer should stop.
			if ctx.Err() != nil {
				log.Println(ctx.Err(), "::ctx")
				return
			}
			fmt.Println("ff2")
		}
	}()
	fmt.Println("Sarama consumer up and running!...")

	// Shutdown function to be invoked when the program exits.
	return func() {
		fmt.Println("kafka close")
		cancel()
		close(k.ready)
		wg.Wait()
		if err := group.Close(); err != nil {
			fmt.Printf("Error closing client: %v\n", err)
		}
		// A group created with NewConsumerGroupFromClient does not own the
		// client, so the client has to be closed explicitly.
		if err := client.Close(); err != nil {
			fmt.Printf("Error closing underlying client: %v\n", err)
		}
	}
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
// It only prints a marker line and the session's claims, and always returns
// nil.
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("setup")

	claims := session.Claims()
	fmt.Println("session.Claims: ", claims)

	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. This implementation does nothing and always returns nil.
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim runs the consumer loop over the claim's Messages() channel.
// It returns nil as soon as one of the following happens: the session
// context is done, k.ready is closed, or the messages channel is closed.
//
// Each message is printed, processed (a five-second sleep stands in for the
// work here), marked as consumed on the session, and printed again.
//
// NOTE:
// Do not move the loop below to a goroutine.
// ConsumeClaim itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			// The session has ended (rebalance or shutdown); leave promptly
			// instead of waiting for the messages channel to be closed.
			return nil

		case _, open := <-k.ready:
			// k.ready is closed by the shutdown function from Connect.
			if !open {
				fmt.Println("!ok")
				return nil
			}

		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			fmt.Printf("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]\n",
				message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)

			time.Sleep(5 * time.Second)
			// Update the offset for this message.
			session.MarkMessage(message, "")

			fmt.Printf("-0-[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]\n",
				message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
		}
	}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)