kafka学习

kafka学习,第1张

kafka学习

kafka是一个在大数据领域应用非常广泛的消息中间件,可以用来做流量削峰、消息缓存等,具有吞吐量大,分区顺序性等特征,本文用于记录自己在学习kafka中的心得体会。


首先说明一下kafka中的常见概念:

    broker

    因为kafka是一个分布式的消息中间件,消息是分布在集群上的,broker就是集群中的机器。

    topic

    kafka中的消息是按照topic来进行分类的,生产者按照消息topic写入消息,消费者按照消息topic订阅消息。

    producer

    消息的生产者,生产者可以指定topic,向其中写入消息

    consumer

    消息消费者,不同的消费者可以订阅同一个topic进行独立地消费,互相不影响。这里需要注意一下消费者组(consumer group),处于同一个消费者组中的消费者对同一个topic中的消息的消费是互相影响的,同一条消息只能被一个消费者组消费一次。

    partition

    一个topic中的消息可能非常多,因此一个topic可以拥有多个partition,partition数量可以指定,同一个topic的partition可以分散在多个broker上。

    offset

    kafka中的消费者会对自己订阅的每个topic的每个partition维护一个当前的offset,offset是很有用的,比方说消费者在处理消息的时候出现失败,可以从记录中的offset进行重新读取(flink中的exactly once)。

    controller

    指kafka集群上的管理者,由broker选举出,负责集群中partition leader选举,容灾等事务。

    partition leader

    kafka为了进行容灾,保证消息不丢失,同一个partition会具有多个备份(replica)。因此,对于kafka中的每一个partition,都会选举出一个partition leader来维护这个partition的备份,当partition leader出现故障的时候,partition leader会选择这个partition的某个replica来成为新的partition leader。


学习中出现的问题:

    在java api中,想要进行kafka读写消息的时候,需要传入broker-list,那么这个broker-list是何物,我需要将整个集群中的机器地址都填进去吗?

    首先对这个问题进行一下解释,如下面的代码中,我们需要先传入一个bootstrap.servers参数:

        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            //设置kafka集群的地址
            props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
            //ack模式,all是最慢但最安全的
            props.put("acks", "-1");
            //失败重试次数
            props.put("retries", 0);
            //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
            props.put("batch.size", 10);
            //props.put("max.request.size",10);
            //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
            props.put("linger.ms", 10000);
            //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
            //buffer.memory要大于batch.size,否则会报申请内存不足的错误
            props.put("buffer.memory", 10240);
            //序列化器
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord("mytopic1", Integer.toString(i), "dd:"+i));
            //Thread.sleep(1000000);
            producer.close();
        }
    

这个问题在刚开始使用的时候令我非常困扰,后来我逐渐理解,bootstrap.servers我们一般会传入集群中的部分机器地址,对于其中某一台机器地址,客户端会向这台机器发出一个请求,询问kafka集群的metadata,得到metadata之后客户端就会去连接kafka集群,而我们多配置几个地址也是为了保持高可用,防止某台机器失败而导致连接不上kafka集群。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706406.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存