Laravel 实现 Kafka 消息推送与接收处理

Laravel 实现 Kafka 消息推送与接收处理,第1张

Laravel 实现 Kafka 消息推送与接收处理
 "require": {
        "php": ">=7.3",
        "laravel/lumen-framework": "^6.*",
        "nmred/kafka-php": "v0.2.0.8"
    },

 

创建 KafkaService

setmetadataRefreshIntervalMs(10000);
        $config->setmetadataBrokerList($url);
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new KafkaProducer(function () use($value,$topic){
            return [
                [
                    'topic' => $topic,
                    'value' => $value,
                    'key' => '',
                ],
            ];
        });
        $producer->success(function ($result){
            return "success";
        });
        $producer->error(function ($errorCode){
            var_dump($errorCode);
        });
        $producer->send(true);
    }

    
    public function consumer($group,$topics , $url){
        $config = KafkaConsumerConfig::getInstance();
        $config->setmetadataRefreshIntervalMs(500);
        $config->setmetadataBrokerList($url);
        $config->setGroupId($group);
        $config->setBrokerVersion('0.9.0.1');
        $config->setTopics([$topics]);
        $config->setOffsetReset('earliest');
        $consumer = new KafkaConsumer();
        $consumer->start(function($topic, $part, $message) {
            echo "receive a message:".$message['message']['value']."n";
            app('consumerKafka')->consumerData($message['message']['value']);//你的接收处理逻辑
            file_put_contents("consumer.log",$message['message']['value']);
        });
    }
}

执行 produce 方法生产消息

 'test',
                'data_type' => 'personal',
                'action' => 'update',
                'data' =>
                    [
                        'id' => 1,
                        'name' => 'tom',
                        'gender' => 2
                    ],
                'redirect_url' => '',
                'operator' => 'system',
            ];
        $value = json_encode ($value, JSON_FORCE_OBJECT );
        $kafka = new KafkaService();
        $kafka->Producer($topic, $value , $url);
    }
}

执行 php artisan consumer:kafka 消费消息

php artisan consumer:kafka
log('开始监听消息...');
        app('kafkaService')->consumer(
            $group=env('KAFKA_GROUP'),
            $topics =env('KAFKA_TOPIC'),
            $url=env('KAFKA_URL')
        );
        return $this;
    }

    private function log($msg = '')
    {
        if (!$msg) {
            return $this;
        }
        if (php_sapi_name() == 'cli') {
            echo $msg, PHP_EOL;
        }
        file_put_contents("kafka.log",$msg);
        //app('myLog')->lumenLog($msg, 'kafka_consumer');
        return $this;
    }
}

注册config/app.php

 'aliases' => [

        'kafkaService' => AppServicesKafkaService::class,
        'consumerKafka'=>AppServicesConsumerService::class
]

修改.env

KAFKA_GROUP=192.168.102.46:2181
KAFKA_TOPIC=test
KAFKA_URL=192.168.102.46:9092

kafka部署方法docker部署kafka_飞鱼计划-CSDN博客

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700155.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存