通过Redis消息队列实现大文件处理

通过Redis消息队列实现大文件处理,第1张

一、故事背景

1、读取离线文件数据,再通过【离线数据】作为条件,查询第三方接口,返回最终的结果,再入库。

2、 业务逻辑是很简单, 读取文件、查询接口、返回数据集、入库 四步。

3、业务特性:第三方接口调用400毫秒(ms) 。

如果用普通单线程去跑算500毫秒一个请求,一天也就跑8W多数据量,20多亿的数据不知道跑到猴年马月了。

二、处理方案

A) 初步方案采用ganymed-ssh2(文件都存储在Linux服务器雹租消上) 来读文件,Redis来存储型盯消息、多线程来提升处理能力。

B) 流程图:

三、呈现问题

四、优化问题

最终流程图:

1、 通过Redis做一个计数器 每读取一行记录数值,即使服务终止后,先从Redis读取这个源知数值

再通过cat指定行数开始读数据即可。

2、 通过取模拆Key 分片到不同小Key存储 ,降低单个节点存储压力,也充分利用了存储资源。

3、Redis Push 提供了批量方式(leftPushAll) ,可以指定读取行数再批量入库,而pop并没有提供批量 只能一个一个pop。

4、消费者通过多线程pop、再分发到线程去处理。

五、总结问题

具体实现步骤如下:

1. 新建一个文本文件,仿穗包含redis命令

SET Key0 Value0

SET Key1 Value1

...

SET KeyN ValueN

如果有了原始数据,其实构造这个文件并不难,譬如shell,python都可以

2. 将这些命令转化成Redis Protocol。

因为Redis管道功能支持的是Redis Protocol,而不是直接的Redis命令。

如何转化,可参考后面的脚本。

3. 利用管道插入

cat data.txt | redis-cli --pipe

Shell VS Redis pipe

下面通过测试来具体看看Shell批量导入和Redis pipe之间的效率。

测试思路肢敬:分别通过shell脚本和Redis pipe向数据库中插入10万相同数据,查看各自所花费的时间。

Shell

脚本如下:

#!/bin/bash

for ((i=0i<100000i++))

do

echo -en "helloworld" | redis-cli -x set name$i >>redis.log

done

每次插入的值都是helloworld,但键不同,name0,name1...name99999。

Redis pipe

Redis pipe会稍微麻烦一点

1>首先构造redis命令的文本文件

在这里,我选用了python

#!/usr/bin/python

for i in range(100000):

print 'set name'+str(i),'helloworld'

# python 1.py >redis_commands.txt

# head -2 redis_commands.txt

set name0 helloworld

set name1 helloworld

2>将这些命令转化成Redis Protocol

在这里,我利用了github上一个shell脚本,

#!/bin/bash

while read CMDdo

# each command begins with *{number arguments in command}\r\n

XS=($CMD)printf "*${#XS[@]}\r\n"历大慎

# for each argument, we append ${length}\r\n{argument}\r\n

for X in $CMDdo printf "\$${#X}\r\n$X\r\n"done

done <redis_commands.txt

# sh 20.sh >redis_data.txt

# head -7 redis_data.txt

*3

$3

set

$5

name0

$10

helloworld

至此,数据构造完毕。

测试结果

在开始本文之前请确保安装好

Redis

Node.js

以及

Node.js

Redis

扩展

——

node_redis

首先创建一个新文件夹并新建文本文件

app.js

文件内容如下:

var

redis

=

require("redis")

,

client

=

redis.createClient()

client.on("error",

function

(err)

{

console.log("Error

"

+

err)

})

client.on("connect",

runSample)

function

runSample()

{

//

Set

a

value

client.set("型侍羡string

key",

"Hello

World",

function

(err,

reply)

{

console.log(reply.toString())

})

//

Get

a

value

client.get("string

key",

function

(err,

reply)

{

console.log(reply.toString())

})

}

当连接到

Redis

后会调用

runSample

函数并设置一个值,紧接着便读出该值,运行的结果如下:

OK

Hello

World

我们也可以使用

EXPIRE

命令来设置对象的失效时间,代码如下:

var

redis

=

require('redis')

,

client

=

redis.createClient()

client.on('error',

function

(err)

{

console.log('Error

'

+

err)

})

client.on('connect',

runSample)

function

runSample()

{

//

Set

a

value

with

an

expiration

client.set('string

key',

'Hello

World',

redis.print)

//

Expire

in

3

seconds

client.expire('string

key',

3)

//

This

timer

is

only

to

demo

the

TTL

//

Runs

every

second

until

the

timeout

//

occurs

on

the

value

var

myTimer

=

setInterval(function()

{

client.get('string

key',

function

(err,

reply)

{

if(reply)

{

console.log('I

live:

'

+

reply.toString())

}

else

{

clearTimeout(myTimer)

console.log('I

expired')

client.quit()

}

})

},

1000)

}

注意:

上述使用的定时器只是为了演示

EXPIRE

命令,你必须在

Node.js

项目中谨慎使用定时器。

运行上述程序的输出卜拍结谈皮果是:

Reply:

OK

I

live:

Hello

World

I

live:

Hello

World

I

live:

Hello

World

I

expired

接下来我们检查一个值在失效之前存留了多长时间:

var

redis

=

require('redis')

,

client

=

redis.createClient()

client.on('error',

function

(err)

{

console.log('Error

'

+

err)

})

client.on('connect',

runSample)

function

runSample()

{

//

Set

a

value

client.set('string

key',

'Hello

World',

redis.print)

//

Expire

in

3

seconds

client.expire('string

key',

3)

//

This

timer

is

only

to

demo

the

TTL

//

Runs

every

second

until

the

timeout

//

occurs

on

the

value

var

myTimer

=

setInterval(function()

{

client.get('string

key',

function

(err,

reply)

{

if(reply)

{

console.log('I

live:

'

+

reply.toString())

client.ttl('string

key',

writeTTL)

}

else

{

clearTimeout(myTimer)

console.log('I

expired')

client.quit()

}

})

},

1000)

}

function

writeTTL(err,

data)

{

console.log('I

live

for

this

long

yet:

'

+

data)

}

运行结果:

Reply:

OK

I

live:

Hello

World

I

live

for

this

long

yet:

2

I

live:

Hello

World

I

live

for

this

long

yet:

1

I

live:

Hello

World

I

live

for

this

long

yet:

0

I

expired


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/12297483.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-24
下一篇 2023-05-24

发表评论

登录后才能评论

评论列表(0条)

保存