pom.xml
org.springframework.boot spring-boot-starter-aop
异步阻塞队列
用于数据存储。
AsyncQueue.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.mg.ecio.base.core.consts.HwsConstants; public class AsyncQueue { private BlockingQueue
接口任务日志类
要处理的任务。
IfaceLogTask.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.resource.model.IfaceLog; import com.service.ResourceService; @Component @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class IfaceLogTask implements Runnable { private IfaceLog ifaceLog; @Autowired private ResourceService resourceService; @Override public void run() { synchronized(this){ if (ifaceLog != null && resourceService != null) { resourceService.insertIfaceLog(this.ifaceLog); } } } public IfaceLog getIfaceLog() { return ifaceLog; } public void setIfaceLog(IfaceLog ifaceLog) throws InterruptedException { Thread.sleep(100); this.ifaceLog = ifaceLog; } }
线程池
MonitorThreadPool.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MonitorThreadPool { private static int corePoolSize = 20;// 线程池维护线程的最少数量 private static int maximumPoolSize = 100;// 线程池维护线程的最大数量 private static int keepAliveTime = 30;// 线程池维护线程所允许的空闲时间,多出corePoolSize之外的线程的允许发呆时间 private static int queueSize = 100; // 队列大小 private static ThreadPoolExecutor threadPool; // 先进先出阻塞队列 private static BlockingQueueworkQueue = new ArrayBlockingQueue<>(queueSize); private static MonitorThreadPool instance = null; public static MonitorThreadPool getInstance() { if (instance == null) { instance = new MonitorThreadPool(); } return instance; } public MonitorThreadPool() { // 构造一个线程池 threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } public void shutdown() { threadPool.shutdownNow(); } public void execute(Runnable a) { threadPool.execute(a); } }
异步代理类
用于向阻塞队列存取数据。
start:构造并加载阻塞队列
post:向阻塞队列添加数据
take:从阻塞队列中取数据
AsyncAgent.java
import javax.annotation.PostConstruct; import org.springframework.stereotype.Component; @Component public class AsyncAgent { private static AsyncQueue asyncQueue = new AsyncQueue(); // PostConstruct 启动加载队列 @PostConstruct public void start() { System.out.println("============AsyncAgent is starting============"); asyncQueue.startQueue(); System.out.println("============AsyncAgent starts OK============"); } public static void post(Object object) { if (object != null) { asyncQueue.post(object); } } public static Object take() { return asyncQueue.take(); } }
异步消费类
AsyncConsumer.java
import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.resource.model.IfaceLog; @Component public class AsyncConsumer implements Runnable { private Thread asyncThread; @Autowired private IfaceLogTask ifaceLogTask; public void init() { if (asyncThread == null) { asyncThread = new Thread(this, "AsyncThreadConsumer-Thread"); } } @PostConstruct public void start() { System.out.println("============AsyncConsumer is starting============"); init(); asyncThread.start(); System.out.println("============AsyncConsumer starts OK============"); } @Override public void run() { while (true) { try { Object object = AsyncAgent.take(); if (object != null) { if (object instanceof IfaceLog) { ifaceLogTask.setIfaceLog((IfaceLog) object); MonitorThreadPool.getInstance().execute(ifaceLogTask); } } } catch (Exception e) { } } } }
aop实现
ClientInterceptor.java
@Component @Aspect public class ClientInterceptor { @Autowired private ResourceService resourceService; // 切点 @Pointcut("execution(* com.service.BussinessbaseService.excute(..))") public void clientPoint() { } // 环绕通知 @Around("clientPoint()") public Object clientAround(ProceedingJoinPoint pjp) throws Throwable { long start = System.currentTimeMillis(); // 获取执行方法的参数 Object[] args = pjp.getArgs(); IfaceLog ifaceLog = new IfaceLog(); ifaceLog.setService_id(args[0] == null ? "" : args[0].toString());// FIXME ifaceLog.setRequest_id(args[1] == null ? "" : args[1].toString()); ifaceLog.setServer_ip(systemConfig.getServerIp()); String iface = args[2].toString(); logger.info("iface:{}",iface); ifaceLog.setIface(iface); ifaceLog.setIface_desc(Services.ifaceTable.get(iface)); ifaceLog.setLog_sn(CommonUtil.getNextId() + CommonUtil.getRandomNumber()); // 前端接口请求的AOP baseRequest request = (baseRequest) args[3]; logger.info("baseRequest args[3]:----{}", JSON.toJSON(request)); ifaceLog.setParams(GsonUtil.toJson(request)); ifaceLog.setReq_time((int) (start / 1000)); ifaceLog.setIs_main(1); ifaceLog.setPlatform(request.getPlatform()); Object object = pjp.proceed(); long end = System.currentTimeMillis(); int time_consuming = (int) (end - start); int ent_time = (int) (end / 1000); baseResponse response = (baseResponse) object; ifaceLog.setError_content(response.getExceptionMsg()); //response.setExceptionMsg(null); ifaceLog.setRes_code(response.getCode()); ifaceLog.setRes_code_desc(response.getMsg()); ifaceLog.setRes_msg(GsonUtil.toJson(response)); ifaceLog.setEnd_time(ent_time); ifaceLog.setLog_time(ent_time); ifaceLog.setTime_consuming(time_consuming); if (!Services.SERVICE_UPLOAD_DEV_STATUS.equalsIgnoreCase(iface)) { AsyncAgent.post(ifaceLog);// 放入异步队列存储 } return object; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)