PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列,第1张

概述:本文介绍如何在 PHP 框架 Hyperf 中借助 RabbitMQ 的消息 TTL 与死信交换机实现延时队列,并用它处理超时未支付订单。

延时队列

DelayProducer.php

AmqpBuilder.php

AmqpBuilder.php

<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;

/**
 * Queue builder with a helper for declaring a "delayed" queue, i.e. a queue
 * whose messages expire after a TTL and are then dead-lettered elsewhere.
 */
class AmqpBuilder extends QueueBuilder
{
    /**
     * Merge the given queue arguments into the arguments already set on this builder.
     *
     * Entries in $arguments override existing entries that share the same string key.
     *
     * NOTE(review): array_merge() only accepts arrays, so an AMQPTable instance
     * is not actually supported here despite the documented type — confirm
     * whether AMQPTable callers exist before relying on it.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments): Builder
    {
        $this->arguments = array_merge($this->arguments, $arguments);

        return $this;
    }

    /**
     * Configure this builder as a delayed queue.
     *
     * @param string $queuename             Name of the queue that holds messages until they expire.
     * @param int    $xMessageTtl           Time-to-live in seconds (converted to milliseconds below).
     * @param string $xDeadLetterExchange   Exchange that expired messages are dead-lettered to.
     * @param string $xDeadLetterRoutingKey Routing key used when dead-lettering expired messages.
     *
     * @return $this
     */
    public function setDelayedQueue(
        string $queuename,
        int $xMessageTtl,
        string $xDeadLetterExchange,
        string $xDeadLetterRoutingKey
    ): self {
        $this->setArguments([
            // x-message-ttl is expressed in milliseconds, hence the * 1000.
            'x-message-ttl'             => ['I', $xMessageTtl * 1000],
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queuename);

        return $this;
    }
}

DelayProducer.php

<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

/**
 * Producer that declares a queue (described by an AmqpBuilder) and the
 * message's exchange, binds them, and then publishes the message.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message, declaring and binding the given queue first.
     *
     * The actual work is delegated to produceMessage() through retry(1, ...).
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether the ack handler fired.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare the queue and exchange, bind them, and publish the message.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether the ack handler fired.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool True when $confirm is false; otherwise whether the ack handler was invoked.
     * @throws \Throwable Any failure is rethrown after the connection has been reconnected.
     */
    private function produceMessage(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());

        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();

        try {
            // Acquire the channel inside the try block so that a failure here
            // still releases the pooled connection in the finally clause.
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

            $channel->set_ack_handler(function () use (&$result) {
                $result = true;
            });

            // Delayed-queue handling.
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the queue to the exchange.
            $channel->queue_bind(
                $queueBuilder->getQueue(),
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );

            // Publish the message.
            $channel->basic_publish(
                $message,
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();

            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routingKey / exchange from the message class's Producer annotation
     * onto the message, when the annotation exists and the values are non-empty.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
    {
        if (class_exists(AnnotationCollector::class)) {
            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}

处理超时订单

OrderQueueConsumer.php

OrderQueueProducer.php

OrderQueueProducer.php

<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * @param mixed $data Payload stored on the message as-is.
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Returns the parent's exchange builder unchanged; kept as an extension
     * point for customising the exchange declaration.
     */
    public function getExchangeBuilder(): ExchangeBuilder
    {
        return parent::getExchangeBuilder();
    }
}

OrderQueueConsumer.php

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

/**
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one message delivered to this consumer.
     *
     * @param mixed $data Message payload.
     *
     * @return string One of the Result constants.
     */
    public function consume($data): string
    {
        // Business logic goes here.

        // The declared return type is string, so a result must be returned;
        // falling off the end of the method would raise a TypeError.
        return Result::ACK;
    }

    /**
     * This consumer is always enabled.
     */
    public function isEnable(): bool
    {
        return true;
    }
}

Demo

// Declare queue 'order_exchange' with a 1-second TTL; expired messages are
// dead-lettered to exchange 'delay_exchange' with routing key 'delay_route'.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

$que = ApplicationContext::getContainer()->get(DelayProducer::class);

// Publish a sample order message and dump the boolean result of produce().
var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));

推荐教程:《PHP教程》 总结

以上是内存溢出为你收集整理的PHP 框架 Hyperf 实现处理超时未支付订单和延时队列全部内容,希望文章能够帮你解决PHP 框架 Hyperf 实现处理超时未支付订单和延时队列所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1157181.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-01
下一篇 2022-06-01

发表评论

登录后才能评论

评论列表(0条)

保存