项目代码https://gitee.com/owenzhang24/tp5
其他笔记:如果你想查看Rabbitmq队列,并且想知道有多少消息存在其中,你(作为特权用户)可以使用rabbitmqctl 工具:
rabbitmqctl list_queues。
在Windows中,省略sudo:
rabbitmqctl.bat list_queues
2: 工作队列
我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。
为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在window系统运行,去掉sudo:
$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
3: rabbitmqctl能够列出服务器上所有的交换器:
$ sudo rabbitmqctl list_exchanges
这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。
4:列出所有现存的绑定 rabbitmqctl list_bindings
5: 如果你想把日志保存到文件中,只需要打开控制台输入: (receive_logs.php 源代码)
$ php receive_logs.php > logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
如果要触发一个error级别的日志,只需要输入:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
windows环境的rabbitmq安装与启动
https://my.oschina.net/owenzhang24/blog/5051652
第二:composer require php-amqplib/php-amqplib 第三:代码类- rabbitMq实现的基础类:application/common/lib/classes/rabbitmq/RabbitMq.php
- 供外部调用的rabbitMq类:application/common/lib/classes/RabbitMqWork.php
- 测试发送消息到rabbitMq中的方法:application/index/controller/Index.php
- 添加php think命令实现接收rabbitMq中的消息:application/common/command
private function __construct($exchangeType)
{
self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
self::$channel = self::$connection->channel();
if (!empty($exchangeType)) {
self::$exchangeName = self::$exchangeNames[$exchangeType];
self::$channel->exchange_declare(
self::$exchangeName, //交换机名称
$exchangeType, //路由类型
false, //don't check if a queue with the same name exists 是否检测同名队列
true, //the queue will not survive server restarts 是否开启队列持久化
false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
);
}
}
public static function instance($exchangeType = '')
{
if (!self::$instance instanceof self) {
self::$instance = new self($exchangeType);
}
return self::$instance;
}
private function __clone()
{
}
public function send()
{
self::$channel->queue_declare('hello', false, false, false);
$msg = new AMQPMessage('Hello World!');
self::$channel->basic_publish($msg, '', 'hello');
echo "[X] Sent 'Hello World!'n";
}
public function receive($callback)
{
self::$channel->queue_declare('hello', false, false, false, true);
echo "[*] Waiting for messages. To exit press CTRL+Cn";
self::$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
public function addTask($data = '')
{
self::$channel->queue_declare('task_queue', false, true, false, true);
if (empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
self::$channel->basic_publish($msg, '', 'task_queue');
echo "[x] Sent $data n";
}
public function workTask($callback)
{
self::$channel->queue_declare('task_queue', false, true, false, true);
echo ' [*] Waiting for messages. To exit press CTRL+C', "n";
self::$channel->basic_qos(null, 1, null);
self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
public function sendQueue($data = '')
{
if (empty($data)) $data = 'info:Hello World!';
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName);
echo "[x] Sent $data n";
}
public function subscribeQueue($callback)
{
list($queue_name, ,) = self::$channel->queue_declare(
"", //队列名称
false, //don't check if a queue with the same name exists 是否检测同名队列
true, //the queue will not survive server restarts 是否开启队列持久化
true, //the queue might be accessed by other channels 队列是否可以被其他队列访问
false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
);
self::$channel->queue_bind($queue_name, self::$exchangeName);
echo "[*] Waiting for logs. To exit press CTRL+C n";
self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
public function sendDirect($routingKey, $data = '')
{
if (empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
echo "[x] Sent $routingKey:$data n";
}
public function receiveDirect(Closure $callback, array $bindingKeys)
{
list($queue_namme, ,) = self::$channel->queue_declare('', false, true, true, false);
foreach ($bindingKeys as $bindingKey) {
self::$channel->queue_bind($queue_namme, self::$exchangeName, $bindingKey);
}
echo "[x] Waiting for logs. To exit press CTRL+C n";
self::$channel->basic_consume($queue_namme, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
public function sendTopic($routingKey, $data = '')
{
if (empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
echo " [x] Sent ", $routingKey, ':', $data, " n";
}
public function receiveTopic(Closure $callback, array $bindingKeys)
{
list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
foreach ($bindingKeys as $bindingKey) {
self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "n";
self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
public function __destruct()
{
// TODO: Implement __destruct() method.
self::$channel->close();
self::$connection->close();
}
}
application/common/lib/classes/RabbitMqWork.php
RabbitMq = RabbitMq::instance($exchageType); } public function send() { $this->RabbitMq->send(); } public function receive($callback) { $this->RabbitMq->receive($callback); } public function addTask($data) { $this->RabbitMq->addTask($data); } public function workTask($callback) { $this->RabbitMq->workTask($callback); } public function sendQueue($data) { $this->RabbitMq->sendQueue($data); } public function subscribeQueue($callback) { $this->RabbitMq->subscribeQueue($callback); } public function sendDirect($routingKey, $data) { $this->RabbitMq->sendDirect($routingKey, $data); } public function receiveDirect(Closure $callback, array $bindingKeys) { $this->RabbitMq->receiveDirect($callback, $bindingKeys); } public function sendTopic($routingKey, $data) { $this->RabbitMq->sendTopic($routingKey, $data); } public function receiveTopic(Closure $callback, array $bindingKeys) { $this->RabbitMq->receiveTopic($callback, $bindingKeys); } }
application/index/controller/Index.php
send(); // $this->addTask(); // $this->sendQueue(); // $this->sendDirect(); $this->sendTopic(); var_dump(11); die(); } public function searchBlog() { // $id=1; // $res = SyncBlog::getInstance()->syncBlog($id); $search='11'; $res = SearchBlog::getInstance()->searchBlog($search, 1, 100); var_dump($res); die(); var_dump(1111); die(); } public function send() { $RabbitMqWork = new RabbitMqWork(); $RabbitMqWork->send(); } public function addTask() { $data = input('data', 'This is work task!'); $RabbitMqWork = new RabbitMqWork(); $RabbitMqWork->addTask($data); } public function sendQueue() { $data = input('data', 'This is send queue1'); $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT); $RabbitMqWork->sendQueue($data); } public function sendDirect() { $data = input('data', 'Hello World!'); $routingKey = input('routingKey', 'info'); $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT); $RabbitMqWork->sendDirect($routingKey, $data); } public function sendTopic() { $data = input('data', 'Hello World!'); $routingKey = input('routingKey', 'lazy.boy'); $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC); $RabbitMqWork->sendTopic($routingKey, $data); } }
application/command.php
// +---------------------------------------------------------------------- return [ 'simpleMq' => 'applicationcommandSimpleWork', 'workQueue' => 'applicationcommandWorkQueue', 'sendQueue' => 'applicationcommandSendQueue', 'directQueue' => 'applicationcommandDirectQueue', 'topicQueue' => 'applicationcommandTopicQueue', ];
application/common/command namespace appcommand; use appcommonlibclassesrabbitmqRabbitMq; use appcommonlibclassesRabbitMqWork; use thinkconsoleCommand; use thinkconsoleInput; use thinkconsoleOutput; class DirectQueue extends Command { protected function configure() { parent::configure(); // TODO: Change the autogenerated stub $this->setName('directQueue'); } protected function execute(Input $input, Output $output) { $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT); $callback = function ($msg){ echo "[x] ".$msg->delivery_info['routing_key'].":$msg->body n"; }; $RabbitMqWork->receiveDirect($callback,RabbitMq::SEVERITYS); } }
application/command/SendQueue.php
setName('sendQueue'); } protected function execute(Input $input, Output $output) { $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT); $callback = function ($msg) { echo 'Receive:'; echo "Msg:$msg->body n"; Log::error("Msg:$msg->body"); }; $RabbitMqWork->subscribeQueue($callback); } }
application/command/SimpleWork.php
setName('simpleMq'); } protected function execute(Input $input, Output $output) { $RabbitMqWork= new RabbitMqWork(); $callback = function ($msg){ echo 'Receive:'; $queueName = $msg->delivery_info['routing_key']; $msgData = $msg->body; $isAck = true; echo 'Msg:'.$msgData."n"; echo 'QueueName:'.$queueName."n"; if($isAck) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; $RabbitMqWork->receive($callback); } }
application/command/TopicQueue.php
setName('topicQueue'); } protected function execute(Input $input, Output $output) { $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC); $callback = function ($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n"; }; $bindingKeys = [ '*.orange.*', '*.*.rabbit', 'lazy.#' ]; $RabbitMqWork->receiveTopic($callback,$bindingKeys); } }
application/command/WorkQueue.php
setName('workQueue'); } protected function execute(Input $input, Output $output) { $RabbitMqWork = new RabbitMqWork(); $callback = function ($msg){ echo " [x] Received ", $msg->body, "n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $RabbitMqWork->workTask($callback); } }
Buy me a cup of coffee :)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)