分布式消息队列kafka讲解二(CC++librdkafka客户端1021未完待续)

分布式消息队列kafka讲解二(CC++librdkafka客户端1021未完待续),第1张

分布式消息队列kafka讲解二(C/C++librdkafka客户端10/21未完待续)

文章目录
    • 1.生产者
      • 1)流程
      • 2)参数备注
      • 3)代码(c风格代码)
    • 2.消费者
      • 1)流程
      • 2)参数备注
      • 3)代码
    • 3.主题和分区
    • 4.日志存储
    • 5.深入服务器
    • 6.深入客户端
    • 7.可靠性探究
    • 8.kafka应用
    • 9.kafka监控
    • 10.kafka高级应用

  • 版本信息:

librdkafka版本0.11.6
Kafka版本2.11-2.3.00

1.生产者 1)流程

1.初始化配置
2.创建初始化kafka配置信息
3.根据broker修改kafka配置bootstrap.server
4.设置消息回调函数,这个消息回调函数的作用是显示将消息送到kafka是否成功或失败
5.创建生产者示例
6.创建主题(长生命周期)
7.CTRL+C会清空标准输入缓冲区
8.不断检测循环标准输入缓冲区数据

8.1如果有数据则去掉数据的换行符
8.2输入为空继续循环
8.3发送生产的消息(异步发送)
8.4若发送的消息放入消息队列失败,打印一句话并放入poll异步回调

9.回收工作

2)参数备注
1.bootstrap.servers:(注意:示例代码只给了一个broker)
1)该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。
2)注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上
--------------------------------------
2.key.serializer 和 value.serializer:(这里是java的,java跟C/C++不同)
broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单2-1中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>对应的就是消息中key和value的类型
--------------------------------------
3.topic  主题
--------------------------------------
4.brokerID  brokerID
3)代码(c风格代码)

#include 
#include 
#include 



#include "rdkafka.h"


static int run = 1;


static void stop (int sig) {
        run = 0;
        fclose(stdin); 
}



static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "%% Message delivery failed: %sn",
                        rd_kafka_err2str(rkmessage->err));
        else
                fprintf(stderr,
                        "%% Message delivered (%zd bytes, "
                        "partition %"PRId32")n",
                        rkmessage->len, rkmessage->partition);

        
}


//生产者:往固定的broker生产固定的topic
int main (int argc, char **argv) {
        //1.初始化配置
        rd_kafka_t *rk;         //生产者句柄
        rd_kafka_topic_t *rkt;  
        rd_kafka_conf_t *conf;  
        char errstr[512];       
        char buf[512];          
        const char *brokers;    
        const char *topic;      
        //检查参数
        
        if (argc != 3) {
                fprintf(stderr, "%% Usage: %s  n", argv[0]);
                return 1;
        }
        brokers = argv[1];      //获取固定的broker
        topic   = argv[2];      //获得主题参数



        //2.创建初始化kafka配置信息
        
        conf = rd_kafka_conf_new();


        //3.根据broker修改kafka配置bootstrap.server
        
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%sn", errstr);
                return 1;
        }

        //4.设置消息回调函数,这个消息回调函数的作用是显示将消息送到kafka是否成功或失败
        
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);


        //5.创建生产者示例
        
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr,
                        "%% Failed to create new producer: %sn", errstr);
                return 1;
        }

        //6.创建主题(长生命周期)
        
        rkt = rd_kafka_topic_new(rk, topic, NULL);
        if (!rkt) {
                fprintf(stderr, "%% Failed to create topic object: %sn",
                        rd_kafka_err2str(rd_kafka_last_error()));
                rd_kafka_destroy(rk);
                return 1;
        }

        //7.CTRL+C会清空标准输入缓冲区
        
        signal(SIGINT, stop);

        fprintf(stderr,
                "%% Type some text and hit enter to produce messagen"
                "%% Or just hit enter to only serve delivery reportsn"
                "%% Press Ctrl-C or Ctrl-D to exitn");

        //8.不断检测循环标准输入缓冲区数据
        while (run && fgets(buf, sizeof(buf), stdin)) {
                size_t len = strlen(buf);

                //8.1如果有数据则去掉数据的换行符
                if (buf[len-1] == 'n') 
                        buf[--len] = '';

                //8.2输入为空继续循环
                if (len == 0) {
                        
                        rd_kafka_poll(rk, 0);
                        continue;
                }
                
                //8.3发送生产的消息(异步发送)
                //成功的化:会将消息放入消息队列,真正发消息到broker是守护进程分发消息的,发送成功或失败都会调用dr_msg_cb函数
                
        retry:
                //8.4若发送的消息放入消息队列失败,打印一句话并放入poll异步回调
                if (rd_kafka_produce(
                            
                            rkt,                                                //传入主题
                            
                            RD_KAFKA_PARTITION_UA,                              //
                            
                            RD_KAFKA_MSG_F_COPY,
                            
                            buf, len,
                            
                            NULL, 0,
                            
                            NULL) == -1) {
                        
                        fprintf(stderr,
                                "%% Failed to produce to topic %s: %sn",
                                rd_kafka_topic_name(rkt),
                                rd_kafka_err2str(rd_kafka_last_error()));

                        
                        if (rd_kafka_last_error() ==
                            RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                //队列大小被queue.buffering.max.messages限制
                                rd_kafka_poll(rk, 1000);//设置非阻塞回调
                                goto retry;
                        }
                } 
                else 
                {
                        fprintf(stderr, "%% Enqueued message (%zd bytes) "
                                "for topic %sn",
                                len, rd_kafka_topic_name(rkt));
                }//end of enqueue message


                
                rd_kafka_poll(rk, 0);
        }

        //9.回收工作
        
        fprintf(stderr, "%% Flushing final messages..n");
        //等待
        rd_kafka_flush(rk, 10*1000 );

        
        rd_kafka_topic_destroy(rkt);

        
        rd_kafka_destroy(rk);

        return 0;
}

2.消费者 1)流程 2)参数备注 3)代码 3.主题和分区 4.日志存储 5.深入服务器 6.深入客户端 7.可靠性探究 8.kafka应用 9.kafka监控 10.kafka高级应用

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4688686.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存