网易传媒技术团队:消息中间件实现延迟队列的应用与实践

网易传媒技术团队:消息中间件实现延迟队列的应用与实践,第1张

早期需要延迟处理的业务场景,更多的是通过定时任务扫表,然后执行满足条件的记录,具有频率高、命中低、资源消耗大的缺点。随着消息中间件的普及,延迟消息可以很好的处理这种场景,本文主要介绍延迟消息的使用场景以及基于常见的消息中间件如何实现延迟队列,最后给出了一个在网易公开课使用延迟队列的实践。

1、有效期:限时活动、拼团。。。

2、超时处理:取消超时未支付订单、超时自动确认收货。。。

4、重试:网络异常重试、打车派单、依赖条件未满足重试。。。

5、定时任务:智能设备定时启动。。。

1、RabbitMQ

1)简介:基于AMQP协议,使用Erlang编写,实现了一个Broker框架

a、Broker:接收和分发消息的代理服务器

b、Virtual Host:虚拟主机之间相互隔离,可理解为一个虚拟主机对应一个消息服务

c、Exchange:交换机,消息发送到指定虚拟机的交换机上

d、Binding:交换机与队列绑定,并通过路由策略和routingKey将消息投递到一个或多个队列中

e、Queue:存放消息的队列,FIFO,可持久化

f、Channel:信道,消费者通过信道消费消息,一个TCP连接上可同时创建成百上千个信道,作为消息隔离

2)延迟队列实现:RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现

a、TTL:RabbitMQ支持对队列和消息各自设置存活时间,取二者中较小的值,即队列无消费者连接或消息在队列中一直未被消费的过期时间

b、DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

3)缺点:

a、配置麻烦,额外增加一个死信交换机和一个死信队列的配置

b、脆弱性,配置错误或者生产者消费者连接的队列错误都有可能造成延迟失效

2、RocketMQ

1)简介:来源于阿里,目前为Apache顶级开源项目,使用Java编写,基于长轮询的拉取方式,支持事务消息,并解决了顺序消息和海量堆积的问题

a、Broker:存放Topic并根据读取Producer的提交日志,将逻辑上的一个Topic分多个Queue存储,每个Queue上存储消息在提交日志上的位置

b、Name Server:无状态的节点,维护Topic与Broker的对应关系以及Broker的主从关系

2)延迟队列实现:RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中),然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中

3)缺点:延迟时间粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)

3、Kafka

1)简介:来源于Linkedin,目前为Apache顶级开源项目,使用Scala和Java编写,基于zookeeper协调的分布式、流处理的日志系统,升级版为Jafka

2)延迟队列实现:Kafka支持延时生产、延时拉取、延时删除等,其基于时间轮和JDK的DelayQueue实现

a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表

b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项

c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask

d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,类似于钟表就是一个三级时间轮

e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执行相应任务列表外,层级时间轮也会进行相应调整

3)缺点:

a、延迟精度取决于时间格设置

b、延迟任务除由超时触发还可能被外部事件触发而执行

4、ActiveMQ

1)简介:基于JMS协议,Java编写的Apache顶级开源项目,支持点对点和发布订阅两种模式。

a、点对点(point-to-point):消息发送到指定的队列,每条消息只有一个消费者能够消费,基于拉模型

b、发布订阅(publish/subscribe):消息发送到主题Topic上,每条消息会被订阅该Topic的所有消费者各自消费,基于推模型

2)延迟队列实现:需要延迟的消息会先存储在JobStore中,通过异步线程任务JobScheduler将到达投递时间的消息投递到相应队列上

a、Broker Filter:Broker中定义了一系列BrokerFilter的子类构成拦截器链,按顺序对消息进行相应处理

b、ScheduleBroker:当消息中指定了延迟相关属性,并且jobId为空时,会生成调度任务存储到JobStore中,此时消息不会进入到队列

c、JobStore:基于BTree存储,key为任务执行的时间戳,value为该时间戳下需要执行的任务列表

d、JobScheduler:取JobStore中最小的key执行(调度时间最早的),执行时间<=当前时间,将该任务列表依次投递到所属的队列,对于需要重复投递和投递失败的会再次存入JobStore中。

注: 此处JobScheduler的执行时间间隔可动态变化,默认05s,有新任务时会立即执行(Object->notifyAll())并设置时间间隔为01s,没有新任务后,下次执行时间为最近任务的调度执行时间。

3)缺点:投递到队列失败,将消息重新存入JobStore,消息调度执行时间=系统当前时间+延迟时间,会导致消息被真实投递的时间可能为设置的延迟时间的整数倍

5、Redis

1)简介:基于Key-Value的NoSQL数据库,由于其极高的性能常被当作缓存来使用,其数据结构支持:字符串、哈希、列表、集合、有序集合

2)延迟队列实现:Redis的延迟队列基于有序集合,score为执行时间戳,value为任务实体或任务实体引用

3)缺点:

a、实现复杂,本身不支持

b、完全基于内存,延迟时间长浪费内存资源

6、消息队列对比

1、公开课延迟队列技术选型

1)业务场景:关闭超时未支付订单、限时优惠活动、拼团

2)性能要求:订单、活动、拼团 数据量可控,上述MQ均能满足要求

3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作为延迟队列更普遍

4)可用性:ActiveMQ、RocketMQ自身支持延迟队列功能,且目前公开课业务中使用的中间件为ActiveMQ和Kafka

5)延迟时间灵活:活动的开始和结束时间比较灵活,而RocketMQ时间粒度较粗,Kafka会依赖时间格有精度缺失

结论: 最终选择ActiveMQ来作为延迟队列

2、业务场景:关闭未支付订单

1)关闭微信未支付订单

2)关闭IOS未支付订单

3、ActiveMQ使用方式

1)activemqxml中支持调度任务

2)发送消息时,设置message的延迟属性

其中:

a、延迟处理

AMQ_SCHEDULED_DELAY:设置多长时间后,投递给消费者(毫秒)

b、重复投递

AMQ_SCHEDULED_PERIOD:重复投递时间间隔(毫秒)

AMQ_SCHEDULED_REPEAT:重复投递次数

c、指定调度计划

AMQ_SCHEDULED_CRON:corn正则表达式

4、公开课使用中进行的优化

1)可靠性:针对实际投递时间可能翻倍的问题,结合ActiveMQ的重复投递,在消费者逻辑中做幂等处理来保证延迟时间的准确性

2)可追溯性:延迟消息及消费情况做数据库冗余存储

3)易用性:业务上定义好延迟枚举类型,直接使用JmsDelayTemplate发送,无需关心数据备份和参数等细节

1、无论是基于死信队列还是基于数据先存储后投递,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,从而降低耦合度

2、无论是检查队头消息TTL还是调度存储的延迟数据,本质上都是通过定时任务来完成的,但是定时任务的触发策略以及延迟数据的存储方式决定了不同中间件之间的性能优劣

张浩,2018年加入网易传媒,高级Java开发工程师,目前在网易公开课主要做支付财务体系、版本迭代相关的工作。

1,MySQL 使用3 个线程来执行复制功能(其中1 个在主服务器上,另两个在从服务器上。当从服务器发出START SLAVE时,从服务器创建一个I/O线程,以连接主服务器并让主服务器发送二进制日志。主服务器创建一个线程将二进制日志中的内容发送到从服务器。从服务器I/O 线程读取主服务

器Binlog Dump线程发送的内容并将该数据拷贝到从服务器数据目录中的本地文件中,即中继

日志。第3个线程是SQL 线程,从服务器使用此线程读取中继日志并执行日志中包含的更新。

2,如1所述,主服务器收到从服务器的同步请求后,开始向从服务器发送二进制日志

3,附件的文档可以参考下

注:你使用java程序同步数据,先不说java的效率,仅仅全部sql都同步到从表再顺序执行就有问题,反复查库,插数据,还有数据不一致的问题,如insert into tab values (now())等等;

个人意见,仅供参考

private void timer1_Tick(object sender, SystemEventArgs e)

{

try{

//你的代码

}

catch{}

}

因为如果网络断开,是可预知的异常,可以实现做好截获的机制这样程序还能继续运行

在QT的widget中用tableview显示sqlite数据库表中的内容。

假设有数据库文件testdb,有表table(id integer, name nvarchar(20),age integer),且有数条数据。

首先用QTcreator创建一个基于Widget类的窗口,再拖一个tableview到widget中,保存,然后按照如下方法进行:

1在widgeth中增添头文件:QtSql/qsqlh、QtSql/QsqlDatabase、QtSql/QsqlQuery、QtSql/QsqlQueryModel

2在pro工程文件中添加:QT+=sql

3在widgetcpp中widget的构造函数中添加如下代码:

QsqDatabase db = QsqlDatabase::addDatabase("QSQLITE");

dbsetDatabaseName("testdb");

if(!dbopen())

{

//错误处理

}

static QSqlQueryModel model = new QSqlQueryModel(ui->tableview);

model->setQuery(QString("select from table"));

model->setHeaderData(0,Qt::Horizontal,QObject::tr("编号"));

model->setHeaderData(1,Qt::Horizontal,QObject::tr("姓名"));

model->setHeaderData(2,Qt::Horizontal,QObject::tr("年龄"));

ui->tableview->setModel(model);

db->close();

这样之后,table表里的内容就会显示到tableview中了。

对于账户锁定有很多方法,最简单就是把逻辑放在程序端控制。

新增一张登陆记录表(login_record),每次登陆做一次记录,不管是否登陆成功

在此表中可以设置一个字段来标识是否登陆成功

所以每次登陆是否通过的条件就有两个:

1登陆账号和密码要正确

2login_record中本日是否存在三条以上登陆失败记录

还有其他办法,可以利用spring的事务调度或sqlserver存储过程都可以解决这个问题。

// 实例化Ajax -------

var ajax = null;

// 接收URL地址

var url = "/asp"; //这个文件是你处理的文件

if(windowXML>

写个定时器,定时将一年前的数据备份到线下数据库中,并删除线上数据库一年前的数据,或者用kettle写个数据抽取调度,异步部署,每天抽取当前时间减去365的那天数据,并删除源库该天数据。

以上就是关于网易传媒技术团队:消息中间件实现延迟队列的应用与实践全部的内容,包括:网易传媒技术团队:消息中间件实现延迟队列的应用与实践、mysql主从数据库的问题:到底谁触发谁、c# winform网络断开重新连接后连接数据库的问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/sjk/9335452.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-27
下一篇 2023-04-27

发表评论

登录后才能评论

评论列表(0条)

保存