kafka批量导出导入数据

kafka批量导出导入数据,第1张

./bin/kafka-console-consumer.sh --bootstrap-server 10.5.3.204:9092,10.5.3.205:9092 --topic processJob_pkvspp_todo --consumer-property group.id=processjob_pkvspp_todo-null-consumer --from-beginning

./kafka-console-producer.sh --broker-list 10.5.3.206:9092 --topic yutao <three_phone_pkvspp_result.kfk.txt

./bin/kafka-console-consumer.sh --bootstrap-server 10.5.3.206:9092 --topic xk-vspp-tl-msvpp-in-qmswk --consumer-property group.id=xk-vspp-tl-msvpp-in-qmswk-qmswk_kfk_group-consumer

./bin/kafka-console-consumer.sh --bootstrap-server 10.5.3.204:9092,10.5.3.205:9092 --topic xk-bz-tl-in-chin --consumer-property group.id=xk-bz-dec-tl-in-chin-tool-consumer --consumer-property auto.offset.reset=earliest >bz-dec-tl-in-chin-data.kfk

首先明确说明Kafka不是数据库,它没有schema,也没有表,更没有索引。

1.它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。

那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,我们依次讨论下ACID。

1、持久性(durability)

我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。

2、原子性(atomicity)

数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数 *** 作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有 *** 作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,

第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:

在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新 *** 作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新 *** 作,

3、隔离性(isolation)

在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:

4、一致性(consistency)

最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,

希望能帮到你,谢谢!

发送消息的主要步骤

格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key

1:序列化 ProducerRecord

2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送

3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上

4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误

有序场景:不建议retries  0。可max.in.flight.requests.per.connection  1, 影响生产者吞吐量,但保证有序          ps: 同partition消息有序

三个 必选 的属性:

(1) bootstrap.servers ,broker地址清单

(2) key.serializer: 实现org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key

(3)value.serializer, value序列化成字节数组

同步发送消息

异步发送消息

(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功

        0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道

        1,leader partition收到消息,一个即成功

        all,所有partition都收到,才成功,leader和follower共同应答

(2)buffer.memory, 生产者内 缓存区域大小

(3)compression.type ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩

(4)retries, 重发消息的次数

(5)batch.size, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送

(6)linger.ms ,批次时间,batch被填满或者linger.ms达到上限,就把batch中的消息发送出去

(7)max.in.flight.requests.per.connection, 生产者在收到服务器响应之前可以发送的消息个数

创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等

用 Avro 之前,先定义schema(通常用 JSON 写)

(1)创建一个类代表客户,作为消息的value

(2)定义schema

(3)生成Avro对象发送到Kafka

ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息    2)被写到Topic的哪个partition

key  null ,默认partitioner, RoundRobin均衡分布

key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。

自定义paritioner 需实现Partitioner接口


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6421976.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-21
下一篇 2023-03-21

发表评论

登录后才能评论

评论列表(0条)

保存