场景
在业务中有时会碰到延迟 *** 作,如下单后半小时未支付则取消订单、下单后十五分钟未支付则发短信提醒等等。那这样的需求如何去实现呢。
实现方式第一个简单的方式就是用一个后台进程死循环去查订单,根据下单时间去做不同的 *** 作第二种就是使用消息队列的定时消息,下单之后发送定时消息,不同的定时队列去处理不同的逻辑第三种可以使用框架提供的一些既有功能去做实现代码相关学习推荐:PHP编程从入门到精通
我们以订单创建15分钟后未支付,给用户发送邮件为场景进行学习
准备工作:简单的订单表:order各种需要的composer包rabbitMq本地服务开通阿里云RocketMq服务第一种代码逻辑很简单就直接死循环就行了启动这个脚本进程,可以用supervisor配置部分代码//创建订单的逻辑/** * 随机创建订单 */$order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_ID' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000),]; /**@var $manager Illuminate\Database\Capsule\Manager **/ $conn = $manager;$insertResult = $conn::table("order") ->insert($order);print_r($insertResult);
延迟处理逻辑
while(true) { // 未支付订单列表 $orderList = $conn::table("order") ->where("created_time", '<=', date("Y-m-d H:i:s", strtotime("-15 minutes"))) ->where('sended_need_pay_notify', '=', 2) ->where('status', '=', 1) ->select(['user_ID', 'ID']) ->orderBy("ID", 'asc') ->get(); $orderList = Json_decode(Json_encode($orderList), true); foreach ($orderList as $orderInfo) { sendEmail($orderInfo['user_ID']); $conn::table('order') ->where('ID', '=', $orderInfo['ID']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderID-". $orderInfo['ID']."-userID-".$orderInfo['user_ID']); } sleep(10);}
执行处理脚本
gaoz@nobodyMBP delay_mq_demo % PHP first_while_handler.PHPsend email to 73 success ...2020-06-24 11:37:36:update-success-orderID-3-userID-73
这种方式吧实现简单,但是不优雅,同时大批量订单产生也会遇到问题。
第二种比如使用阿里云的MQ服务,目前rocketMq与rabbitMq版本支持延迟消息,但是rabbit的延时消息收费太高了这里先使用rocketMq的延迟消息去实现需要开通阿里云的服务// 创建订单的逻辑try { /** * 随机创建订单 */ $order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_ID' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000), ]; /**@var $manager Illuminate\Database\Capsule\Manager **/ $conn = $manager; $insertID = $conn::table("order") ->insertGetID($order); $body = Json_encode(['order_ID' => $insertID, 'created_time' => date("Y-m-d H:i:s")]); $publishMessage = new topicmessage( $body ); // 设置消息KEY $publishMessage->setMessageKey("MessageKey"); // 定时消息, 定时时间为3分钟后 $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000); $result = $this->producer->publishMessage($publishMessage); print "Send mq message success. msgid is:" . $result->getMessageID() . ", bodyMD5 is:" . $result - >getMessageBodyMD5() . "\n"; } catch (\Exception $e) { print_r($e->getMessage() . "\n"); }
消费逻辑 同样是在消费者中处理
foreach ($messages as $message) { $receiptHandles[] = $message->getReceiptHandle(); $messageBody = $message->getMessageBody(); $orderInfo = Json_decode($messageBody, true); if (!empty($orderInfo['order_ID'])) { $orderID = $orderInfo['order_ID']; /**@var $manager Illuminate\Database\Capsule\Manager * */ $conn = $manager; $orderInfo = $conn::table("order") ->select(['ID', 'user_ID']) ->where('ID', '=', $orderID) ->where('status', '=', 1) ->first(); if (!empty($orderInfo)) { $orderInfo = Json_decode(Json_encode($orderInfo), true); sendEmail($orderInfo['user_ID']); $conn::table('order') ->where('ID', '=', $orderInfo['ID']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderID-" . $orderInfo['ID'] . "-userID-" . $orderInfo['user_ID']); } } }
启动生产一条消息
gaoz@nobodyMBP delay_mq_demo % PHP rocket_mq_handler_producer.PHP Send mq message success. msgid is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 is:63448B50AA7B8AF47B07AA7CE807E3D3gaoz@nobodyMBP delay_mq_demo %
启动消费者慢慢等待
gaoz@nobodyMBP delay_mq_demo % PHP rocket_mq_handler_consumer.PHP No message, contine long polling!RequestID:5EF752583441411C74869BA9No message, contine long polling!RequestID:5EF7525B3441411C74869FE2No message, contine long polling!RequestID:5EF7525E3441411C7486A42CNo message, contine long polling!RequestID:5EF752613441411C7486A7D9consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderID-8-userID-95 Array( [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw) ack
这种方式有现有的服务可以使用,减少开发时间
第三种 使用rabbitMq去实现查阅文档没有找到rabbitMq支持延迟队列的原生功能,但是可以通过消息的ttl+死信队列实现私信队列就是用来存放没有被消费或者消费失败等消息的队列当设置消息的有效期内没有被消费消息就会被转发到死信队列通过设置消息的有效期实现延时功能// 生产者$exchange = 'order15min_notify_exchange';$queue = 'order15minx_notify_queue';$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";$connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));$channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PHPAmqplib\Wire\AMQPtable();// 消息有效期$table->set('x-message-ttl', 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/** * 随机创建订单 */$order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_ID' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertID = $conn::table("order") ->insertGetID($order);$messageBody = Json_encode(['order_ID' => $insertID, 'created_time' => date("Y-m-d H:i:s")]); $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DEliVERY_MODE_PERSISTENT)); $channel->basic_publish($message, $exchange);
消费者
$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";$connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));$channel = $connection->channel();$channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);$channel->queue_bind($dlxQueue, $dlxExchange);/** * @param \PHPAmqplib\Message\AMQPMessage $message */function process_message($message){ echo "\n--------\n"; echo $message->body; echo "\n--------\n"; $orderInfo = Json_decode($message->body, true); if (!empty($orderInfo['order_ID'])) { $orderID = $orderInfo['order_ID']; /**@var $conn Illuminate\Database\Capsule\Manager * */ $conn = getdb(); $orderInfo = $conn::table("order") ->select(['ID', 'user_ID']) ->where('ID', '=', $orderID) ->where('status', '=', 1) ->first(); if (!empty($orderInfo)) { $orderInfo = Json_decode(Json_encode($orderInfo), true); sendEmail($orderInfo['user_ID']); $conn::table('order') ->where('ID', '=', $orderInfo['ID']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderID-" . $orderInfo['ID'] . "-userID-" . $orderInfo['user_ID']); } } $message->delivery_info['channel']->basic_ack( $message->delivery_info['delivery_tag']);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');
启动消费者
gaoz@nobodyMBP delay_mq_demo % PHP rabbit_mq_handler_consumer.PHP--------{"order_ID":7,"created_time":"2020-06-27 11:50:08"}--------send email to 2 success ...2020-06-27 11:56:55:update-success-orderID-7-userID-2
分别启动消费者、生产者就可以了,这里面消息的流转可以看到
消息先进入到正常队列,过期后进入了死信队列而被消费
第四种使用laravel自带的Queue去实现这里没有整理详细代码,后面更新出来可以查看官方文档 队列《Laravel 5.7 中文文档》代码示例:github.com/nobody05/delay_mq_demo 总结
以上是编程之家为你收集整理的PHP 简单实现延时 *** 作全部内容,希望文章能够帮你解决PHP 简单实现延时 *** 作所遇到的程序开发问题。
如果觉得编程之家网站内容还不错,欢迎将编程之家网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)