目录
1 无锁机制简介
2 RingBuffer简介
2.1 工作原理简介
3 disruptor实现流水异步入库
3.1 定义事件实体类
3.2 定义事件服务类
3.3 定义消费者
3.3.1 单任务处理
3.3.2 批处理
3.4 运行
4 pom依赖
5 参考
1 无锁机制简介
普通队列写入时需要通过锁机制避免并发,disruptor不用锁,使用CAS(Compare And Swap/Set) *** 作确保线程安全,这是一个CPU级别的指令,工作方式类似乐观锁。
2 RingBuffer简介Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首尾相连的数组。相比于linkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,linkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比linkedBlockingQueue要快。
RingBuffer中分离了读指针和写指针,从而使生产者和消费者互不干扰,两者可以完全并发执行,从而使性能达到数倍于传统基于互斥锁方式实现的消息队列模型。
RingBuffer保持数组元素永远有效,入队列直接覆盖旧的数据,相比普通数组队列,无需GC。
2.1 工作原理简介disruptor的是基于事件实现的,那么就有了生产者(provider)和消费者(consumer)存在,生产者生产元素放入数组中,消费者从数组中消费元素,这个数组就是RingBuffer。每一个生产者和消费者内部都会有一个私有指针pri-sequence,表示当前 *** 作的元素序号,同时RingBuffer内部也会有一个全局指针global-sequence指向最后一个可以被消费的元素。这样当生产者需要放数据时,只需要获取global-sequence的下一个位置,下一个位置如果还未被消费,那么就会进入等待策略,如果下一个位置已经被消费,那么就会直接覆盖当前位置的属性值。
当生产者需要向容器中存放数据时,只需要使用sequence%(数组长度-1)就可以得到要添加的元素应该放在哪儿个位置上,这样就实现了数组的首尾相连。
3 disruptor实现流水异步入库 3.1 定义事件实体类disruptor初始化时需要指定容器大小,容器大小指定为2^n,计算时可以可以使用位运算:
如果容器大小是8,要放12号元素。12%8 = 12 &(8-1)=1100&0111=0100=4。
使用位运算可以提升效率。
LogEvent作为队列RingBuffer中的元数据
import java.io.Serializable; import java.util.Date; public class LogEvent implements Serializable { private static final long serialVersionUID = 1L; private String userId;// char(32) private String rspCd;// char(2) private String rspMsg;// varchar(128) private Date transCrtTs;// timestamp(3) private Date transCfmTs;// timestamp(3) public LogEvent() { this.userId = ""; this.rspCd = ""; this.rspMsg = ""; this.transCrtTs = new Date(); this.transCfmTs = new Date(); } public String getUserId() { return userId; } public void setUserId(String userId) { if (userId == null) { return; } this.userId = userId; } public String getRspCd() { return rspCd; } public void setRspCd(String rspCd) { if (rspCd == null) { return; } this.rspCd = rspCd; } public String getRspMsg() { return rspMsg; } public void setRspMsg(String rspMsg) { if (rspMsg == null) { return; } this.rspMsg = rspMsg; } public Date getTransCrtTs() { return transCrtTs; } public void setTransCrtTs(Date transCrtTs) { if (transCrtTs == null) { return; } this.transCrtTs = transCrtTs; } public Date getTransCfmTs() { return transCfmTs; } public void setTransCfmTs(Date transCfmTs) { if (transCfmTs == null) { return; } this.transCfmTs = transCfmTs; } @Override public String toString() { StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("LogEvent{"); stringBuffer.append("userId="); stringBuffer.append(userId); stringBuffer.append(", rspCd="); stringBuffer.append(rspCd); stringBuffer.append(", rspMsg="); stringBuffer.append(rspMsg); stringBuffer.append(", transCrtTs="); stringBuffer.append(transCrtTs); stringBuffer.append(", transCfmTs="); stringBuffer.append(transCfmTs); return stringBuffer.toString(); } }3.2 定义事件服务类
LogEventService中,初始化队列RingBuffer,为生产者提供接口。
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import org.apache.commons.beanutils.BeanUtils; import java.util.concurrent.Executors; public class LogEventService { private static LogEventService instance; private static int RING_BUFFER_SIZE = 1024 * 1024; private RingBuffer3.3 定义消费者 3.3.1 单任务处理ringBuffer; private LogEventHandler logEventHandler; public LogEventService() { initRingBuffer(); } public static LogEventService getInstance() { if (instance == null) { instance = new LogEventService(); } return instance; } private void initRingBuffer() { try { logEventHandler = new LogEventHandler(); Disruptor disruptor = new Disruptor<>(EVENT_FACTORY, RING_BUFFER_SIZE, Executors.defaultThreadFactory()); disruptor.handleEventsWith(logEventHandler); ringBuffer = disruptor.start(); } catch (Exception e) { } } public void publish(LogEvent log) { long sequence = ringBuffer.next(); try { LogEvent ringValue = ringBuffer.get(sequence); BeanUtils.copyProperties(ringValue, log);//复制对象中的所有属性 } catch (Exception e) { } finally { ringBuffer.publish(sequence); } } public final EventFactory EVENT_FACTORY = new EventFactory () { @Override public LogEvent newInstance() { return new LogEvent(); } }; }
import com.lmax.disruptor.EventHandler; import com.dto.LogEvent; import com.task.LogTask; public class LogEventHandler implements EventHandler3.3.2 批处理{ @Override public void onEvent(LogEvent log, long sequence, boolean endOfBatch) { LogTask logTask = new LogTask(); logTask.process(log);//调用相关服务 } }
使用批处理方式,消费队列中的对象,调用相关服务
import com.google.common.collect.Lists; import com.lmax.disruptor.EventHandler; import java.util.List; import com.dto.LogEvent; import com.task.LogTask; public class LogEventHandler implements EventHandler3.4 运行{ private final static int DB_BATCH_SIZE = 100; private final static int RING_BATCH_SIZE = 1024; private List
创建2个生产线程,测试生产和消费过程。
import java.util.Date; import com.dto.LogEvent; public class DisruptorTest { private final static int THREAD_NUM = 2;//生产者线程数 private final static int TASK_NUM = 10000;//每个生产者生产任务的数量 public static void main(String[] args) { for (int i = 0; i < THREAD_NUM; i++) { new DisruptorThread().start();//创建并启动生产者线程 } } private static class DisruptorThread extends Thread { @Override public void run() { for (int i = 0; i < TASK_NUM; i++) { LogEvent log = initLogEvent(); LogEventService.getInstance().publish(log); } } } private static LogEvent initLogEvent() { LogEvent log = new LogEvent(); log.setUserId("123456789"); log.setRspCd("00"); log.setRspMsg("成功"); log.setTransCrtTs(new Date()); log.setTransCfmTs(new Date()); return log; } }4 pom依赖
5 参考com.lmax disruptor3.4.1
Disruptorhttp://www.manongjc.com/detail/22-eslcjjgowuksoks.htmlJava多线程之Disruptor入门https://www.jb51.net/article/211039.htm利用disruptor DB批量存储https://blog.csdn.net/hanbaoqi99/article/details/78954915
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)