任务的插入
$redis->lPush("key:task:list",$task)
任务的提取
$tasks = $redis->RPop("key:task:list",0,-1)
可是大家想,如何使用mysql来实现一个队列表呢?
映入大家脑海的一个典型的模式是一个表包含多种类型的记录:未处理记录,已处理记录,正在处理记录等。一个或者多个消费者线程在表中查询未处理的记录,然后声称正在处理这个任务,处理完成之后,再讲记录更新为已处理状态。
这个典型的模式,存在两个问题;1:随着队列表越来越大,查找未处理记录的速度会越来越慢。2:频繁的加锁会让多个消费者线程增加竞争。
首先我们来创建一个表
create table unsent_emails{id int not null primary key auto_increment,status enum("unsent","claimed","sent"),
owner int unsigned not null default 0,
ts timestamp,key (owner,status,ts)
}
该表的列owner用来存储当前正在处理这个记录的连接id,由函数 CONNECTION_ID()返回的连接id或者线程id。如果这个记录当前被没有被处理,则该值为0
我们在 owner status ts上面做了索引的处理,所以查找未处理的记录会很快。
通过我们会采用 select for update的方式来标记待处理的记录,方法如下
beginselect id from unsent_emailswhere owner = 0 and status = 'unsent'
limit 10 for update-- result 10,20,33update unsent_emailsset status = 'claimed',owner = CONNECTION_ID()where id in (10,20,33)
commit
select的时候,使用了两个索引,应该会很快。问题出在select 和 update两个查询之间的间隙,这里的加锁会让其他相同的查询全部阻塞。
如果我们采用update then select的方式,那么效果就会更加高效,代码如下
set autocommit=1commitupdate unsent_emailsset statue = 'claimed',owner = CONNECTION_ID()where owner = 0 and status = 'unsent'
limit 10set autocommit=0select id from unsent_emailswhere owner = CONNECTION_ID() and status = 'claimed'
根本无需使用select去查找哪些记录还没有处理。客户端协议会告诉你更新了几条记录,就可以知道这次需要处理多少条记录。
这样是不是解决了上面的第二个问题,select for update的模式的加锁会增加多个消费队列的竞争问题。
其实所有的select for update 都可以替换为 update then select模式。
问题还没有结束,还有一种情况需要处理,就是比如正在处理任务的进程异常退出了,那么对应的进程正在处理的任务也就变为僵尸任务了。如何避免这种情况的发生呢?
所以我们还是需要一个新的定时器或者线程来定时检测并且update,将那些僵尸任务的记录更新到原始状态,就可以了。
僵尸任务的定义必须符合两点,1:任务被搁置了很久,比如十分钟,而通常一个任务只需要10秒就可以处理完;2:任务的owner(线程id或者连接id)已经不存在,只需要执行show processlist就可以获取当前正在工作的线程id了。代码如下
update unsent_emailsset owner = 0,status = 'unsent'
where owner not in (10,20,33,44) and status = 'claimed'
and ts <current_timestamp - interval 10 minute
一个基于mysql构建的队列表就完成了。
当然,最好的办法就是将任务队列从数据库中迁移出来。redis真是一个很好的队列容器,当然也可以使用ssdb(基于leveldb,内存占用更少)。
这完全是文不对题啊,队列是一种先进先出的数据结构,通常在各种编程语言中都提供相应的类库支持,但MySQL是一个关系型数据库管理系统,并不直接提供这种功能,也不应该提供这种功能。如果真需要先进先出,就把查询的结果放入到对应高级语言的队列中即可。
直接在mysql里从本地文件系统导入数据mysql》LOAD DATA LOCAL INFILE 'C:\\Users\\asys\\Documents\\Tencent Files\\13174605\\FileRecv\\2015082818' INTO TABLE track_log
FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' (注意这里文件是从linux导出的,以\n结尾)
sqoop从hive导出数据到mysql
先在mysql建立表
grant all privileges on *.* to 'root'@'%' identified by 'Nokia123' with grant option
CREATE TABLE `track_log` (
`id` varchar(1000) DEFAULT NULL,
`url` varchar(5000) DEFAULT NULL,
`referer` varchar(5000) DEFAULT NULL,
`keyword` varchar(5000) DEFAULT NULL,
`type` varchar(1000) DEFAULT NULL,
`guid` varchar(1000) DEFAULT NULL,
`pageId` varchar(1000) DEFAULT NULL,
`moduleId` varchar(1000) DEFAULT NULL,
`linkId` varchar(1000) DEFAULT NULL,
`attachedInfo` varchar(1000) DEFAULT NULL,
`sessionId` varchar(1000) DEFAULT NULL,
`trackerU` varchar(1000) DEFAULT NULL,
`trackerType` varchar(1000) DEFAULT NULL,
`ip` varchar(1000) DEFAULT NULL,
`trackerSrc` varchar(1000) DEFAULT NULL,
`cookie` varchar(5000) DEFAULT NULL,
`orderCode` varchar(1000) DEFAULT NULL,
`trackTime` varchar(1000) DEFAULT NULL,
`endUserId` varchar(1000) DEFAULT NULL,
`firstLink` varchar(1000) DEFAULT NULL,
`sessionViewNo` varchar(5000) DEFAULT NULL,
`productId` varchar(1000) DEFAULT NULL,
`curMerchantId` varchar(1000) DEFAULT NULL,
`provinceId` varchar(1000) DEFAULT NULL,
`cityId` varchar(1000) DEFAULT NULL,
`ds` varchar(20) DEFAULT NULL,
`hour` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
然后运行sqoop
jimmy》sqoop export --connect jdbc:mysql://localhost:3306/track_log --username root --password Nokia123 --table track_log --export-dir "/user/hive/warehouse/track_log/ds=20150828/hour=18" --fields-terminated-by '\t' --columns "id, url,referer,keyword,type,guid,pageId,moduleId,linkId,attachedInfo,sessionId,trackerU,trackerType,ip,trackerSrc,cookie,orderCode,endUserId,firstLink,sessionViewNo,productId,curMerchantId,provinceId,cityId,ds ,hour"
注意: 1)--table track_log 是目的mysql的表,库在链接指定(这里库名也叫track_log) --connect jdbc:mysql://localhost:3306/track_log
2) 不能直接从hive表里导出(不能--hive-table),只能从hdfs导出
3)必须指定字段--columns
4)字段分割符号要和hdfs文件里的一致 --fields-terminated-by '\t'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)