"require": { "php": ">=7.3", "laravel/lumen-framework": "^6.*", "nmred/kafka-php": "v0.2.0.8" },
创建 KafkaService
setmetadataRefreshIntervalMs(10000); $config->setmetadataBrokerList($url); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new KafkaProducer(function () use($value,$topic){ return [ [ 'topic' => $topic, 'value' => $value, 'key' => '', ], ]; }); $producer->success(function ($result){ return "success"; }); $producer->error(function ($errorCode){ var_dump($errorCode); }); $producer->send(true); } public function consumer($group,$topics , $url){ $config = KafkaConsumerConfig::getInstance(); $config->setmetadataRefreshIntervalMs(500); $config->setmetadataBrokerList($url); $config->setGroupId($group); $config->setBrokerVersion('0.9.0.1'); $config->setTopics([$topics]); $config->setOffsetReset('earliest'); $consumer = new KafkaConsumer(); $consumer->start(function($topic, $part, $message) { echo "receive a message:".$message['message']['value']."n"; app('consumerKafka')->consumerData($message['message']['value']);//你的接收处理逻辑 file_put_contents("consumer.log",$message['message']['value']); }); } }
执行 produce 方法生产消息
'test', 'data_type' => 'personal', 'action' => 'update', 'data' => [ 'id' => 1, 'name' => 'tom', 'gender' => 2 ], 'redirect_url' => '', 'operator' => 'system', ]; $value = json_encode ($value, JSON_FORCE_OBJECT ); $kafka = new KafkaService(); $kafka->Producer($topic, $value , $url); } }
执行 php artisan consumer:kafka 消费消息
php artisan consumer:kafka
log('开始监听消息...'); app('kafkaService')->consumer( $group=env('KAFKA_GROUP'), $topics =env('KAFKA_TOPIC'), $url=env('KAFKA_URL') ); return $this; } private function log($msg = '') { if (!$msg) { return $this; } if (php_sapi_name() == 'cli') { echo $msg, PHP_EOL; } file_put_contents("kafka.log",$msg); //app('myLog')->lumenLog($msg, 'kafka_consumer'); return $this; } }
注册config/app.php
'aliases' => [ 'kafkaService' => AppServicesKafkaService::class, 'consumerKafka'=>AppServicesConsumerService::class ]
修改.env
KAFKA_GROUP=192.168.102.46:2181 KAFKA_TOPIC=test KAFKA_URL=192.168.102.46:9092
kafka部署方法docker部署kafka_飞鱼计划-CSDN博客
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)