kafka是一个在大数据领域应用非常广泛的消息中间件,可以用来做流量削峰、消息缓存等,具有吞吐量大,分区顺序性等特征,本文用于记录自己在学习kafka中的心得体会。
首先说明一下kafka中的常见概念:
broker
因为kafka是一个分布式的消息中间件,消息是分布在集群上的,broker就是集群中的机器。
topic
kafka中的消息是按照topic来进行分类的,生产者按照消息topic写入消息,消费者按照消息topic订阅消息。
producer
消息的生产者,生产者可以指定topic,向其中写入消息
consumer
消息消费者,不同的消费者可以订阅同一个topic进行独立地消费,互相不影响。这里需要注意一下消费者组(consumer group),处于同一个消费者组中的消费者对同一个topic中的消息的消费是互相影响的,同一条消息只能被一个消费者组消费一次。
partition
一个topic中的消息可能非常多,因此一个topic可以拥有多个partition,partition数量可以指定,同一个topic的partition可以分散在多个broker上。
offset
kafka中的消费者会对自己订阅的每个topic的每个partition维护一个当前的offset,offset是很有用的,比方说消费者在处理消息的时候出现失败,可以从记录中的offset进行重新读取(flink中的exactly once)。
controller
指kafka集群上的管理者,由broker选举出,负责集群中partition leader选举,容灾等事务。
partition leader
kafka为了进行容灾,保证消息不丢失,同一个partition会具有多个备份(replica)。因此,对于kafka中的每一个partition,都会选举出一个partition leader来维护这个partition的备份,当partition leader出现故障的时候,partition leader会选择这个partition的某个replica来成为新的partition leader。
学习中出现的问题:
在java api中,想要进行kafka读写消息的时候,需要传入broker-list,那么这个broker-list是何物,我需要将整个集群中的机器地址都填进去吗?
首先对这个问题进行一下解释,如下面的代码中,我们需要先传入一个bootstrap.servers参数:
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //ack模式,all是最慢但最安全的 props.put("acks", "-1"); //失败重试次数 props.put("retries", 0); //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端 props.put("batch.size", 10); //props.put("max.request.size",10); //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 props.put("linger.ms", 10000); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 props.put("buffer.memory", 10240); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("mytopic1", Integer.toString(i), "dd:"+i)); //Thread.sleep(1000000); producer.close(); }
这个问题在刚开始使用的时候令我非常困扰,后来我逐渐理解,bootstrap.servers我们一般会传入集群中的部分机器地址,对于其中某一台机器地址,客户端会向这台机器发出一个请求,询问kafka集群的metadata,得到metadata之后客户端就会去连接kafka集群,而我们多配置几个地址也是为了保持高可用,防止某台机器失败而导致连接不上kafka集群。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)