如何使用MySQL实现队列

如何使用MySQL实现队列,第1张

这完全是文不对题啊,队列是一种先进先出的数据结构,通常在各种编程语言中都提供相应的类库支持,但MySQL是一个关系型数据库管理系统,并不直接提供这种功能,也不应该提供这种功能。

如果真需要先进先出,就把查询的结果放入到对应高级语言的队列中即可。

最近遇到一个批量发送短信的需求,短信接口是第三方提供的。刚开始想到,获取到手机号之后,循环调用接口发送不就可以了吗?

但很快发现问题:当短信数量很大时,不仅耗时,而且成功率很低。

于是想到,用PHP和MySQL实现一个消息队列,一条一条的发送短信。下面介绍具体的实现方法:

首先,建立一个数据表sms,包含以下字段:

id,

phone, //手机号

content //短信内容

将需要发送的短信和手机号存入sms表中。

接下来,需要用PHP实现一个定时器,定时读取一条记录,并发送短信:

<?php

$db = new Db()

$sms = new Sms()

while(true){

$item = $db->getFirstRecord()//获取数据表第一条记录

if(!$item){

//如果队列中没有数据,则结束定时器

break

}

$res = $sms->send($item['phone'],$item['content'])//发送短信

if($res){

$db->deleteFristRecord()//删除发送成功的记录

echo $item['phone'].'发送成功'

}else{

echo $item['phone'].'发送失败,稍后继续尝试';

}

sleep(10)//每隔十秒循环一次

}

echo '发送完毕!'

?>

将代码保存为timer_sms.php,打开命令行,执行定时器:

php timer_sms.php

好了,php定时器将会根据设定的时间间隔(这里设的是10秒),自动完成发送短信的任务。任务完成后将自动退出定时器,不再占用服务器资源。

根据我的测试,PHP定时器占用资源并不多,不会对服务器造成压力。而且是异步访问数据库,也不会影响数据库的运行。

这种方式的优点是:

1、后台运行,前台无需等待

2、成功率高,失败的记录会自动重发,直到成功

通常大家都会使用redis作为应用的任务队列表,redis的List结构,在一段进行任务的插入,在另一端进行任务的提取。

任务的插入

$redis->lPush("key:task:list",$task)

任务的提取

$tasks = $redis->RPop("key:task:list",0,-1)

可是大家想,如何使用mysql来实现一个队列表呢?

映入大家脑海的一个典型的模式是一个表包含多种类型的记录:未处理记录,已处理记录,正在处理记录等。一个或者多个消费者线程在表中查询未处理的记录,然后声称正在处理这个任务,处理完成之后,再讲记录更新为已处理状态。

这个典型的模式,存在两个问题;1:随着队列表越来越大,查找未处理记录的速度会越来越慢。2:频繁的加锁会让多个消费者线程增加竞争。

首先我们来创建一个表

create table unsent_emails{id int not null primary key auto_increment,status enum("unsent","claimed","sent"),

owner int unsigned not null default 0,

ts timestamp,key (owner,status,ts)

}

该表的列owner用来存储当前正在处理这个记录的连接id,由函数 CONNECTION_ID()返回的连接id或者线程id。如果这个记录当前被没有被处理,则该值为0

我们在 owner status ts上面做了索引的处理,所以查找未处理的记录会很快。

通过我们会采用 select for update的方式来标记待处理的记录,方法如下

beginselect id from unsent_emailswhere owner = 0 and status = 'unsent'

limit 10 for update-- result 10,20,33update unsent_emailsset status = 'claimed',owner = CONNECTION_ID()where id in (10,20,33)

commit

select的时候,使用了两个索引,应该会很快。问题出在select 和 update两个查询之间的间隙,这里的加锁会让其他相同的查询全部阻塞。

如果我们采用update then select的方式,那么效果就会更加高效,代码如下

set autocommit=1commitupdate unsent_emailsset statue = 'claimed',owner = CONNECTION_ID()where owner = 0 and status = 'unsent'

limit 10set autocommit=0select id from unsent_emailswhere owner = CONNECTION_ID() and status = 'claimed'

根本无需使用select去查找哪些记录还没有处理。客户端协议会告诉你更新了几条记录,就可以知道这次需要处理多少条记录。

这样是不是解决了上面的第二个问题,select for update的模式的加锁会增加多个消费队列的竞争问题。

其实所有的select for update 都可以替换为 update then select模式。

问题还没有结束,还有一种情况需要处理,就是比如正在处理任务的进程异常退出了,那么对应的进程正在处理的任务也就变为僵尸任务了。如何避免这种情况的发生呢?

所以我们还是需要一个新的定时器或者线程来定时检测并且update,将那些僵尸任务的记录更新到原始状态,就可以了。

僵尸任务的定义必须符合两点,1:任务被搁置了很久,比如十分钟,而通常一个任务只需要10秒就可以处理完;2:任务的owner(线程id或者连接id)已经不存在,只需要执行show processlist就可以获取当前正在工作的线程id了。代码如下

update unsent_emailsset owner = 0,status = 'unsent'

where owner not in (10,20,33,44) and status = 'claimed'

and ts <current_timestamp - interval 10 minute

一个基于mysql构建的队列表就完成了。

当然,最好的办法就是将任务队列从数据库中迁移出来。redis真是一个很好的队列容器,当然也可以使用ssdb(基于leveldb,内存占用更少)。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7629093.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-08
下一篇 2023-04-08

发表评论

登录后才能评论

评论列表(0条)

保存