Apache pulsar producer接口详解

Apache pulsar producer接口详解,第1张

本文将介绍 pulsar 的核心功能,不会对设计细节以及与其他 MQ 进行对比

Apache Pulsar 是一个多租户、高性能的发布-订阅消息中间件, Pulsar 最初由雅虎开发。从 2018 年 9 月以来,它是 Apache 基金会的顶级项目之一,由 Apache 软件基金会管理。 更多历史背景参考 。截止当前发文, 项目仓库 已有 97k 个 star 和 451 位 contributions,目前最新版本为 281

和大多数消息中间件一样,Pulsar 也是建立在发布订阅模式上。在这种模式中,producer 向 broker 中的 topic 发送 message。consumer 订阅这些 topic 并处理传入的消息。不同之处在于 pulsar 中的 broker 不会物理存储 message,而是交给 apache bookkeeper 单独处理,broker 只是充当一个消息路由寻址的功能

从宏观角度来看,多个 Broker 节点组成一个 Pulsar Cluster;多个 Pulsar Cluster 组成一个 Pulsar Instance,Pulsar 通过 geo-replication 支持 Pulsar Instance 内在不同的集群发送和消费消息。在 pulsar 集群中:

Pulsar 中的 broker 是一个无状态的组件,主要负责两部分:

Producer 发送 Message 到 Topic 时,除了支持 sync、async 的发送方式之外,在与 Topic 连接上, 还增加了 Excusive、Share、WaitForExclusive 三种语义:

Consumer 通过 subscription 来连接到 Topic,每次通过发送 flow permit request 给 Broker 来获取 Message,同时将 Message 放入本地维护的一个 buffer queue (默认大小为 1000)队列来缓冲,每次 consumerreceive() 调用时,将从 buffer queue 获取消息

类似的, consumer 也支持两个接收消息的方式:sync、async

Topic 在 pulsar 中被当做是一个渠道来传输 Producer 和 Conusmer 之间的消息。一个完整的 Topic 结构为:{persistent|non-persistent}//tenant/namespace/topic
由 4 部分构成:

每条消息在消息的日志上都有一个偏移量 (offset),Pulsar 使用 Subscrtion 来跟踪这个偏移量(offset),通过 Subscrtion 可以控制 Consumer 消费消息的方式,在 pulsar 中提供了四种消费方式: exclusive 、shared、 failover, and key_shared

即独占模式,在独占模式下,只允许一个 consumer 连接到 Subscription 上。如果多个 consumer 使用同一个 Subscription 订阅一个 Topic ,则会发生错误

共享模式,允许多个 consumer 使用同一个 Subscription 订阅 Topic,Message 将会循环的发给多个 Consumer,

按 key 共享,与 Shared 模式相同,允许使用同一个 Subscription 订阅 Topic,不同的是 Message 在 Consumer 的消费中,具有相同 key 或 orderingKey 的 Message 只传递给同一个 Consumer。无论消息被重新传递多少次,它都会传递给同一个消费者(必须指定 orderingKey 或 key)

普通的 Topic 仅由一个 Broker 处理,但单个 Broker 处理瓶颈将限制 Topic 的最大吞吐量,Partitioned Topic 分区主题通过多 Broker 并发处理来提高了 Topic 的吞吐量

分区主题实际上在 pulsar 中是由多个内部主题构成,每个主题归属于某一个 Broker 上,message 和对应 broker 上的内部主题路由由 pulsar 来维护

Topic1 主题有五个分区(P0 到 P4),分布在三个 Broker 上。因为分区比 Broker 多,两个 Broker 一个处理两个分区,而第三个只处理一个(同样,Pulsar 会自动处理这种分区的分布)

通常将 producer 发送 message 到 parition topic 的消息路由方式和消费者通过 Subscription 订阅 Topic 分开讨论,分区消息路由方式决定了吞吐量的高低;而 consumer 通过 subscription 订阅消费者则由应用程序业务来决定

分区主题和普通主题在 Subscription 一个 Topic 上没有区别,,因为分区仅仅决定消息从生产者发布到消费者处理和 ACK 确认之间逻辑

消息的顺序与消息路由模式和消息的 key 有关,通常需要保证具有相同 key 的消息在同一个 topic 中具有顺序性。如果消息中带有 key 属性,那么这个消息在 RoundRobinPatition 和 SinglePartition 路由模式中将具有顺序性

通过对 Pulsar 组成的一些功能模块进行简单介绍来大致了解其是如何运转的(producer->broker->consumer)、由那些组件构成的(pulsar-instance、zookeeper、bookker),以及有哪些功能特性(route mode、subscription、partition ),后面将通过源码层来深度对一些优秀的设计进行剖析

pulsar

关注 pulsar 项目将近10天左右了,在此前,我和所有刚接触 pulsar 项目的人都有一个共同的疑惑,目前社区内已经有activeMQ、kafka、RocktMQ等优秀的消息队列,而且也经历过社区长时间的洗礼,我们为什么还需要 pulsar 这个MQ?带着这样的疑问,我们开始阅读这篇文章。

记得在之前项目中遇到几个kafka的痛点问题,第一:在数据迁移过程中,我们需要从上游到下游的数据必须是有序的,简单点说就是,我们期望DDL跑到DML的前面,不然当我们DML语句到达之后,我们都不知道数据应该写入到哪个DB(因为此时压根还没有db和table)。熟悉kafka的人都知道,kafka的partition内是有序的状态,但多个partition之间是无序的,最简单的逻辑,我们将所有的数据全部写入一个partition中,让consumer订阅这个partition,但是这其中存在一个较大的问题,kafka这种基于partition的MQ,本身就是通过partition来提高吞吐量,我们的业务是在迁移数据,全量迁移时,瞬时数据会很大,同时塞到一个partition中,势必会使这个partition成为整个系统的热点问题。第二:当时候,我们内部压测过kafka,印象中,当时候的压测是分为8、32、64topic,当时候我们发现,当topic超过64时,kafka的吞吐出现明显的抖动,也就是kafka并不能够单纯的通过增加topic的数量来提升吞吐量。这个64topic性能抖动问题,与RocketMQ和kafka对比压测的现象基本一致。

我们前面也提到过,kafka和rocketMQ这种MQ是基于partition的存储模型。也就是说,他们的存储和partition是绑定的,一个partition只能由一个consumer来订阅消费,这就存在一个问题,partition和consumer究竟怎么设置,原则上,我们需要始终保持这两者是对等的关系,如果partition过多,就可能存在一个consumer订阅多个partition的问题,反过来,如果consumer过多,就会出现consumer空闲的问题。那么合理的分配策略只有是 partition 个数与 consumer 个数成倍数关系。

在kafka中最小的单位是segment,一个segment分为两个文件,一个index文件用来存储元数据信息,一个log文件用来存储数据信息,也就意味着一个partition至少需要两个fd,kafka还需要将这些元数据信息注册到zk中,方便整个kafka集群的监控调度。所以在kafka中,partition是一个相对比较重的资源,我们当时候的压测,1000左右的partition 会给kafka系统带来将近40-60ms左右的延迟(大致值)。kafka consumer静态绑定的这种架构设计导致了其扩容缩容也是一个问题。当系统遇到处理能力不足时,单纯的增加consumer并不能解决问题,我们还需要增加其对应的partition 的数量。但是,比较蛋疼的另一个问题是,partition是单向扩容的,意思是我们一旦增加上去partition的数量,我们是不能对其减少的。

这里再回答当时候我们遇到的上述提到的第一个痛点问题,如果我们只用一个partition来保证全局的顺序问题,当数据量较大的时候,这个partition会迅速增长,这个时候,即使你再想通过扩充partition的数量来提升系统的吞吐,也不是很现实来,这种强负载在kafka中简直是一个灾难。我们假设我们增加进来两个partition,但是我之前的partition已经被其中一个consumer订阅,所以那些堆积的消息只能由这个consumer来消费,这种不支持横向扩展的设计一旦遇到真的瓶颈真的有点难受,或者你就选择强制负载均衡。

针对上述的一些问题 ,pulsar是如何解决的呢?

通过上图,我们可以分析到,kafka等基于partition存储的mq存储起来是"一条一条"的,pulsar会均匀摊开。这点得意于pulsar将存储与计算分离的架构设计,我们可以方便的在存储层和计算层进行扩容。具体参考: Pulsar VS Kafka(2): 以Segment为中心的架构

上图表述了pulsar在多租户场景下的组成架构。

到了具体的业务线之后,我们就可以和正常使用mq一样对其执行相应的 *** 作。

exclusive 和 failover 适合对顺序有要求的业务场景,一般我们称作Stream(流)
shared 适合对消息顺序没什么要求的业务场景,一般我们称作Queue(队列)

下面是搜集的一些优秀的关于pulsar相关的技术文章:

如果您是一名企业架构师,您可能听说过微服务架构,并使用过它。虽然您过去可能使用REST作为服务通信层,但是越来越多的项目正在转向事件驱动的体系结构。让我们深入了解这种流行架构的优缺点、它所包含的一些关键设计选择以及常见的反模式。

什么是事件驱动的微服务体系结构

在事件驱动的体系结构中,当服务执行其他服务可能感兴趣的某些工作时,该服务将生成一个事件—执行 *** 作的记录。其他服务使用这些事件,以便它们能够执行由于该事件而需要的任何自己的任务。与REST不同,创建请求的服务不需要知道使用请求的服务的详细信息。

这里有一个简单的例子:当一个订单被放置在一个电子商务网站,一个单一的“订单放置”事件产生,然后被几个微服务消费:

1order服务,它可以向数据库写入一个order记录。

2客户服务,它可以创建客户记录。

3支付服务,它可以处理支付。

事件可以以多种方式发布。例如,可以将它们发布到保证将事件交付给适当使用者的队列中,也可以将它们发布到发布事件并允许访问所有相关方的“发布/订阅”模型流中。在这两种情况下,生产者发布事件,消费者接收该事件,并做出相应的反应。注意,在某些情况下,这两个角色还可以称为发布者(生产者)和订阅者(消费者)。

为什么使用事件驱动的体系结构

与REST相比,事件驱动架构提供了以下几个优点:

异步——基于事件的架构是异步的,没有阻塞。这使得资源可以在他们的工作单元完成后自由地转移到下一个任务,而不用担心之前发生了什么或者接下来会发生什么。它们还允许对事件进行排队或缓冲,从而防止使用者向生产者施加压力或阻塞它们。

•松耦合——服务不需要(也不应该)知道或依赖于其他服务。在使用事件时,服务独立运行,不了解其他服务,包括其实现细节和传输协议。事件模型下的服务可以独立地、更容易地更新、测试和部署。

•易于扩展——由于服务在事件驱动的体系结构下解耦,而且服务通常只执行一项任务,因此跟踪特定服务的瓶颈,并对该服务(且仅对该服务)进行扩展变得很容易。

•恢复支持——带有队列的事件驱动架构可以通过“重播”过去的事件来恢复丢失的工作。当用户需要恢复时,这对于防止数据丢失非常有用。

当然,事件驱动的架构也有缺点。通过分离紧密耦合时可能更简单的关注点,它们很容易过度设计;它们可能需要大量的前期投资;而且常常导致基础设施、服务契约或模式、多语言构建系统和依赖关系图的额外复杂性。

也许最大的缺点和挑战是数据和事务管理。由于事件驱动模型的异步性,它们必须小心处理服务之间不一致的数据、不兼容的版本、监视重复的事件,并且通常不支持ACID事务,而不支持最终的一致性,因为后者更难以跟踪或调试。

即使有这些缺点,事件驱动的体系结构通常也是企业级微服务系统的更好选择。主要的优点是可伸缩的、松散耦合的、开发人员 *** 作友好的。

何时使用REST

然而,有时REST/web接口可能仍然更可取:

•您需要一个异步请求/应答接口。

•您需要对强事务的支持。

•您的API对公众可用。

•您的项目很小(REST的设置和部署要简单得多)。

您最重要的设计选择—消息传递框架

一旦决定了事件驱动的体系结构,就该选择事件框架了。事件生成和使用的方式是系统中的一个关键因素。目前已有数十种经过验证的框架和选择,选择正确的框架需要时间和研究。

分俩个大类: 消息处理或流处理。

消息处理

在传统的消息处理中,组件创建消息,然后将其发送到特定的(通常是单个的)目的地。一直处于空闲状态并等待的接收组件接收消息并相应地执行 *** 作。通常,当消息到达时,接收组件执行单个流程。然后,删除消息。

消息处理体系结构的一个典型例子是消息队列。尽管大多数较新的项目使用流处理(如下所述),但是使用消息(或事件)队列的体系结构仍然很流行。消息队列通常使用代理的“存储和转发”系统,事件在此系统中从一个代理传递到另一个代理,直到它们到达适当的使用者。ActiveMQ和RabbitMQ是消息队列框架的两个流行示例。这些项目都有多年的实践经验和成熟的技术社区。

流处理

另一方面,在流内处理中,组件在达到某个状态时发出事件。其他感兴趣的组件在事件流中侦听这些事件并作出相应的反应。事件不针对特定的收件人,而是对所有感兴趣的组件可用。

在流内处理中,组件可以同时对多个事件作出反应,并对多个流和事件应用复杂的 *** 作。有些流包括持久性,即事件在流上停留的时间可以根据需要延长。

通过流处理,系统可以重现事件的 历史 ,在事件发生后联机并仍然对其作出反应,甚至执行滑动窗口计算。例如,它可以从每秒的事件流计算每分钟的平均CPU使用量。

最流行的流处理框架之一是Apache Kafka。Kafka是许多项目使用的成熟和稳定的解决方案。它可以被认为是一种工业强度的流处理解决方案。Kafka有一个庞大的用户群、一个有用的社区和一个改进的工具集。

其他的选择

还有其他框架提供流和消息处理的组合,或者提供它们自己独特的解决方案。例如,Apache的最新产品Pulsar是一个开源的发布/订阅消息系统,它支持流和事件队列,所有这些都具有极高的性能。Pulsar的特点是丰富的-它提供多租户和地理复制-因此复杂。据说Kafka的目标是高吞吐量,而脉冲星的目标是低延迟。

NATS是另一种具有“合成”队列的发布/订阅消息系统。NATS是为发送小而频繁的信息而设计的。它提供了高性能和低延迟;然而,NATS认为某种程度的数据丢失是可以接受的,优先考虑性能而不是交付保证。

其他的设计考虑

一旦你选择了你的事件框架,这里有几个其他的挑战需要考虑:

•Event Sourcing

很难实现松耦合服务、不同的数据存储和原子事务的组合。一个可能有所帮助的模式是事件源。在事件源中,从来不直接对数据执行更新和删除;相反,实体的状态更改被保存为一系列事件。

•CQRS

上面的事件来源引入了另一个问题:由于需要从一系列事件构建状态,查询可能会很慢,而且很复杂。命令查询责任隔离(CQRS)是一种设计解决方案,它为插入 *** 作和读取 *** 作调用单独的模型。

•事件发现

事件驱动体系结构中最大的挑战之一是对服务和事件进行编目。在哪里可以找到事件描述和详细信息事件发生的原因是什么是哪个团队创造了这个活动他们在积极地工作吗

•应对变化

事件模式会改变吗如何在不破坏其他服务的情况下更改事件模式随着服务和事件数量的增长,如何回答这些问题变得至关重要。

成为一个好的事件消费者意味着要为变化的模式编码。成为一个好的事件生产者意味着要认识到模式更改如何影响其他服务,并创建经过良好设计的事件,这些事件被清楚地记录下来。

•内部部署vs托管部署

无论您的事件框架是什么,您还需要在自行部署框架(消息代理的 *** 作并不简单,特别是在高可用性的情况下),还是使用托管服务(如Heroku上的Apache Kafka)之间做出选择。

反模式

与大多数体系结构一样,事件驱动的体系结构具有自己的一组反模式。以下是一些需要注意的地方:

设计过多的事件

注意不要对创建事件过于兴奋。创建太多的事件将在服务之间创建不必要的复杂性,增加开发人员的认知负担,增加部署和测试的难度,并导致事件使用者的拥塞。不是每个方法都需要是一个事件。

通用的事件

不要使用通用事件,无论是在名称中还是在目的上。您希望其他团队了解您的事件为何存在、应该用于什么以及应该在什么时候使用。事件应该有特定的目的,并相应地命名。事件与通用名称或通用事件与混乱的旗帜,导致问题。

复杂的依赖关系图

注意那些相互依赖的服务,并创建复杂的依赖关系图或反馈循环。每个网络跳都会给原始请求增加额外的延迟,特别是离开数据中心的南北网络流量。

这取决于保证的订单、交付或副作用

事件是异步的;因此,包含顺序或重复的假设不仅会增加复杂性,而且会抵消基于事件的体系结构的许多关键优点。如果使用者有副作用,例如在数据库中添加值,则可能无法通过重播事件进行恢复。

过早优化

大多数产品一开始很小,然后随着时间的推移而增长。虽然您可能梦想将来需要扩展到大型复杂组织,但是如果您的团队很小,那么事件驱动架构的额外复杂性实际上可能会降低您的速度。相反,考虑使用简单的体系结构来设计系统,但是要包含必要的关注点分离,以便您可以随着需求的增长将其替换掉。

期望事件驱动来修复所有问题

在较低的技术级别上,不要期望事件驱动的体系结构能够修复所有的问题。虽然这种体系结构肯定可以改进许多技术功能障碍的领域,但它不能解决核心问题,比如缺乏自动化测试、缺乏团队沟通或过时的开发-ops实践。

理解事件驱动架构的优缺点,以及它们最常见的一些设计决策和挑战,是创建尽可能好的设计的重要部分。

首先,学习大数据是需要有java,python和R语言的基础。
1) Java学习到什么样的程度才可以学习大数据呢
java需要学会javaSE即可。javaweb,javaee对于大数据用不到。学会了javase就可以看懂hadoop框架。
2) python是最容易学习的,难易程度:python java Scala 。
python不是比java更直观好理解么,因为会了Python 还是要学习java的,你学会了java,再来学习python会很简单的,一周的时间就可以学会python。
3) R语言也可以学习,但是不推荐,因为java用的人最多,大数据的第一个框架Hadoop,底层全是Java写的。就算学会了R还是看不懂hadoop。
java在大数据中的作用是构成大数据的语言,大数据的第一个框架Hadoop以及其他大数据技术框架,底层语言全是Java写的,所以推荐首选学习java
大数据开发学习路线:
第一阶段:Hadoop生态架构技术
1、语言基础
Java:多理解和实践在Java虚拟机的内存管理、以及多线程、线程池、设计模式、并行化就可以,不需要深入掌握。
Linux:系统安装、基本命令、网络配置、Vim编辑器、进程管理、Shell脚本、虚拟机的菜单熟悉等等。
Python:基础语法,数据结构,函数,条件判断,循环等基础知识。
2、环境准备
这里介绍在windows电脑搭建完全分布式,1主2从。
VMware虚拟机、Linux系统(Centos65)、Hadoop安装包,这里准备好Hadoop完全分布式集群环境。
3、MapReduce
MapReduce分布式离线计算框架,是Hadoop核心编程模型。
4、HDFS10/20
HDFS能提供高吞吐量的数据访问,适合大规模数据集上的应用。
5、Yarn(Hadoop20)
Yarn是一个资源调度平台,主要负责给任务分配资源。
6、Hive
Hive是一个数据仓库,所有的数据都是存储在HDFS上的。使用Hive主要是写Hql。
7、Spark
Spark 是专为大规模数据处理而设计的快速通用的计算引擎。
8、SparkStreaming
Spark Streaming是实时处理框架,数据是一批一批的处理。
9、SparkHive
Spark作为Hive的计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算,可以提高Hive查询的性能。
10、Storm
Storm是一个实时计算框架,Storm是对实时新增的每一条数据进行处理,是一条一条的处理,可以保证数据处理的时效性。
11、Zookeeper
Zookeeper是很多大数据框架的基础,是集群的管理者。
12、Hbase
Hbase是一个Nosql数据库,是高可靠、面向列的、可伸缩的、分布式的数据库。
13、Kafka
kafka是一个消息中间件,作为一个中间缓冲层。
14、Flume
Flume常见的就是采集应用产生的日志文件中的数据,一般有两个流程。
一个是Flume采集数据存储到Kafka中,方便Storm或者SparkStreaming进行实时处理。
另一个流程是Flume采集的数据存储到HDFS上,为了后期使用hadoop或者spark进行离线处理。
第二阶段:数据挖掘算法
1、中文分词
开源分词库的离线和在线应用
2、自然语言处理
文本相关性算法
3、推荐算法
基于CB、CF,归一法,Mahout应用。
4、分类算法
NB、SVM
5、回归算法
LR、DecisionTree
6、聚类算法
层次聚类、Kmeans
7、神经网络与深度学习
NN、Tensorflow
以上就是学习Hadoop开发的一个详细路线,如果需要了解具体框架的开发技术,可咨询加米谷大数据老师,详细了解。
学习大数据开发需要掌握哪些技术呢?
(1)Java语言基础
Java开发介绍、熟悉Eclipse开发工具、Java语言基础、Java流程控制、Java字符串、Java数组与类和对象、数字处理类与核心技术、I/O与反射、多线程、Swing程序与集合类
(2)HTML、CSS与Java
PC端网站布局、HTML5+CSS3基础、WebApp页面布局、原生Java交互功能开发、Ajax异步交互、jQuery应用
(3)JavaWeb和数据库
数据库、JavaWeb开发核心、JavaWeb开发内幕
Linux&Hadoop生态体系
Linux体系、Hadoop离线计算大纲、分布式数据库Hbase、数据仓库Hive、数据迁移工具Sqoop、Flume分布式日志框架
分布式计算框架和Spark&Strom生态体系
(1)分布式计算框架
Python编程语言、Scala编程语言、Spark大数据处理、Spark—Streaming大数据处理、Spark—Mlib机器学习、Spark—GraphX 图计算、实战一:基于Spark的推荐系统(某一线公司真实项目)、实战二:新浪网(>

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/dianzi/13302530.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-11
下一篇 2023-07-11

发表评论

登录后才能评论

评论列表(0条)

保存