- 在 Spring Boot + MyBatis 系统中结合多线程实现分批次查询数据
需求:在系统开发和对接过程中,常常会遇到需要获取大数据量的情况。比如你是 A 系统,要去获取 B 系统的数据,而 B 系统只给你提供接口,并且一次最多只能返回 10w 条数据,但实际上需要查出来的数据量达到 100w 甚至上千万,那么怎么办呢?其实核心思想大家应该都猜得到,那就是多线程分批次查询:比如 100w 条数据分给十个线程,每个线程查询 10w 条。接下来我就用最简单的具体代码例子给大家实践一下,话不多说,上 Code!
最核心的代码如下,实际应用时把这段多线程查询逻辑放到你自己的接口里面即可:
package cc.mrbird.febs.system.controller;

import cc.mrbird.febs.common.annotation.Log;
import cc.mrbird.febs.common.controller.BaseController;
import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.common.exception.FebsException;
import cc.mrbird.febs.common.utils.MD5Util;
import cc.mrbird.febs.common.utils.ThredQuery;
import cc.mrbird.febs.system.dao.UserMapper;
import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.domain.UserConfig;
import cc.mrbird.febs.system.service.UserConfigService;
import cc.mrbird.febs.system.service.UserService;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Maps;
import com.wuwenze.poi.ExcelKit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Demo controller that loads the whole user table in fixed-size pages, once
 * with an explicit thread pool and once with a parallel stream, and logs how
 * long each approach took.
 */
@Slf4j
@Validated
@RestController
@RequestMapping("user")
public class UserController extends BaseController {

    /** Number of rows fetched by each page query. */
    private static final int PAGE_SIZE = 10;

    /** Message logged and reported to the caller when the query fails. */
    private static final String QUERY_FAILED = "查询失败";

    @Autowired
    private UserServiceImpl userService;
    @Autowired
    UserMapper userMapper;
    @Autowired
    private UserConfigService userConfigService;

    /*
     * Example of building a pool with the seven-argument ThreadPoolExecutor
     * constructor (instead of Executors.newFixedThreadPool(5)):
     *   1. corePoolSize    - number of threads kept in the pool
     *   2. maximumPoolSize - upper bound on the number of threads
     *   3. keepAliveTime   - how long threads above the core size may stay idle
     *   4. unit            - time unit of keepAliveTime
     *   5. workQueue       - queue holding submitted but not yet started tasks
     *   6. threadFactory   - factory for new threads
     *   7. handler         - rejection policy used when pool and queue are full:
     *        AbortPolicy         - reject with RejectedExecutionException (default)
     *        DiscardPolicy       - drop the task silently
     *        DiscardOldestPolicy - drop the oldest queued task, then retry
     *        CallerRunsPolicy    - run the task on the submitting thread
     * CallerRunsPolicy is used because a silently discarded task never
     * completes its Future, so a later Future.get() would wait forever.
     */
    ExecutorService executorService = new ThreadPoolExecutor(
            5,
            5,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Loads all users page by page with both strategies and returns the rows
     * collected by the parallel-stream strategy under the key {@code "list"}.
     *
     * @param queryRequest paging/sorting request bound by Spring MVC; not read here
     * @param user         query conditions handed to each {@link ThredQuery} task
     * @return a map with a single entry {@code "list"} holding the users found
     * @throws FebsException if any page query fails or the request thread is interrupted
     */
    @PostMapping("queryUserByThread")
    public Map<String, Object> queryUserByThread(QueryRequest queryRequest, User user) throws FebsException {
        try {
            // Approach 1: explicit thread pool. It is run for the timing comparison only.
            log.info("start....");
            queryWithThreadPool(user);

            // Approach 2: parallel stream. Its rows are the ones returned to the caller.
            log.info("start....");
            List<User> result = queryWithParallelStream();

            Map<String, Object> map = Maps.newHashMap();
            map.put("list", result);
            return map;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container can see the interruption.
            Thread.currentThread().interrupt();
            log.error(QUERY_FAILED, e);
            throw new FebsException(QUERY_FAILED);
        } catch (Exception e) {
            log.error(QUERY_FAILED, e);
            throw new FebsException(QUERY_FAILED);
        }
    }

    /** Fetches every page with one {@link ThredQuery} task per page on a bounded pool. */
    private List<User> queryWithThreadPool(User user) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        int pages = pageCount(userMapper.countUser(), PAGE_SIZE);

        // Page numbers are 1-based and must advance by exactly one per task.
        List<Callable<List<User>>> tasks = new ArrayList<>(pages);
        for (int page = 1; page <= pages; page++) {
            tasks.add(new ThredQuery(user, page, PAGE_SIZE));
        }

        List<User> users = new ArrayList<>();
        // Bounded pool so a large table cannot spawn an unbounded number of threads.
        ExecutorService queryPool = new ThreadPoolExecutor(
                15,
                30,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (Future<List<User>> future : queryPool.invokeAll(tasks)) {
                users.addAll(future.get());
            }
        } finally {
            // Always release the worker threads, including when a page query failed.
            queryPool.shutdown();
        }

        log.info("方法一创建 多线程查询时:{}ms", System.currentTimeMillis() - start);
        return users;
    }

    /** Fetches every page through a parallel stream and merges the rows in page order. */
    private List<User> queryWithParallelStream() {
        long start = System.currentTimeMillis();
        int pages = pageCount(userMapper.countUser(), PAGE_SIZE);

        List<IPage<User>> searchList = new ArrayList<>(pages);
        for (int page = 1; page <= pages; page++) {
            // The third argument is false so the paged query does not run an extra count.
            searchList.add(new Page<User>(page, PAGE_SIZE, false));
        }

        // Collecting the stream avoids a shared mutable list and keeps page order.
        List<User> users = searchList.parallelStream()
                .map(userService::findUserThread2)
                .flatMap(List::stream)
                .collect(Collectors.toList());

        log.info("方法二:用流的多线程查询用时 :{}ms", System.currentTimeMillis() - start);
        return users;
    }

    /** Returns how many pages of {@code pageSize} rows are needed for {@code total} rows. */
    private static int pageCount(int total, int pageSize) {
        return (total + pageSize - 1) / pageSize;
    }
}
首先我们需要创建一个用于多线程查询的任务对象 ThredQuery.java。其中需要通过 ApplicationContext 获取 bean,这是使用反射的版本:
package cc.mrbird.febs.common.utils;

import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Task that loads one page of users. This is the reflection-based variant:
 * it looks the service bean up in the Spring context and invokes
 * {@code findUserThread} reflectively.
 */
public class ThredQuery implements Callable<List<User>> {

    private final ApplicationContext ac = SpringContextUtil.getApplicationContext();
    /** Query conditions; kept so the task can be extended with per-request filters. */
    private final User user;
    /** Page number to fetch. */
    private final int bindex;
    /** Number of rows per page. */
    private final int num;

    /**
     * @param user   query conditions
     * @param bindex page number to fetch
     * @param num    number of rows per page
     */
    public ThredQuery(User user, int bindex, int num) {
        this.user = user;
        this.bindex = bindex;
        this.num = num;
    }

    /**
     * Fetches the configured page through {@code UserServiceImpl.findUserThread}.
     *
     * @return the records of the requested page
     * @throws IllegalStateException if the service returned no page object
     * @throws Exception             if the bean lookup or the reflective call fails
     */
    @Override
    public List<User> call() throws Exception {
        // The task is created with "new", so nothing is injected into it; the
        // service bean is therefore looked up from the application context.
        UserServiceImpl userServiceimpl = (UserServiceImpl) ac.getBean("userServiceImpl");
        Method idMethod = userServiceimpl.getClass().getMethod("findUserThread", Page.class);

        Page<User> page = new Page<>();
        page.setSize(num);
        page.setCurrent(bindex);

        Object invoke = idMethod.invoke(userServiceimpl, page);
        if (invoke == null) {
            // findUserThread returns null after logging a failure; fail with a clear
            // message instead of a bare NullPointerException.
            throw new IllegalStateException("findUserThread returned no result for page " + bindex);
        }
        @SuppressWarnings("unchecked") // findUserThread is declared to return IPage<User>
        IPage<User> userPageInfo = (IPage<User>) invoke;
        return userPageInfo.getRecords();
    }
}
这是不使用反射的线程查询对象 ThredQueryWithOutInvok.java:
package cc.mrbird.febs.common.utils;

import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Task that loads one page of users by calling the service directly; the
 * service instance is supplied through the constructor, so no reflection or
 * context lookup is needed.
 */
public class ThredQueryWithOutInvok implements Callable<List<User>> {

    /** Service used to run the page query; replace the type with your own service as needed. */
    private final UserServiceImpl userServiceimpl;
    /** Query conditions; kept so the task can be extended with per-request filters. */
    private final User user;
    /** Page number to fetch. */
    private final int bindex;
    /** Number of rows per page. */
    private final int num;

    /**
     * @param userServiceimpl service that executes the page query
     * @param user            query conditions
     * @param bindex          page number to fetch
     * @param num             number of rows per page
     */
    public ThredQueryWithOutInvok(UserServiceImpl userServiceimpl, User user, int bindex, int num) {
        this.userServiceimpl = userServiceimpl;
        this.user = user;
        this.bindex = bindex;
        this.num = num;
    }

    /**
     * Fetches the configured page through {@code findUserThread}.
     *
     * @return the records of the requested page
     * @throws IllegalStateException if the service returned no page object
     */
    @Override
    public List<User> call() throws Exception {
        Page<User> page = new Page<>();
        page.setSize(num);
        page.setCurrent(bindex);

        IPage<User> result = userServiceimpl.findUserThread(page);
        if (result == null) {
            // findUserThread returns null after logging a failure; fail with a clear
            // message instead of a bare NullPointerException.
            throw new IllegalStateException("findUserThread returned no result for page " + bindex);
        }
        return result.getRecords();
    }
}
SpringContextUtil.java 的代码如下:
package cc.mrbird.febs.common.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @SuppressWarnings({"unused"}) @Component public class SpringContextUtil implements ApplicationContextAware { public static final Logger logger = LoggerFactory.getLogger(SpringContextUtil.class); private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; logger.info("applicationContext:" + SpringContextUtil.applicationContext); } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String name) { return getApplicationContext().getBean(name); } public staticT getBean(Class clazz) { return getApplicationContext().getBean(clazz); } public static T getBean(String name, Class clazz) { return getApplicationContext().getBean(name, clazz); } }
然后需要编写被调用的查询接口 UserService.java 和实现类 UserServiceImpl.java,以及查询所对应表的实体类 User.java、对应的 UserMapper.java 和 UserMapper.xml,文件如下:
UserService.java
package cc.mrbird.febs.system.service;

import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

/** Paged user queries used by the multi-threaded batch-query demo. */
public interface UserService extends IService<User> {

    /**
     * Queries one page of users.
     *
     * @param page page number and page size to fetch
     * @return the page as returned by the mapper
     */
    IPage<User> findUserThread(Page<User> page);

    /**
     * Queries one page of users and returns only the rows.
     *
     * @param ipage page number and page size to fetch
     * @return the rows of the requested page
     */
    List<User> findUserThread2(IPage<User> ipage);
}
UserServiceImpl.java
package cc.mrbird.febs.system.service.impl;

import cc.mrbird.febs.common.domain.FebsConstant;
import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.common.service.CacheService;
import cc.mrbird.febs.common.utils.SortUtil;
import cc.mrbird.febs.common.utils.MD5Util;
import cc.mrbird.febs.common.utils.ThredQuery;
import cc.mrbird.febs.system.dao.UserMapper;
import cc.mrbird.febs.system.dao.UserRoleMapper;
import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.domain.UserRole;
import cc.mrbird.febs.system.manager.UserManager;
import cc.mrbird.febs.system.service.UserConfigService;
import cc.mrbird.febs.system.service.UserRoleService;
import cc.mrbird.febs.system.service.UserService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.*;

/** {@link UserService} implementation that delegates the paged queries to {@link UserMapper}. */
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    /**
     * Queries one page of users with an empty filter object.
     *
     * @param page page number and page size to fetch
     * @return the page returned by the mapper, or {@code null} if the query threw;
     *         the failure is logged and callers must check for {@code null}
     */
    @Override
    public IPage<User> findUserThread(Page<User> page) {
        try {
            // An empty User is passed as the filter argument.
            User user = new User();
            return this.baseMapper.findUserList(page, user);
        } catch (Exception e) {
            log.error("查询用户异常", e);
            return null;
        }
    }

    /**
     * Queries one page of users with an empty filter object and returns the rows.
     *
     * @param page page number and page size to fetch
     * @return the rows returned by the mapper
     */
    @Override
    public List<User> findUserThread2(IPage<User> page) {
        User user = new User();
        return baseMapper.findUserList2(page, user);
    }
}
UserMapper.java
package cc.mrbird.febs.system.dao;

import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/** MyBatis mapper for the {@code t_user} table. */
public interface UserMapper extends BaseMapper<User> {

    /**
     * Counts all rows of {@code t_user}. Declared here because the controller
     * calls {@code userMapper.countUser()} to work out the number of pages.
     *
     * @return total number of users
     */
    @Select("SELECT COUNT(*) FROM t_user")
    int countUser();

    /**
     * Queries one page of users matching the given conditions.
     *
     * @param page page number and page size to fetch
     * @param user filter conditions, referenced as {@code user} in the mapper XML
     * @return the requested page
     */
    IPage<User> findUserList(Page<User> page, @Param("user") User user);

    /**
     * Queries one page of users matching the given conditions and returns only the rows.
     *
     * @param page page number and page size to fetch
     * @param user filter conditions, referenced as {@code user} in the mapper XML
     * @return the rows of the requested page
     */
    List<User> findUserList2(IPage<User> page, @Param("user") User user);
}
UserMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!--
  Reconstructed mapper: the published listing lost all of its XML tags, so the
  element structure and the <if test="..."> conditions below were rebuilt from
  the surviving SQL text and the UserMapper interface. Verify against the
  project's real UserMapper.xml before use.
-->
<mapper namespace="cc.mrbird.febs.system.dao.UserMapper">

    <!-- Columns selected for every user query, aliased to the User property names. -->
    <sql id="userColumns">
        u.user_id userId,
        u.username,
        u.password,
        u.email,
        u.mobile,
        u.status,
        u.create_time createTime,
        u.ssex,
        u.avatar,
        u.description,
        u.last_login_time lastLoginTime
    </sql>

    <!-- Optional filters; <where> drops the leading AND and omits WHERE when nothing matches. -->
    <sql id="userFilter">
        <where>
            <if test="user.username != null and user.username != ''">
                AND u.username = #{user.username}
            </if>
            <if test="user.deptId != null">
                <!-- Only t_user (alias u) is selected from, so the filter uses u.dept_id. -->
                AND u.dept_id = #{user.deptId}
            </if>
            <if test="user.createTimeFrom != null and user.createTimeFrom != ''">
                AND u.create_time &gt; #{user.createTimeFrom}
            </if>
            <if test="user.createTimeTo != null and user.createTimeTo != ''">
                AND u.create_time &lt; #{user.createTimeTo}
            </if>
            <if test="user.ssex != null and user.ssex != ''">
                AND u.ssex = #{user.ssex}
            </if>
            <if test="user.status != null and user.status != ''">
                AND u.status = #{user.status}
            </if>
        </where>
    </sql>

    <!-- Ordered by primary key so that pages fetched concurrently do not overlap or skip rows. -->
    <select id="findUserList" resultType="cc.mrbird.febs.system.domain.User">
        SELECT
        <include refid="userColumns"/>
        FROM t_user u
        <include refid="userFilter"/>
        ORDER BY u.user_id
    </select>

    <select id="findUserList2" resultType="cc.mrbird.febs.system.domain.User">
        SELECT
        <include refid="userColumns"/>
        FROM t_user u
        <include refid="userFilter"/>
        ORDER BY u.user_id
    </select>

</mapper>
User.java
package cc.mrbird.febs.system.domain;

import cc.mrbird.febs.common.converter.TimeConverter;
import cc.mrbird.febs.common.domain.RegexpConstant;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.wuwenze.poi.annotation.Excel;
import com.wuwenze.poi.annotation.ExcelField;
import lombok.Data;
import lombok.ToString;

import javax.validation.constraints.Email;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;
import javax.validation.constraints.Size;
import java.io.Serializable;
import java.util.Date;

/**
 * Entity mapped to the {@code t_user} table; also carries validation rules,
 * Excel export metadata and a few non-persistent query/sort fields.
 */
@Data
@TableName("t_user")
@Excel("用户信息表")
public class User implements Serializable {

    private static final long serialVersionUID = -4852732617765810959L;

    /** Status value: account is active. */
    public static final String STATUS_VALID = "1";

    /** Status value: account is locked. */
    public static final String STATUS_LOCK = "0";

    /** Avatar file name used when none is set. */
    public static final String DEFAULT_AVATAR = "default.jpg";

    /** Sex value: male. */
    public static final String SEX_MALE = "0";

    /** Sex value: female. */
    public static final String SEX_FEMALE = "1";

    /** Sex value: undisclosed. */
    public static final String SEX_UNKNOW = "2";

    /** Default password. */
    public static final String DEFAULT_PASSWORD = "1234qwer";

    /** Primary key, auto-incremented by the database. */
    @TableId(value = "USER_ID", type = IdType.AUTO)
    private Long userId;

    @Size(min = 4, max = 10, message = "{range}")
    @ExcelField(value = "用户名")
    private String username;

    private String password;

    private Long deptId;

    /** Department name; transient, so it is skipped by Java serialization. */
    @ExcelField(value = "部门")
    private transient String deptName;

    @Size(max = 50, message = "{noMoreThan}")
    @Email(message = "{email}")
    @ExcelField(value = "邮箱")
    private String email;

    @Pattern(regexp = RegexpConstant.MOBILE_REG, message = "{mobile}")
    @ExcelField(value = "手机号")
    private String mobile;

    /** Account status: 0 = locked, 1 = active. */
    @NotBlank(message = "{required}")
    @ExcelField(value = "状态", writeConverterExp = "0=锁定,1=有效")
    private String status;

    @ExcelField(value = "创建时间", writeConverter = TimeConverter.class)
    private Date createTime;

    private Date modifyTime;

    @ExcelField(value = "最后登录时间", writeConverter = TimeConverter.class)
    private Date lastLoginTime;

    /** Sex: 0 = male, 1 = female, 2 = undisclosed. */
    @NotBlank(message = "{required}")
    @ExcelField(value = "性别", writeConverterExp = "0=男,1=女,2=保密")
    private String ssex;

    @Size(max = 100, message = "{noMoreThan}")
    @ExcelField(value = "个人描述")
    private String description;

    private String avatar;

    @NotBlank(message = "{required}")
    private transient String roleId;

    @ExcelField(value = "角色")
    private transient String roleName;

    /** Sort field. */
    private transient String sortField;

    /** Sort direction: "ascend" or "descend". */
    private transient String sortOrder;

    /** Lower bound for the creation-time filter. */
    private transient String createTimeFrom;

    /** Upper bound for the creation-time filter. */
    private transient String createTimeTo;

    private transient String id;

    /**
     * @return the user id, used as the authorization cache key
     */
    public Long getAuthCacheKey() {
        return userId;
    }
}
初始化数据user表的SQL脚本
-- Recreates the t_user table and loads the demo rows used by the paged-query example.
DROP TABLE IF EXISTS `t_user`;
CREATE TABLE `t_user` (
  `USER_ID` bigint(10) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `USERNAME` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户名',
  `PASSWORD` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密码',
  `DEPT_ID` bigint(20) NULL DEFAULT NULL COMMENT '部门ID',
  `EMAIL` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '邮箱',
  `MOBILE` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '联系电话',
  `STATUS` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态 0锁定 1有效',
  `CREATE_TIME` datetime(0) NOT NULL COMMENT '创建时间',
  `MODIFY_TIME` datetime(0) NULL DEFAULT NULL COMMENT '修改时间',
  `LAST_LOGIN_TIME` datetime(0) NULL DEFAULT NULL COMMENT '最近访问时间',
  `SSEX` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '性别 0男 1女 2保密',
  `DESCRIPTION` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '描述',
  `AVATAR` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户头像',
  PRIMARY KEY (`USER_ID`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 23 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_user
-- ----------------------------
-- Columns are listed explicitly so the inserts do not depend on column order.
INSERT INTO `t_user`
  (`USER_ID`, `USERNAME`, `PASSWORD`, `DEPT_ID`, `EMAIL`, `MOBILE`, `STATUS`, `CREATE_TIME`, `MODIFY_TIME`, `LAST_LOGIN_TIME`, `SSEX`, `DESCRIPTION`, `AVATAR`)
VALUES
  (1, 'mrbird', '94f860c4bbfeb2f49c84e321fdda4b07', 1, 'mrbird123@hotmail.com', '13455533233', '1', '2017-12-27 15:47:19', '2019-01-17 02:34:19', '2021-11-24 03:52:23', '2', '我是帅比作者。', 'ubnKSIfAJTxIgXOKlciN.png'),
  (2, 'scott', '7b44a5363e3fd52435beb472e1d2b91f', 6, 'scott@qq.com', '15134627380', '1', '2017-12-29 16:16:39', '2019-01-18 00:59:09', '2019-01-28 01:54:09', '0', '我是scott,嗯嗯', 'jZUIxmJycoymBprLOUbT.png'),
  (12, 'jack', '552649f10640385d0728a80a4242893e', 6, 'jack@hotmail.com', NULL, '1', '2019-01-23 07:34:05', '2019-01-24 03:08:01', '2019-01-24 08:52:03', '0', NULL, 'default.jpg'),
  (13, '测试1111', '2d4dafa9fbd6e09ac90b9294d5968291', 1, '', '13677099534', '1', '2021-11-19 17:09:33', NULL, NULL, '0', NULL, 'default.jpg'),
  (14, '测试222', '998e5d3e3a4d56adfb470c8f66a625fc', 1, '', '13677099534', '1', '2021-11-19 17:09:53', NULL, NULL, '0', NULL, 'default.jpg'),
  (15, '测试333', '8b9cc6544ae29450c410cf69b485ff8a', 4, '', '13677099534', '1', '2021-11-19 17:10:09', NULL, NULL, '1', NULL, 'default.jpg'),
  (16, '测试444', '7fb9c177a4109ec7e7c8bb58d79445d6', 7, '', '13677099534', '1', '2021-11-19 17:10:29', NULL, NULL, '0', NULL, 'default.jpg'),
  (17, '测试555', '601cd1bfda9687bdb143e7fb61559fbe', 4, '', '13677099534', '1', '2021-11-19 17:10:46', NULL, NULL, '1', NULL, 'default.jpg'),
  (18, '测试666', 'b13d1b09502014ddfc6380cc63b63884', 4, '', '13677099534', '1', '2021-11-19 17:10:58', NULL, NULL, '0', NULL, 'default.jpg'),
  (19, '测试777', '5c01ebec4ed06a34016165091856c627', 4, '', '13677099534', '1', '2021-11-19 17:11:16', NULL, NULL, '0', NULL, 'default.jpg'),
  (20, '测试888', '06d30f505bdfd1ebf4ec7af3959a9828', 4, '', '13677099534', '1', '2021-11-19 17:11:28', NULL, NULL, '0', NULL, 'default.jpg'),
  (21, '测试999', '2ac27b70c2ed591582528453a174c6c0', 4, '', '13677099534', '1', '2021-11-19 17:11:48', NULL, NULL, '1', NULL, 'default.jpg'),
  (22, '测试10000', 'c72fad8a2cf1947f8709ddc3ce36fafb', 4, '', '13677099534', '1', '2021-11-19 17:11:59', NULL, NULL, '1', NULL, 'default.jpg');
postman请求url: http://127.0.0.1:9527/user/queryUserByThread
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)