一、故事背景
1、读取离线文件数据,再通过【离线数据】作为条件,查询第三方接口,返回最终的结果,再入库。
2、 业务逻辑是很简单, 读取文件、查询接口、返回数据集、入库 四步。
3、业务特性:第三方接口调用400毫秒(ms) 。
如果用普通单线程去跑算500毫秒一个请求,一天也就跑8W多数据量,20多亿的数据不知道跑到猴年马月了。
二、处理方案
A) 初步方案采用ganymed-ssh2(文件都存储在Linux服务器雹租消上) 来读文件,Redis来存储型盯消息、多线程来提升处理能力。
B) 流程图:
三、呈现问题
四、优化问题
最终流程图:
1、 通过Redis做一个计数器 每读取一行记录数值,即使服务终止后,先从Redis读取这个源知数值
再通过cat指定行数开始读数据即可。
2、 通过取模拆Key 分片到不同小Key存储 ,降低单个节点存储压力,也充分利用了存储资源。
3、Redis Push 提供了批量方式(leftPushAll) ,可以指定读取行数再批量入库,而pop并没有提供批量 只能一个一个pop。
4、消费者通过多线程pop、再分发到线程去处理。
五、总结问题
具体实现步骤如下:1. 新建一个文本文件,仿穗包含redis命令
SET Key0 Value0
SET Key1 Value1
...
SET KeyN ValueN
如果有了原始数据,其实构造这个文件并不难,譬如shell,python都可以
2. 将这些命令转化成Redis Protocol。
因为Redis管道功能支持的是Redis Protocol,而不是直接的Redis命令。
如何转化,可参考后面的脚本。
3. 利用管道插入
cat data.txt | redis-cli --pipe
Shell VS Redis pipe
下面通过测试来具体看看Shell批量导入和Redis pipe之间的效率。
测试思路肢敬:分别通过shell脚本和Redis pipe向数据库中插入10万相同数据,查看各自所花费的时间。
Shell
脚本如下:
#!/bin/bash
for ((i=0i<100000i++))
do
echo -en "helloworld" | redis-cli -x set name$i >>redis.log
done
每次插入的值都是helloworld,但键不同,name0,name1...name99999。
Redis pipe
Redis pipe会稍微麻烦一点
1>首先构造redis命令的文本文件
在这里,我选用了python
#!/usr/bin/python
for i in range(100000):
print 'set name'+str(i),'helloworld'
# python 1.py >redis_commands.txt
# head -2 redis_commands.txt
set name0 helloworld
set name1 helloworld
2>将这些命令转化成Redis Protocol
在这里,我利用了github上一个shell脚本,
#!/bin/bash
while read CMDdo
# each command begins with *{number arguments in command}\r\n
XS=($CMD)printf "*${#XS[@]}\r\n"历大慎
# for each argument, we append ${length}\r\n{argument}\r\n
for X in $CMDdo printf "\$${#X}\r\n$X\r\n"done
done <redis_commands.txt
# sh 20.sh >redis_data.txt
# head -7 redis_data.txt
*3
$3
set
$5
name0
$10
helloworld
至此,数据构造完毕。
测试结果
在开始本文之前请确保安装好Redis
和
Node.js
以及
Node.js
的
Redis
扩展
——
node_redis
首先创建一个新文件夹并新建文本文件
app.js
文件内容如下:
var
redis
=
require("redis")
,
client
=
redis.createClient()
client.on("error",
function
(err)
{
console.log("Error
"
+
err)
})
client.on("connect",
runSample)
function
runSample()
{
//
Set
a
value
client.set("型侍羡string
key",
"Hello
World",
function
(err,
reply)
{
console.log(reply.toString())
})
//
Get
a
value
client.get("string
key",
function
(err,
reply)
{
console.log(reply.toString())
})
}
当连接到
Redis
后会调用
runSample
函数并设置一个值,紧接着便读出该值,运行的结果如下:
OK
Hello
World
我们也可以使用
EXPIRE
命令来设置对象的失效时间,代码如下:
var
redis
=
require('redis')
,
client
=
redis.createClient()
client.on('error',
function
(err)
{
console.log('Error
'
+
err)
})
client.on('connect',
runSample)
function
runSample()
{
//
Set
a
value
with
an
expiration
client.set('string
key',
'Hello
World',
redis.print)
//
Expire
in
3
seconds
client.expire('string
key',
3)
//
This
timer
is
only
to
demo
the
TTL
//
Runs
every
second
until
the
timeout
//
occurs
on
the
value
var
myTimer
=
setInterval(function()
{
client.get('string
key',
function
(err,
reply)
{
if(reply)
{
console.log('I
live:
'
+
reply.toString())
}
else
{
clearTimeout(myTimer)
console.log('I
expired')
client.quit()
}
})
},
1000)
}
注意:
上述使用的定时器只是为了演示
EXPIRE
命令,你必须在
Node.js
项目中谨慎使用定时器。
运行上述程序的输出卜拍结谈皮果是:
Reply:
OK
I
live:
Hello
World
I
live:
Hello
World
I
live:
Hello
World
I
expired
接下来我们检查一个值在失效之前存留了多长时间:
var
redis
=
require('redis')
,
client
=
redis.createClient()
client.on('error',
function
(err)
{
console.log('Error
'
+
err)
})
client.on('connect',
runSample)
function
runSample()
{
//
Set
a
value
client.set('string
key',
'Hello
World',
redis.print)
//
Expire
in
3
seconds
client.expire('string
key',
3)
//
This
timer
is
only
to
demo
the
TTL
//
Runs
every
second
until
the
timeout
//
occurs
on
the
value
var
myTimer
=
setInterval(function()
{
client.get('string
key',
function
(err,
reply)
{
if(reply)
{
console.log('I
live:
'
+
reply.toString())
client.ttl('string
key',
writeTTL)
}
else
{
clearTimeout(myTimer)
console.log('I
expired')
client.quit()
}
})
},
1000)
}
function
writeTTL(err,
data)
{
console.log('I
live
for
this
long
yet:
'
+
data)
}
运行结果:
Reply:
OK
I
live:
Hello
World
I
live
for
this
long
yet:
2
I
live:
Hello
World
I
live
for
this
long
yet:
1
I
live:
Hello
World
I
live
for
this
long
yet:
0
I
expired
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)