管道模式(Pipeline Pattern) 是责任链模式的常用变体之一。在该模式中,管道扮演的是流水线的角色,负责将数据传递到一个加工处理序列中,数据在每个步骤中被加工处理后,传递到下一个步骤进行加工处理,直到全部步骤处理完毕。
场景适用于业务 *** 作由多个步骤组成而成,特别是后期可能在任意位置对子步骤进行增删改。本博,将继续模拟前文责任链模式的业务场景,如下
运用 上下文基类@Getter
@Setter
public class PipelineContext{
/**
* 处理开始时间
*/
private LocalDateTime startTime;
/**
* 处理结束时间
*/
private LocalDateTime endTime;
/**
* 获取实例名称
*/
public String getName() {
return this.getClass().getSimpleName();
}
}
审核业务上下文
@Getter
@Setter
public class AuditOrderContext extends PipelineContext {
/**
* 流水号
*/
private String tradeFlowNo;
/**
* 审核内容
*/
private String content;
/**
* 审核单 id
*/
private Long auditOrderId;
@Override
public String getName() {
return "审核单处理";
}
}
上下文处理器
public interface PipelineHandler {
/**
* 处理输入的上下文数据
*
* @param context 处理时的上下文数据
* @return 返回 true 则表示由下一个 ContextHandler 继续处理,返回 false 则表示处理结束
*/
boolean handle(T context);
}
审核单前置处理
@Component
@Slf4j
public class BeforeAuditPipelineHandler implements PipelineHandler {
@Override
public boolean handle(AuditOrderContext context) {
log.info("tradeFlowNo:{},审核前置开始了.",context.getTradeFlowNo());
if (StringUtils.isEmpty(context.getContent())){
log.error("The content must is not null");
// 结束流程
return false;
}
return true;
}
}
审核单初始化处理
@Component
@Slf4j
public class InitAuditPipelineHandler implements PipelineHandler {
@Override
public boolean handle(AuditOrderContext context) {
log.info("tradeFlowNo:{},审核初始化开始了.", context.getTradeFlowNo());
if (StringUtils.isEmpty(context.getAuditOrderId())) {
log.error("审核单初始化失败");
// 结束流程
return false;
}
return true;
}
}
审核单后置处理
@Component
@Slf4j
public class PostAuditPipelineHandler implements PipelineHandler {
@Override
public boolean handle(AuditOrderContext context) {
log.info("tradeFlowNo:{},审核后置处理开始了.", context.getTradeFlowNo());
// todo 业务逻辑
return true;
}
}
管道路由配置
类似于前文中,把相关的管道注册到容器中,模拟一个注册表功能。
@Configuration
@Slf4j
public class PipelineRouteConfig implements ApplicationContextAware {
private ApplicationContext appContext;
/**
* 定义容器,存放pipeline,模拟注册表
*/
private static final Map, List>>>
PIPELINE_ROUTE_MAP = new HashMap<>(4);
/*
* 在这里配置各种上下文类型对应的处理管道:键为上下文类型,值为处理器类型的列表
* 后续可以增删改,调整位置等。
*/
static {
log.info("管道列表初始化开始...");
PIPELINE_ROUTE_MAP.put(AuditOrderContext.class,
Arrays.asList(
BeforeAuditPipelineHandler.class,
InitAuditPipelineHandler.class,
PostAuditPipelineHandler.class
));
}
/**
* 在 Spring 启动时,根据路由表生成对应的管道映射关系
*/
@Bean("pipelineRouteMap")
public Map, List extends PipelineHandler extends PipelineContext>>> getHandlerPipelineMap() {
return PIPELINE_ROUTE_MAP.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
}
/**
* 根据给定的管道中 ContextHandler 的类型的列表,构建管道
*/
private List extends PipelineHandler extends PipelineContext>> toPipeline(
Map.Entry, List>>> entry) {
return entry.getValue().stream().map(appContext::getBean).collect(Collectors.toList());
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}
管道执行器
@Component
@Slf4j
public class PipelineExecutor {
/**
* 引用 PipelineRouteConfig 中的 pipelineRouteMap
*/
@Resource
private Map, List extends PipelineHandler super PipelineContext>>> pipelineRouteMap;
/**
* 异步处理输入的上下文数据
*
* @param context 上下文数据
* @param callback 处理完成的回调
*/
public void acceptAsync(PipelineContext context, BiConsumer callback) {
CompletableFuture.runAsync(()->{
boolean success = acceptSync(context);
if (callback != null) {
callback.accept(context, success);
}
});
}
/**
* 同步处理输入的上下文数据
* 如果处理时上下文数据流通到最后一个处理器且最后一个处理器返回 true,则返回 true,否则返回 false
*
* @param context 输入的上下文数据
* @return 处理过程中管道是否畅通,畅通返回 true,不畅通返回 false
*/
public boolean acceptSync(PipelineContext context) {
Objects.requireNonNull(context, "上下文数据不能为 null");
// 拿到数据类型
Class extends PipelineContext> dataType = context.getClass();
// 获取数据处理管道
List extends PipelineHandler super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);
if (CollectionUtils.isEmpty(pipeline)) {
log.error("{} 的管道为空", dataType.getSimpleName());
return false;
}
// 管道是否畅通
boolean lastSuccess = true;
for (PipelineHandler super PipelineContext> handler : pipeline) {
try {
// 当前处理器处理数据,并返回是否继续向下处理
lastSuccess = handler.handle(context);
} catch (Throwable ex) {
lastSuccess = false;
log.error("[{}] 处理异常,handler:{}", context.getName(), handler.getClass().getSimpleName(), ex);
}
// 不再向下处理
if (!lastSuccess) { break; }
}
return lastSuccess;
}
}
单元测试
失败案例
@Slf4j
public class AuditPipelineTest extends BaseTest {
@Resource
private PipelineExecutor pipelineExecutor;
@Test
public void pipelineTest(){
AuditOrderContext auditOrderContext = new AuditOrderContext();
auditOrderContext.setTradeFlowNo("OID_202205092148_11000_01_123");
auditOrderContext.setContent("这是审核内容撒!");
boolean flag = pipelineExecutor.acceptSync(auditOrderContext);
if (flag){
log.info("审核单号:{},执行通道成功。",auditOrderContext.getTradeFlowNo());
}
}
}
成功案例
@Slf4j
public class AuditPipelineTest extends BaseTest {
@Resource
private PipelineExecutor pipelineExecutor;
@Test
public void pipelineTest(){
AuditOrderContext auditOrderContext = new AuditOrderContext();
auditOrderContext.setTradeFlowNo("OID_202205092148_11000_01_123");
auditOrderContext.setContent("这是审核内容撒!");
auditOrderContext.setAuditOrderId(1000L);
boolean flag = pipelineExecutor.acceptSync(auditOrderContext);
if (flag){
log.info("审核单号:{},执行通道成功。",auditOrderContext.getTradeFlowNo());
}
}
}
总结
- 通过管道模式,降低了系统的耦合度,提升了内聚程度与扩展性;
- 职责单一分明,PipelineExecutor 只做执行工作,不用关心具体的管道细节;
- 每个 PipelineHandler独立,只负责自己的业务逻辑实现;
- 新增、删除或者交换子步骤时,都只需要 *** 作路由表的配置,而不需要修改原来的调用代码。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)