TDMQpulsar golang 快速入门教程

TDMQpulsar golang 快速入门教程,第1张

TDMQ/pulsar golang 快速入门教程 架构: pulsar 对比 kafka kafka

kafka 由 zookeeper 和 broker 集群注册, broker 集群负责计算和储存消息, zookeeper 为注册中心(Kafka2.8就能不依赖zookeeper独立运行了, 部署还是比较方便的)

pulsar

pulsar 比 kafka 的架构更为复杂, 部署也是更加复杂

pulsar 是计算储存分离架构, 计算使用 broker 集群(是无状态的) 储存使用 bookeeper 集群, broker 计算要使用 bookeeper 的数据都是要内置 bookeeper 客户端pulsar 的分离架构更具有伸缩性pulsar 至今最新版本还是需要依赖于 zookeeper pulsar 的 4 种消费模型 独占模式(Exclusive): 一个 topic 只能有一个消费者订阅, 多个消费者订阅就会报错灾备模式(Failover): 一个 topic 可以多个消费者订阅, 但是只有一个生效, 其他的作为容灾备份使用共享订阅(Shared): 最常用的订阅模式, 一个 topic 可以被多个消费者订阅, 每个消息轮询发给其中的一个消费者key_shared: 共享订阅 + key 限制, 每个消息只会发送给绑定相同 key 的消费者

这里一般使用共享订阅 shared

TDMQ 使用教程

pulsar 的搭建十分繁琐, 如果搭建单机版本的话就根本发挥不了 pulsar 的高可用的特性

下面的示例是基于 腾讯云的 TDMQ (底层是 pulsar) 共享订阅模式的的消息传输

1. 创建 虚拟消息队列 TDMQ

打开腾讯云控制台 -> 搜索 tdmq -> 进入消息队列 TDMQ -> 集群管理 -> 新建集群

新建的是虚拟集群,按量计费, 做测试开发一般够用的

如果你的开发服务器是再 vpc 内网里面就不建议接入公网, 更安全且拥有更高的带宽. 如果要接入公网你需提交申请

2. 创建命名空间

命名空间 -> 选择当前集群为新建的集群 -> 新建命名空间

输入命名空间的名称

选择消息的 TTL 默认 1 天, TTL 过期了消费者还是没有 ACK 该消息, 消息就会过期

消息保留策略一般就是消费及删除

3. 创建角色并授权 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。填写角色名称和说明,单击提交完成角色创建。进入 命名空间 页面,在刚刚创建的命名空间中,单击 *** 作列的配置权限进入命名空间的权限列表。在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。

在命名空间这里配置权限

添加角色

我们选择刚刚添加的角色, 然后基于 生产消息和消费消息的权限

我们可以只是将 2 个角色分开, 也能防止循环依赖

添加成功

4. 创建 topic

点击 topic -> 新建

设置 topic 名称

消息类型

各种消息类型区别如下

分区数: 分区能提升单个 topic 的吞吐量

编写代码: 生产者 Producer
package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

var (
	//TODO:角色权限的token
	token = "{token}"
	//TODO: 消息队列的id/命名空间名称/topic名称
	topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)

//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
	//1. client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
		Authentication:    pulsar.NewAuthenticationToken(token),
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	} else {
		fmt.Printf("ok=%#v\n", "ok")
	}
	defer client.Close()
	//------
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})

	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 100; i++ {
		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte("a" + strconv.Itoa(i)),
		})
	}

	defer producer.Close()

	if err != nil {
		fmt.Println("Failed to publish message", err)
	}
	fmt.Println("Published message")
}

编写代码: 消费者 Consumer
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

var (
	//TODO:角色权限的token
	token = "{token}"
	//TODO: 消息队列的id/命名空间名称/topic名称
	topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)

//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
	//1. client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
		Authentication:    pulsar.NewAuthenticationToken(token),
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	} else {
		fmt.Printf("ok=%#v\n", "ok")
	}
	defer client.Close()
	//2. consumer -------
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "{sub-name}", //TODO:消费者的名称,这里可以没有就新建
		//shared 类型
		Type: pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}
}

参数详解

上面的 todo 包含的位置都是需要自己填写删除的

{token}: 角色的授权

{pulsar-name}: 虚拟集群的id

{namespace-name}: 命名空间名称

{topic-name}: topic 名称

{url}: 访问地址

{sub-name}: 消费者的名称, 不存在将会新建

详情查看文档

https://cloud.tencent.com/document/product/1179/44814)

https://pulsar.staged.apache.org/docs/zh-CN

reference

Pulsar真的可以取代Kafka吗 - 知乎 (zhihu.com): https://zhuanlan.zhihu.com/p/370213273

pulsar 官网: https://pulsar.staged.apache.org/docs/zh-CN/standalone-docker

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990448.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存