如何用mysql表模拟队列

如何用mysql表模拟队列,第1张

通常大家都会使用redis作为应用的任务队列表,redis的List结构,在一段进行任务的插入,在另一端进行任务的提取。

任务的插入

$redis->lPush("key:task:list",$task)

任务的提取

$tasks = $redis->RPop("key:task:list",0,-1)

可是大家想,如何使用mysql来实现一个队列表呢?

映入大家脑海的一个典型的模式是一个表包含多种类型的记录:未处理记录,已处理记录,正在处理记录等。一个或者多个消费者线程在表中查询未处理的记录,然后声称正在处理这个任务,处理完成之后,再讲记录更新为已处理状态。

这个典型的模式,存在两个问题;1:随着队列表越来越大,查找未处理记录的速度会越来越慢。2:频繁的加锁会让多个消费者线程增加竞争。

首先我们来创建一个表

create table unsent_emails{id int not null primary key auto_increment,status enum("unsent","claimed","sent"),

owner int unsigned not null default 0,

ts timestamp,key (owner,status,ts)

}

该表的列owner用来存储当前正在处理这个记录的连接id,由函数 CONNECTION_ID()返回的连接id或者线程id。如果这个记录当前被没有被处理,则该值为0

我们在 owner status ts上面做了索引的处理,所以查找未处理的记录会很快。

通过我们会采用 select for update的方式来标记待处理的记录,方法如下

beginselect id from unsent_emailswhere owner = 0 and status = 'unsent'

limit 10 for update-- result 10,20,33update unsent_emailsset status = 'claimed',owner = CONNECTION_ID()where id in (10,20,33)

commit

select的时候,使用了两个索引,应该会很快。问题出在select 和 update两个查询之间的间隙,这里的加锁会让其他相同的查询全部阻塞。

如果我们采用update then select的方式,那么效果就会更加高效,代码如下

set autocommit=1commitupdate unsent_emailsset statue = 'claimed',owner = CONNECTION_ID()where owner = 0 and status = 'unsent'

limit 10set autocommit=0select id from unsent_emailswhere owner = CONNECTION_ID() and status = 'claimed'

根本无需使用select去查找哪些记录还没有处理。客户端协议会告诉你更新了几条记录,就可以知道这次需要处理多少条记录。

这样是不是解决了上面的第二个问题,select for update的模式的加锁会增加多个消费队列的竞争问题。

其实所有的select for update 都可以替换为 update then select模式。

问题还没有结束,还有一种情况需要处理,就是比如正在处理任务的进程异常退出了,那么对应的进程正在处理的任务也就变为僵尸任务了。如何避免这种情况的发生呢?

所以我们还是需要一个新的定时器或者线程来定时检测并且update,将那些僵尸任务的记录更新到原始状态,就可以了。

僵尸任务的定义必须符合两点,1:任务被搁置了很久,比如十分钟,而通常一个任务只需要10秒就可以处理完;2:任务的owner(线程id或者连接id)已经不存在,只需要执行show processlist就可以获取当前正在工作的线程id了。代码如下

update unsent_emailsset owner = 0,status = 'unsent'

where owner not in (10,20,33,44) and status = 'claimed'

and ts <current_timestamp - interval 10 minute

一个基于mysql构建的队列表就完成了。

当然,最好的办法就是将任务队列从数据库中迁移出来。redis真是一个很好的队列容器,当然也可以使用ssdb(基于leveldb,内存占用更少)。

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">

<head>

<meta http-equiv="Content-Type" content="text/htmlcharset=gb2312" />

<title>File Manage System</title>

<link rel="stylesheet" type="text/css" href="FiMaSy.css" />

</head>

<body>

<span></span>

<h2 align="center">PHP FILE MANAGE SYSTEM</h2>

<span></span><span></span><span></span>

<hr color="#33CCCC"/>

<div id = "systembody">

<div id = "parentpath">

<?php

//获取用户单击页面上的目录链接生成的新的目录信息

if(!isset($_GET[currentdir])||empty($_GET[currentdir]))

$dir=getcwd()

else

$dir=$_GET[currentdir]

//改变目录

chdir($dir)

echo getcwd()

?>

</div>

<form name="form" method="post" enctype="multipart/form-data">

<input type="file" id="file" name="file" value="browse"/>

<input type="button" class="buttonstyle" value="upload" onclick="return myupload()">

<input type="button" class="buttonstyle" value="edit" onclick="return myedit()"/>

<input type="button" class="buttonstyle" value="delete" onclick="return mydelete()"/>

</form>

</div>

<script language="javascript">

function myupload()

{

document.form.action="testtest.php?operation=upload"

document.form.submit()

}

function myedit()

{

document.form.action="testtest.php?operation=edit"

document.form.submit()

}

function mydelete()

{

document.form.action="testtest.php?operation=delete"

document.form.submit()

}

</script>

<?php

parse_str($_SERVER['QUERY_STRING'])

$myoperation=$operation

if($myoperation=="upload")

{

uploadmyfile()

}

if($myoperation=="edit")

{

editmyfile()

}

if($myoperation=="delete")

{

deletemyfile()

}

?>

<?php

function uploadmyfile()

{

if ($_FILES["file"]["error"] >0)

{

echo "Return Code: " . $_FILES["file"]["error"] . "<br />"

}

else

{

echo "Upload: " . $_FILES["file"]["name"] . "<br />"

echo "Type: " . $_FILES["file"]["type"] . "<br />"

echo "Size: " . ($_FILES["file"]["size"] / 1024) . " Kb<br />"

echo "Temp file: " . $_FILES["file"]["tmp_name"] . "<br />"

if (file_exists("upload/" . $_FILES["file"]["name"]))

{

echo $_FILES["file"]["name"] . " already exists. "

}

else

{

move_uploaded_file($_FILES["file"]["tmp_name"],

"upload/" . $_FILES["file"]["name"])

echo "Stored in: " . "upload/" . $_FILES["file"]["name"]

}

}

}

?>

<?php

function deletemyfile()

{

$delfilename="/upload/" . $_FILES["file"]["name"]

if(!file_exists($delfilename))

{

$msg = "File doesn't exist!"

}else{

if(unlink($delfilename))

{

$msg = 'delete '.$delfilename.' succeed'

}else{

$msg = 'delete '.$delfilename.' failed'

}

}

}

?>

<?php

function editmyfile()

{

echo "this is editmyfile echo"

}

?>

<?php

function listmyfile()

{

echo"

<table border=1 width=100%>

<tr align=center bgcolor=yellow >

<th>File Name</th>

<th>File Size</th>

<th>Last Modification</th>

<th>Yes / No<th>

</tr>"

//获取用户单击页面上的目录链接生成的新的目录信息

if(!isset($_GET[currentdir])||empty($_GET[currentdir]))

{

$dir=getcwd()

}

else

{

$dir=$_GET[currentdir]

}

//改变目录

chdir($dir)

//打开目录

$dh=opendir($dir)

//循环读取目录中的目录及文件

while($item = readdir($dh))

{

echo "<tr><td>"

//如果是目录

if(is_dir($item))

{

//对当前目录

if($item==".")

{

$currentdir=getcwd()

echo "<a href=$PHP_SELF?currentdir=$currentdir>.</a>"

}

//对上一级目录

else

if($item=="..")

{

$currentdir=getcwd()."\\.."

echo "<a href=$PHP_SELF?currentdir=$currentdir>..</a>"

}

else //对子目录

{

$currentdir=getcwd()."\\$item"

echo "<a href=$PHP_SELF?currentdir=$currentdir>$item</a>"

}

}

//如果是文件

else

{

//截取文件后缀

$extname=substr($item,strrpos($item,"."))

//显示txt和php文件的链接信息,通过链接可以打开文件

if(strtoupper($extname)==".PHP"||strtoupper($extname)==".TXT")

{

$currentdir=getcwd()

echo "<a href=./show_file.php?currentdir=$currentdir&filename=$item&$type=$extname>$item</a>"

}

//对于其他类型的文件,只给出文件名

else

{

echo $item

}

}

echo "</td>"

//显示文件或目录的其他信息

//文件类型和大小

if(is_dir($item))$file_size="目录"

else

$file_size=round(filesize($item))."字节"

echo "<td>$file_size</td>"

//文件最后修改时间

$update_date=date("y-m-d h:i:sA",filemtime($item))//filemtime --- 取得文件最后修改的时间

echo "<td>$update_date</td>"

?>

<td><input type="checkbox" /></td></tr>

<?php

}

closedir($dh) //关闭目录

echo "</table>"

}

?>

<?php

header("Content-Type:text/htmlcharset=gb2312")

listmyfile()

?>

<!--<script language="JavaScript">

function myrefresh()

{

window.location.reload()

}

setTimeout(''myrefresh()'',3000)

</script>-->

</body>

</html>

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#systembody{height: auto

width: 1000px

font-size: 14px

}

#parentpath{height: 30px

width: 450px

line-height: 30px

float: left

margin-top: 10px

background-color: #999999

}

#file{height: 30px

width: 300px

line-height: 30px

float: left

margin-top: 10px

margin-left: 10px

}

table td{background-color: #FFFFCC}

.buttonstyle{height: 30px

width: 50px

line-height: 30px

float: left

margin-top: 10px

margin-left: 5px

}

.filename{height: 30px

width: 200px

line-height: 30px

float: left

margin-top: 5px

margin-left: 10px

padding-left: 5px

background-color: #CCCCCC

}

.filesize{height: 30px

width: 100px

line-height: 30px

float: left

margin-top: 5px

padding-left: 5px

background-color: #999999

}

.filetype{height: 30px

width: 150px

line-height: 30px

float: left

margin-top: 5px

padding-left: 5px

background-color: #CCCCCC

}

.lastmodifi{height: 30px

width: 200px

line-height: 30px

float: left

margin-top: 5px

padding-left: 5px

background-color: #999999

}

.checkbox{height: 30px

width: 50px

line-height: 30px

float: left

margin-top: 5px

padding-left: 5px

background-color: #CCCCCC

}

ssdb主要看中的是和Redis兼容,这样不用改源码,就可以换个存储引擎了。

基于Redis先驱开发的存储有aerospike,vedis,ssdb,解决了多语言多线程分布式环境下快速存储问题,比mysql传统数据库要快,有的和redis协议兼容,方便更换存储引擎。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/8602287.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-18
下一篇 2023-04-18

发表评论

登录后才能评论

评论列表(0条)

保存