- 1.生产者
- 1)流程
- 2)参数备注
- 3)代码(c风格代码)
- 2.消费者
- 1)流程
- 2)参数备注
- 3)代码
- 3.主题和分区
- 4.日志存储
- 5.深入服务器
- 6.深入客户端
- 7.可靠性探究
- 8.kafka应用
- 9.kafka监控
- 10.kafka高级应用
- 版本信息:
1.生产者 1)流程librdkafka版本0.11.6
Kafka版本2.11-2.3.00
1.初始化配置
2.创建初始化kafka配置信息
3.根据broker修改kafka配置bootstrap.server
4.设置消息回调函数,这个消息回调函数的作用是显示将消息送到kafka是否成功或失败
5.创建生产者示例
6.创建主题(长生命周期)
7.CTRL+C会清空标准输入缓冲区
8.不断检测循环标准输入缓冲区数据
8.1如果有数据则去掉数据的换行符
8.2输入为空继续循环
8.3发送生产的消息(异步发送)
8.4若发送的消息放入消息队列失败,打印一句话并放入poll异步回调
9.回收工作
2)参数备注1.bootstrap.servers:(注意:示例代码只给了一个broker) 1)该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。 2)注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上 -------------------------------------- 2.key.serializer 和 value.serializer:(这里是java的,java跟C/C++不同) broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单2-1中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>对应的就是消息中key和value的类型 -------------------------------------- 3.topic 主题 -------------------------------------- 4.brokerID brokerID3)代码(c风格代码)
#include2.消费者 1)流程 2)参数备注 3)代码 3.主题和分区 4.日志存储 5.深入服务器 6.深入客户端 7.可靠性探究 8.kafka应用 9.kafka监控 10.kafka高级应用#include #include #include "rdkafka.h" static int run = 1; static void stop (int sig) { run = 0; fclose(stdin); } static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) fprintf(stderr, "%% Message delivery failed: %sn", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %"PRId32")n", rkmessage->len, rkmessage->partition); } //生产者:往固定的broker生产固定的topic int main (int argc, char **argv) { //1.初始化配置 rd_kafka_t *rk; //生产者句柄 rd_kafka_topic_t *rkt; rd_kafka_conf_t *conf; char errstr[512]; char buf[512]; const char *brokers; const char *topic; //检查参数 if (argc != 3) { fprintf(stderr, "%% Usage: %s n", argv[0]); return 1; } brokers = argv[1]; //获取固定的broker topic = argv[2]; //获得主题参数 //2.创建初始化kafka配置信息 conf = rd_kafka_conf_new(); //3.根据broker修改kafka配置bootstrap.server if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%sn", errstr); return 1; } //4.设置消息回调函数,这个消息回调函数的作用是显示将消息送到kafka是否成功或失败 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); //5.创建生产者示例 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new producer: %sn", errstr); return 1; } //6.创建主题(长生命周期) rkt = rd_kafka_topic_new(rk, topic, NULL); if (!rkt) { fprintf(stderr, "%% Failed to create topic object: %sn", rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_destroy(rk); return 1; } //7.CTRL+C会清空标准输入缓冲区 signal(SIGINT, stop); fprintf(stderr, "%% Type some text and hit enter to produce messagen" "%% Or just hit enter to only serve delivery reportsn" "%% Press Ctrl-C or Ctrl-D to exitn"); //8.不断检测循环标准输入缓冲区数据 while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); //8.1如果有数据则去掉数据的换行符 if (buf[len-1] == 'n') buf[--len] = ''; //8.2输入为空继续循环 if (len == 0) { rd_kafka_poll(rk, 0); continue; } //8.3发送生产的消息(异步发送) //成功的化:会将消息放入消息队列,真正发消息到broker是守护进程分发消息的,发送成功或失败都会调用dr_msg_cb函数 retry: //8.4若发送的消息放入消息队列失败,打印一句话并放入poll异步回调 if (rd_kafka_produce( rkt, //传入主题 RD_KAFKA_PARTITION_UA, // RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL) == -1) { fprintf(stderr, "%% Failed to produce to topic %s: %sn", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { //队列大小被queue.buffering.max.messages限制 rd_kafka_poll(rk, 1000);//设置非阻塞回调 goto retry; } } else { fprintf(stderr, "%% Enqueued message (%zd bytes) " "for topic %sn", len, rd_kafka_topic_name(rkt)); }//end of enqueue message rd_kafka_poll(rk, 0); } //9.回收工作 fprintf(stderr, "%% Flushing final messages..n"); //等待 rd_kafka_flush(rk, 10*1000 ); rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); return 0; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)