多线程分批次查询数据

多线程分批次查询数据,第1张

线程分批次查询数据

多线程分批次查询数据
  • 在springboot+mybaties系统中结合多线程实现分批次查询数据

在springboot+mybaties系统中结合多线程实现分批次查询数据

需求: 在系统开发和对接过程中,常常出现大数据量获取的情况,比如你是A系统,去获取B系统的数据,B系统只是给你接口。并且返回只能返回10w条数据,但是实际上你查出来的量到达100w甚至上千万,那么怎么办呢?其实核心思想大家都应该猜的到那就是多线程分批次查询,比如100w分十个线程,然后是一个线程查询10w数据。那么接下来我就用最简单的具体代码例子给大家实践一下吧,话不多说,上Code!
最核心的代码如下,实际应用时把这个多线程放到你要的接口里面即可

package cc.mrbird.febs.system.controller;

import cc.mrbird.febs.common.annotation.Log;
import cc.mrbird.febs.common.controller.baseController;
import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.common.exception.FebsException;
import cc.mrbird.febs.common.utils.MD5Util;
import cc.mrbird.febs.common.utils.ThredQuery;
import cc.mrbird.febs.system.dao.UserMapper;
import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.domain.UserConfig;
import cc.mrbird.febs.system.service.UserConfigService;
import cc.mrbird.febs.system.service.UserService;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Maps;
import com.wuwenze.poi.ExcelKit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@Slf4j
@Validated
@RestController
@RequestMapping("user")
public class UserController extends baseController {

    private String message;

    @Autowired
    private UserServiceImpl userService;
    @Autowired
    UserMapper userMapper;
    @Autowired
    private UserConfigService userConfigService;
    //创建线程池
    //创建线程 7个参数
    //private static ExecutorService executorService = Executors.newFixedThreadPool(5);
    //1.corePoolSize 线程池种的的线程数量
    //2.maxmumPoolSize 线程池种最大的线程数量
    //3.keepAliveTime 线程池的数量大于线程数量时,多余的线程会在多长时间内销毁 一般设置0
    //4.TimeUnit keepAlive的时间单位 一般设置分钟  TimeUnit.MILLISECONDS
    //5.workQueue:任务队列,被提交但是未被执行的任务 一般设置10
    //6.threadFactory:线程工厂, 一般设置默认值  Executors.defaultThreadFactory(),
    //7.handler:拒绝略,任务太多来不及处理,如何拒绝  也设置 ThreadPoolExecutor.DiscardPolicy()
    //四种拒绝策略分别是:
    //          7.1 AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
    //          7.2 DiscardPolicy:也是丢弃任务,但是不抛出异常。
    //          7.3 DiscardOldestPolicy:忽略最早的任务(把最早添加任务到队列的任务忽略掉,然后执行当前的任务)
    //          7.4 CallerRunsPolicy:把超出的任务交给当前线程执行
    ExecutorService executorService=new ThreadPoolExecutor(
            5,
            5, 0l,
            TimeUnit.MILLISECONDS,
            new linkedBlockingDeque(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardPolicy());

   
    @PostMapping("queryUserByThread")
    public Map queryUserByThread(QueryRequest queryRequest, User user) throws FebsException {
        List result = new ArrayList<>();//返回结果

        try {
            log.error("start....");
            //方法一创建 多线程查询 87ms
            long start = System.currentTimeMillis();
            //查询数据库总数量
            int count = userMapper.countUser();
            int num = 10;//一次查询多少条
            //需要查询的次数
            int times = count / num;
            if (count % num != 0) {
                times = times + 1;
            }
            int bindex = 1;
             
            //Callable用于产生结果
            List>> tasks = new ArrayList>>();
            for (int i = 0; i < times; i++) {
                //1 反射去查询的版本
                Callable> qfe = new ThredQuery(user, bindex, num);
                //2.不用反射查询的版本
               // Callable> qfe = new ThredQueryWithOutInvok(userService, user, bindex, num);
                tasks.add(qfe);
                bindex += bindex;
            }
            //定义固定长度的线程池  防止线程过多
            ExecutorService executorService = new ThreadPoolExecutor(
                    15,
                    30, 0l,
                    TimeUnit.MILLISECONDS,
                    new linkedBlockingDeque(10),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardPolicy());
            //Future用于获取结果
            List>> futures = executorService.invokeAll(tasks);
            //处理线程返回结果
            if (futures != null && futures.size() > 0) {
                for (Future> future : futures) {
                    result.addAll(future.get());
                }
            }
            executorService.shutdown();//关闭线程池
            long end = System.currentTimeMillis();
           System.out.println("方法一创建 多线程查询时:" + (end - start) + "ms");
			//方法二:用流的多线程查询 效率更高 4ms
            result = Collections.synchronizedList(new ArrayList<>());//返回结果
            log.error("start....");
            long start1 = System.currentTimeMillis();
            //查询数据库总数量
            int count1 = userMapper.countUser();
            int num1 = 10;//一次查询多少条
            //需要查询的次数
            int times1 = count1 / num+1;
            List> searchList=new ArrayList<>();

            for (int i = 0; i < times1; i++) {
                IPage ipage = new Page<>((i+1), num1, false);//分页查询不做总数量查询
                searchList.add(ipage);
            }
            List finalResult1 = result;
            //parallelStream 是jdk源码调用计算机指令开多线程
            searchList.parallelStream().forEach(ipage -> {
                //调用业务方法查询数据,返回集合
                List list = userService.findUserThread2(ipage);
                finalResult1.addAll(list);
            });
            long end1 = System.currentTimeMillis();
            System.out.println("方法二:用流的多线程查询用时 :"+(end1-start1)+"ms");
            
            HashMap map = Maps.newHashMap();
            map.put("list", result);
            return map;
        } catch (Exception e) {
            message = "查询失败";
            log.error(message, e);
            throw new FebsException(message);
        }

    }
}

首先我们需要创建一个多线程的查询的对象, ThredQuery.java.。其中需要用到通过ApplicationContext获取bean,这个是需要用到的反射的版本

package cc.mrbird.febs.common.utils;


import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ThredQuery implements Callable> {
    
    private ApplicationContext ac = SpringContextUtil.getApplicationContext();
    private UserServiceImpl userServiceimpl;//需要通过够方法把对应的业务service传进来 实际用的时候把类型变为对应的类型
    private User user;//查询条件 根据条件来定义该类的属性
    private int bindex;//分页 页面
    private int num;//数量


    
    public ThredQuery(User user, int bindex, int num) {
        this.user = user;
        this.bindex = bindex;
        this.num = num;
    }

    @Override
    public List call() throws Exception {
        // 用反射的原因个人猜测是 因为入参没有 userServiceimpl 接口对象spring框架不会注入 	 userMapper.findUserDetail(page, user)方法
       // 所以要用反射去找 userServiceimpl
      userServiceimpl = (UserServiceImpl) ac.getBean("userServiceImpl");
       Method idMethod = userServiceimpl.getClass().getMethod("findUserThread", Page.class);
       Page page=new Page();
       page.setSize(num);
       page.setCurrent(bindex);
       //调用业务方法查询数据,返回集合
       Object invoke = idMethod.invoke(userServiceimpl, page);
       IPage userPageInof=(IPage)invoke;
       List records = userPageInof.getRecords();
        return records;
    }
}

这个是不用到反射的线程查询对象 ThredQueryWithOutInvok.java

package cc.mrbird.febs.common.utils;


import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.Callable;

public class ThredQueryWithOutInvok implements Callable> {

    private UserServiceImpl userServiceimpl;//需要通过够方法把对应的业务service传进来 实际用的时候把类型变为对应的类型
    private User user;//查询条件 根据条件来定义该类的属性
    private int bindex;//分页 页面
    private int num;//数量

    
    public ThredQueryWithOutInvok(UserServiceImpl userServiceimpl, User user, int bindex, int num) {
        this.userServiceimpl = userServiceimpl;
        this.user = user;
        this.bindex = bindex;
        this.num = num;
    }

    @Override
    public List call() throws Exception {
        Page page = new Page();
        page.setSize(num);
        page.setCurrent(bindex);
        //调用业务方法查询数据,返回集合
        IPage list = userServiceimpl.findUserThread(page);
        List records = list.getRecords();
        return records;
    }
}

SpringContextUtil.java.的代码如下:

package cc.mrbird.febs.common.utils;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;


@SuppressWarnings({"unused"})
@Component
public class SpringContextUtil implements ApplicationContextAware {

  public static final Logger logger = LoggerFactory.getLogger(SpringContextUtil.class);

  private static ApplicationContext applicationContext;

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      this.applicationContext = applicationContext;
      logger.info("applicationContext:" + SpringContextUtil.applicationContext);
  }

  public static ApplicationContext getApplicationContext() {
      return applicationContext;
  }

  public static Object getBean(String name) {
      return getApplicationContext().getBean(name);
  }

  public static  T getBean(Class clazz) {
      return getApplicationContext().getBean(clazz);
  }

  public static  T getBean(String name, Class clazz) {
      return getApplicationContext().getBean(name, clazz);
  }

}

然后需要编写调用的查询接口 UserService.java 和实现类 UserServiceImpl.java。还有查询的对应表的对象User.java 和对应的 userMapper.java 和 userMapper.xml 文件如下

UserService.java

package cc.mrbird.febs.system.service;

import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;


public interface UserService extends IService {
   
     IPage findUserThread(Page page);
     List findUserThread2(IPage ipage);
}

UserServiceImpl.java

package cc.mrbird.febs.system.service.impl;

import cc.mrbird.febs.common.domain.FebsConstant;
import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.common.service.CacheService;
import cc.mrbird.febs.common.utils.SortUtil;
import cc.mrbird.febs.common.utils.MD5Util;
import cc.mrbird.febs.common.utils.ThredQuery;
import cc.mrbird.febs.system.dao.UserMapper;
import cc.mrbird.febs.system.dao.UserRoleMapper;
import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.domain.UserRole;
import cc.mrbird.febs.system.manager.UserManager;
import cc.mrbird.febs.system.service.UserConfigService;
import cc.mrbird.febs.system.service.UserRoleService;
import cc.mrbird.febs.system.service.UserService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.*;

@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readonly = true, rollbackFor = Exception.class)
public class UserServiceImpl extends ServiceImpl implements UserService {
    @Override
    public IPage findUserThread(Page page) {
        try {
            User user=new User();
            return this.baseMapper.findUserList(page, user);
        } catch (Exception e) {
            log.error("查询用户异常", e);
            return null;
        }
    }
   @Override
    public List findUserThread2(IPage page) {
        User user=new User();
        List userList = baseMapper.findUserList2(page, user);
        return userList;
    }

}

UserMapper.java

package cc.mrbird.febs.system.dao;

import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.mapper.baseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;

import java.util.List;

public interface UserMapper extends baseMapper {

   IPage findUserList(Page page, @Param("user") User user);
   List findUserList2(IPage page, @Param("user") User user);
}

UserMapper.xml





        SELECT
        u.user_id userId,
        u.username,
        u.password,
        u.email,
        u.mobile,
        u. STATUS,
        u.create_time createTime,
        u.ssex,
        u.AVATAR,
        u.DEscriptION,
        u.LAST_LOGIN_TIME lastLoginTime
        FROM
        t_user u
       
           
               AND u.username = #{user.username}
           
           
               AND d.dept_id = #{user.deptId}
           
           
               And u.create_time ">> #{user.createTimeFrom}
           
           
               And u.create_time < #{user.createTimeTo}
           
           
               AND u.ssex = #{user.ssex}
           
           
               AND u.status = #{user.status}