任务的插入
$redis->lPush("key:task:list",$task)
任务的提取
$tasks = $redis->RPop("key:task:list",0,-1)
可是大家想,如何使用mysql来实现一个队列表呢?
映入大家脑海的一个典型的模式是一个表包含多种类型的记录:未处理记录,已处理记录,正在处理记录等。一个或者多个消费者线程在表中查询未处理的记录,然后声称正在处理这个任务,处理完成之后,再讲记录更新为已处理状态。
这个典型的模式,存在两个问题;1:随着队列表越来越大,查找未处理记录的速度会越来越慢。2:频繁的加锁会让多个消费者线程增加竞争。
首先我们来创建一个表
create table unsent_emails{id int not null primary key auto_increment,status enum("unsent","claimed","sent"),
owner int unsigned not null default 0,
ts timestamp,key (owner,status,ts)
}
该表的列owner用来存储当前正在处理这个记录的连接id,由函数 CONNECTION_ID()返回的连接id或者线程id。如果这个记录当前被没有被处理,则该值为0
我们在 owner status ts上面做了索引的处理,所以查找未处理的记录会很快。
通过我们会采用 select for update的方式来标记待处理的记录,方法如下
beginselect id from unsent_emailswhere owner = 0 and status = 'unsent'
limit 10 for update-- result 10,20,33update unsent_emailsset status = 'claimed',owner = CONNECTION_ID()where id in (10,20,33)
commit
select的时候,使用了两个索引,应该会很快。问题出在select 和 update两个查询之间的间隙,这里的加锁会让其他相同的查询全部阻塞。
如果我们采用update then select的方式,那么效果就会更加高效,代码如下
set autocommit=1commitupdate unsent_emailsset statue = 'claimed',owner = CONNECTION_ID()where owner = 0 and status = 'unsent'
limit 10set autocommit=0select id from unsent_emailswhere owner = CONNECTION_ID() and status = 'claimed'
根本无需使用select去查找哪些记录还没有处理。客户端协议会告诉你更新了几条记录,就可以知道这次需要处理多少条记录。
这样是不是解决了上面的第二个问题,select for update的模式的加锁会增加多个消费队列的竞争问题。
其实所有的select for update 都可以替换为 update then select模式。
问题还没有结束,还有一种情况需要处理,就是比如正在处理任务的进程异常退出了,那么对应的进程正在处理的任务也就变为僵尸任务了。如何避免这种情况的发生呢?
所以我们还是需要一个新的定时器或者线程来定时检测并且update,将那些僵尸任务的记录更新到原始状态,就可以了。
僵尸任务的定义必须符合两点,1:任务被搁置了很久,比如十分钟,而通常一个任务只需要10秒就可以处理完;2:任务的owner(线程id或者连接id)已经不存在,只需要执行show processlist就可以获取当前正在工作的线程id了。代码如下
update unsent_emailsset owner = 0,status = 'unsent'
where owner not in (10,20,33,44) and status = 'claimed'
and ts <current_timestamp - interval 10 minute
一个基于mysql构建的队列表就完成了。
当然,最好的办法就是将任务队列从数据库中迁移出来。redis真是一个很好的队列容器,当然也可以使用ssdb(基于leveldb,内存占用更少)。
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"><html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/htmlcharset=gb2312" />
<title>File Manage System</title>
<link rel="stylesheet" type="text/css" href="FiMaSy.css" />
</head>
<body>
<span></span>
<h2 align="center">PHP FILE MANAGE SYSTEM</h2>
<span></span><span></span><span></span>
<hr color="#33CCCC"/>
<div id = "systembody">
<div id = "parentpath">
<?php
//获取用户单击页面上的目录链接生成的新的目录信息
if(!isset($_GET[currentdir])||empty($_GET[currentdir]))
$dir=getcwd()
else
$dir=$_GET[currentdir]
//改变目录
chdir($dir)
echo getcwd()
?>
</div>
<form name="form" method="post" enctype="multipart/form-data">
<input type="file" id="file" name="file" value="browse"/>
<input type="button" class="buttonstyle" value="upload" onclick="return myupload()">
<input type="button" class="buttonstyle" value="edit" onclick="return myedit()"/>
<input type="button" class="buttonstyle" value="delete" onclick="return mydelete()"/>
</form>
</div>
<script language="javascript">
function myupload()
{
document.form.action="testtest.php?operation=upload"
document.form.submit()
}
function myedit()
{
document.form.action="testtest.php?operation=edit"
document.form.submit()
}
function mydelete()
{
document.form.action="testtest.php?operation=delete"
document.form.submit()
}
</script>
<?php
parse_str($_SERVER['QUERY_STRING'])
$myoperation=$operation
if($myoperation=="upload")
{
uploadmyfile()
}
if($myoperation=="edit")
{
editmyfile()
}
if($myoperation=="delete")
{
deletemyfile()
}
?>
<?php
function uploadmyfile()
{
if ($_FILES["file"]["error"] >0)
{
echo "Return Code: " . $_FILES["file"]["error"] . "<br />"
}
else
{
echo "Upload: " . $_FILES["file"]["name"] . "<br />"
echo "Type: " . $_FILES["file"]["type"] . "<br />"
echo "Size: " . ($_FILES["file"]["size"] / 1024) . " Kb<br />"
echo "Temp file: " . $_FILES["file"]["tmp_name"] . "<br />"
if (file_exists("upload/" . $_FILES["file"]["name"]))
{
echo $_FILES["file"]["name"] . " already exists. "
}
else
{
move_uploaded_file($_FILES["file"]["tmp_name"],
"upload/" . $_FILES["file"]["name"])
echo "Stored in: " . "upload/" . $_FILES["file"]["name"]
}
}
}
?>
<?php
function deletemyfile()
{
$delfilename="/upload/" . $_FILES["file"]["name"]
if(!file_exists($delfilename))
{
$msg = "File doesn't exist!"
}else{
if(unlink($delfilename))
{
$msg = 'delete '.$delfilename.' succeed'
}else{
$msg = 'delete '.$delfilename.' failed'
}
}
}
?>
<?php
function editmyfile()
{
echo "this is editmyfile echo"
}
?>
<?php
function listmyfile()
{
echo"
<table border=1 width=100%>
<tr align=center bgcolor=yellow >
<th>File Name</th>
<th>File Size</th>
<th>Last Modification</th>
<th>Yes / No<th>
</tr>"
//获取用户单击页面上的目录链接生成的新的目录信息
if(!isset($_GET[currentdir])||empty($_GET[currentdir]))
{
$dir=getcwd()
}
else
{
$dir=$_GET[currentdir]
}
//改变目录
chdir($dir)
//打开目录
$dh=opendir($dir)
//循环读取目录中的目录及文件
while($item = readdir($dh))
{
echo "<tr><td>"
//如果是目录
if(is_dir($item))
{
//对当前目录
if($item==".")
{
$currentdir=getcwd()
echo "<a href=$PHP_SELF?currentdir=$currentdir>.</a>"
}
//对上一级目录
else
if($item=="..")
{
$currentdir=getcwd()."\\.."
echo "<a href=$PHP_SELF?currentdir=$currentdir>..</a>"
}
else //对子目录
{
$currentdir=getcwd()."\\$item"
echo "<a href=$PHP_SELF?currentdir=$currentdir>$item</a>"
}
}
//如果是文件
else
{
//截取文件后缀
$extname=substr($item,strrpos($item,"."))
//显示txt和php文件的链接信息,通过链接可以打开文件
if(strtoupper($extname)==".PHP"||strtoupper($extname)==".TXT")
{
$currentdir=getcwd()
echo "<a href=./show_file.php?currentdir=$currentdir&filename=$item&$type=$extname>$item</a>"
}
//对于其他类型的文件,只给出文件名
else
{
echo $item
}
}
echo "</td>"
//显示文件或目录的其他信息
//文件类型和大小
if(is_dir($item))$file_size="目录"
else
$file_size=round(filesize($item))."字节"
echo "<td>$file_size</td>"
//文件最后修改时间
$update_date=date("y-m-d h:i:sA",filemtime($item))//filemtime --- 取得文件最后修改的时间
echo "<td>$update_date</td>"
?>
<td><input type="checkbox" /></td></tr>
<?php
}
closedir($dh) //关闭目录
echo "</table>"
}
?>
<?php
header("Content-Type:text/htmlcharset=gb2312")
listmyfile()
?>
<!--<script language="JavaScript">
function myrefresh()
{
window.location.reload()
}
setTimeout(''myrefresh()'',3000)
</script>-->
</body>
</html>
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#systembody{height: auto
width: 1000px
font-size: 14px
}
#parentpath{height: 30px
width: 450px
line-height: 30px
float: left
margin-top: 10px
background-color: #999999
}
#file{height: 30px
width: 300px
line-height: 30px
float: left
margin-top: 10px
margin-left: 10px
}
table td{background-color: #FFFFCC}
.buttonstyle{height: 30px
width: 50px
line-height: 30px
float: left
margin-top: 10px
margin-left: 5px
}
.filename{height: 30px
width: 200px
line-height: 30px
float: left
margin-top: 5px
margin-left: 10px
padding-left: 5px
background-color: #CCCCCC
}
.filesize{height: 30px
width: 100px
line-height: 30px
float: left
margin-top: 5px
padding-left: 5px
background-color: #999999
}
.filetype{height: 30px
width: 150px
line-height: 30px
float: left
margin-top: 5px
padding-left: 5px
background-color: #CCCCCC
}
.lastmodifi{height: 30px
width: 200px
line-height: 30px
float: left
margin-top: 5px
padding-left: 5px
background-color: #999999
}
.checkbox{height: 30px
width: 50px
line-height: 30px
float: left
margin-top: 5px
padding-left: 5px
background-color: #CCCCCC
}
ssdb主要看中的是和Redis兼容,这样不用改源码,就可以换个存储引擎了。基于Redis先驱开发的存储有aerospike,vedis,ssdb,解决了多语言多线程分布式环境下快速存储问题,比mysql传统数据库要快,有的和redis协议兼容,方便更换存储引擎。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)