kafka 由 zookeeper 和 broker 集群注册, broker 集群负责计算和储存消息, zookeeper 为注册中心(Kafka2.8就能不依赖zookeeper独立运行了, 部署还是比较方便的)
pulsarpulsar 比 kafka 的架构更为复杂, 部署也是更加复杂
pulsar 是计算储存分离架构, 计算使用 broker 集群(是无状态的) 储存使用 bookeeper 集群, broker 计算要使用 bookeeper 的数据都是要内置 bookeeper 客户端pulsar 的分离架构更具有伸缩性pulsar 至今最新版本还是需要依赖于 zookeeper pulsar 的 4 种消费模型 独占模式(Exclusive): 一个 topic 只能有一个消费者订阅, 多个消费者订阅就会报错灾备模式(Failover): 一个 topic 可以多个消费者订阅, 但是只有一个生效, 其他的作为容灾备份使用共享订阅(Shared): 最常用的订阅模式, 一个 topic 可以被多个消费者订阅, 每个消息轮询发给其中的一个消费者key_shared: 共享订阅 + key 限制, 每个消息只会发送给绑定相同 key 的消费者这里一般使用共享订阅 shared
TDMQ 使用教程pulsar 的搭建十分繁琐, 如果搭建单机版本的话就根本发挥不了 pulsar 的高可用的特性
下面的示例是基于 腾讯云的 TDMQ (底层是 pulsar) 共享订阅模式的的消息传输
1. 创建 虚拟消息队列 TDMQ打开腾讯云控制台 -> 搜索 tdmq -> 进入消息队列 TDMQ -> 集群管理 -> 新建集群
新建的是虚拟集群,按量计费, 做测试开发一般够用的
2. 创建命名空间如果你的开发服务器是再 vpc 内网里面就不建议接入公网, 更安全且拥有更高的带宽. 如果要接入公网你需提交申请
命名空间 -> 选择当前集群为新建的集群 -> 新建命名空间
输入命名空间的名称
选择消息的 TTL 默认 1 天, TTL 过期了消费者还是没有 ACK 该消息, 消息就会过期
消息保留策略一般就是消费及删除
3. 创建角色并授权 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。填写角色名称和说明,单击提交完成角色创建。进入 命名空间 页面,在刚刚创建的命名空间中,单击 *** 作列的配置权限进入命名空间的权限列表。在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。在命名空间这里配置权限
添加角色
我们选择刚刚添加的角色, 然后基于 生产消息和消费消息的权限
我们可以只是将 2 个角色分开, 也能防止循环依赖
添加成功
4. 创建 topic点击 topic -> 新建
设置 topic 名称
消息类型
各种消息类型区别如下
分区数: 分区能提升单个 topic 的吞吐量
编写代码: 生产者 Producerpackage main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
var (
//TODO:角色权限的token
token = "{token}"
//TODO: 消息队列的id/命名空间名称/topic名称
topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)
//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
//1. client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
Authentication: pulsar.NewAuthenticationToken(token),
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
} else {
fmt.Printf("ok=%#v\n", "ok")
}
defer client.Close()
//------
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
})
if err != nil {
log.Fatal(err)
}
for i := 0; i < 100; i++ {
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("a" + strconv.Itoa(i)),
})
}
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
}
编写代码: 消费者 Consumer
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
var (
//TODO:角色权限的token
token = "{token}"
//TODO: 消息队列的id/命名空间名称/topic名称
topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)
//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
//1. client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
Authentication: pulsar.NewAuthenticationToken(token),
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
} else {
fmt.Printf("ok=%#v\n", "ok")
}
defer client.Close()
//2. consumer -------
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "{sub-name}", //TODO:消费者的名称,这里可以没有就新建
//shared 类型
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
}
参数详解
上面的 todo 包含的位置都是需要自己填写删除的
{token}: 角色的授权
{pulsar-name}: 虚拟集群的id
{namespace-name}: 命名空间名称
{topic-name}: topic 名称
{url}: 访问地址
{sub-name}: 消费者的名称, 不存在将会新建
详情查看文档https://cloud.tencent.com/document/product/1179/44814)
https://pulsar.staged.apache.org/docs/zh-CN
referencePulsar真的可以取代Kafka吗 - 知乎 (zhihu.com): https://zhuanlan.zhihu.com/p/370213273
pulsar 官网: https://pulsar.staged.apache.org/docs/zh-CN/standalone-docker
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)