golang驱动kafka

golang驱动kafka,第1张

概述kafka简介 kafka是一种高吞吐量的分布式发布订阅消息系统, 特点 通过O(1)的磁盘数据结构提供消息的持久,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量,即使是非常普通的硬件,kafka也可以支持每秒数百万的消息. 支持通过kafka服务器和消费机集群来分区消息 支持hadoop并行数据加载. kafka组成部分 Broker kafka集群包含一个或多个服务器 kafka简介

kafka是一种高吞吐量的分布式发布订阅消息系统,

特点 通过O(1)的磁盘数据结构提供消息的持久,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量,即使是非常普通的硬件,kafka也可以支持每秒数百万的消息. 支持通过kafka服务器和消费机集群来分区消息 支持hadoop并行数据加载. kafka组成部分 broker

kafka集群包含一个或多个服务器,这种服务器被称为broker

topic

每条发布到kafka集群的消息都有一个类别,这个类别被称为topic.(物理上不同topic的消息分开存储. 逻辑上topic的消息虽然保存于一个或多个broker上,但是用户只需要指定消息的topic,即可生产或消费数据,而不同关心数据存于何处)

Partition

Partition是物理上的概念,每个topic包含一个或多个Partition

Producer

负责发布消息到Kafka broker

Consumer

消息消费者,向kafka broker读取消息的客户端

Consumer Group

每个Consumer属于一个特定的Consumer Group(可以每个consumer指定group name,若不指定group name,则属于默认的group)

golang驱动kafka
go get github.com/Shopify/Sarama

这个是github.com上开源的一个kafka驱动包.在下载这个包的过程中,会附带的下载几个依赖包.

示例代码:

package mainimport (    "fmt"    "log"    "os"    "strings"    "sync"    "github.com/Shopify/Sarama")var (    // kafka 服务器地址,以及端口号,这里可以指定多个地址,使用逗号分隔开即可.    kafka = "172.168.173.55:9092"    wg     sync.WaitGroup    logger = log.New(os.Stderr,"[srama]",log.LstdFlags))func main() {    Sarama.Logger = logger    // 连接kafka消息服务器    consumer,err := Sarama.NewConsumer(strings.Split(kafka,","),nil)    if err != nil {        logger.Println("Failed to start consumer: %s",err)    }    // consumer.Partitions 用户获取topic上所有的Partitions. 消息服务器上已经创建了test这个topic,所以,在这里指定参数为test.    partitionList,err := consumer.Partitions("test")    if err != nil {        logger.Println("Failed to get the List of partitions: ",err)    }    for partition := range partitionList {        pc,err := consumer.ConsumePartition("test",int32(partition),Sarama.OffsetNewest)        if err != nil {            logger.Printf("Failed to start consumer for partition %d: %s\n",partition,err)        }        defer pc.Asyncclose()        wg.Add(1)        go func(Sarama.PartitionConsumer) {            defer wg.Done()            for msg := range pc.Messages() {                fmt.Println("message is :",msg)                fmt.Printf("Partition:%d,Offset:%d,Key:%s,Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))                fmt.Println()            }        }(pc)    }    wg.Wait()    logger.Println("Done consuming topic hello")    consumer.Close()}
总结

kafka是一个非常优秀的消息订阅发布系统. 在大型项目中,通过消息系统,有效的解耦各个系统,使各个系统信息方便的互联.

参考资料 百度百科 其他网络上的文章 总结

以上是内存溢出为你收集整理的golang驱动kafka全部内容,希望文章能够帮你解决golang驱动kafka所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1281369.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-09
下一篇 2022-06-09

发表评论

登录后才能评论

评论列表(0条)

保存