java 多线程 *** 作数据库

java 多线程 *** 作数据库,第1张

//将数据库中的数据条数分段

 public void division(){

  //获取要导入的总的数据条数

  String sql3="SELECT  count(*)  FROM [CMD].[dbo].[mycopy1]"

  try {

   pss=cons.prepareStatement(sql3)

   rss=pss.executeQuery()

   

   while(rss.next()){

   System.out.println("总记录条数:"+rss.getInt(1))

   sum=rss.getInt(1)

   }

   //每30000条记录作为一个分割点

   if(sum>=30000){

    n=sum/30000

    residue=sum%30000

   }else{

    residue=sum

   }

   

   System.out.println(n+"  "+residue)

   

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   e.printStackTrace()

  }

  

 }

线程类

public MyThread(int start,int end) {

  this.end=end 

     this.start=start

  System.out.println("处理掉余数")

    try {

    

         System.out.println("--------"+Thread.currentThread().getName()+"------------")

    Class.forName(SQLSERVERDRIVER)

    System.out.println("加载sqlserver驱动...")

    cons = DriverManager.getConnection(CONTENTS,UNS,UPS)

    stas = cons.createStatement()

    System.out.println("连接SQLServer数据库成功!!")

    

    System.out.println("加载mysql驱动.....")

    Class.forName(MYSQLDRIVER)

    con = DriverManager.getConnection(CONTENT,UN,UP)

    sta = con.createStatement()

    // 关闭事务自动提交

    con.setAutoCommit(false)

    System.out.println("连接mysql数据库成功!!")

    

   } catch (Exception e) {

    e.printStackTrace() 

   }

  // TODO Auto-generated constructor stub

 }

 

 

 public ArrayList<Member> getAll(){

  Member member

  String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +

    " from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end

  try {

   System.out.println("正在获取数据...")

   allmembers=new ArrayList()

   rss=stas.executeQuery(sql1)

   while(rss.next()){

    member=new Member()

    member.setAddress1(rss.getString("address1"))

    member.setBnpoints(rss.getString("bnpoints"))

    member.setDbno(rss.getString("dbno"))

    member.setExpiry(rss.getString("expiry"))

    member.setHispoints(rss.getString("hispoints"))

    member.setKypoints(rss.getString("kypoints"))

    member.setLevels(rss.getString("levels"))

    member.setNames(rss.getString("names"))

    member.setPmcode(rss.getString("pmcode"))

    member.setRemark(rss.getString("remark"))

    member.setSex(rss.getString("sex"))

    member.setTelephone(rss.getString("telephone"))

    member.setWxno(rss.getString("wxno"))

    member.setPmdate(rss.getString("pmdate"))

    allmembers.add(member)

   // System.out.println(member.getNames())

   }

   System.out.println("成功获取sqlserver数据库数据!")

   return allmembers

   

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   System.out.println("获取sqlserver数据库数据发送异常!")

   e.printStackTrace()

  }

  try {

   rss.close()

   stas.close()

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   e.printStackTrace()

  }

  return null

 }

 

 public void inputAll(ArrayList<Member> allmembers){

  System.out.println("开始向mysql中写入")

  String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"

  try {

   ps=con.prepareStatement(sql2)

   System.out.println("-------------------------等待写入数据条数: "+allmembers.size())

   for(int i=0i<allmembers.size()i++){

    ps.setString(1, allmembers.get(i).getPmcode())

    ps.setString(2, allmembers.get(i).getNames())

    //System.out.println(allmembers.get(i).getNames())

    ps.setString(3, allmembers.get(i).getSex())

    ps.setString(4, allmembers.get(i).getTelephone())

    ps.setString(5, allmembers.get(i).getAddress1())

    ps.setString(6, allmembers.get(i).getPmdate())

    ps.setString(7, allmembers.get(i).getExpiry())

    ps.setString(8, allmembers.get(i).getLevels())

    ps.setString(9, allmembers.get(i).getDbno())

    ps.setString(10, allmembers.get(i).getHispoints())

    ps.setString(11, allmembers.get(i).getBnpoints())

    ps.setString(12, allmembers.get(i).getKypoints())

    ps.setString(13, allmembers.get(i).getWxno())

    ps.setString(14, allmembers.get(i).getRemark())

    //插入命令列表

    //ps.addBatch()

    ps.executeUpdate()

   }

   //ps.executeBatch()

   con.commit()

   

   ps.close()

   con.close()

   this.flag=false

   System.out.println(Thread.currentThread().getName()+"--->OK")

  } catch (SQLException e) {

   // TODO Auto-generated catch block

   System.out.println("向mysql中更新数据时发生异常!")

   e.printStackTrace() 

  }

 }

 @Override

 public void run() {

  // TODO Auto-generated method stub

  while(true&&flag){

   this.inputAll(getAll())

  }

 }

//将数据库中的数据条数分段

public void division(){

//获取要导入的总的数据条数

String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]"

try {

pss=cons.prepareStatement(sql3)

rss=pss.executeQuery()

while(rss.next()){

System.out.println("总记录条数:"+rss.getInt(1))

sum=rss.getInt(1)

}

//每30000条记录作为一个分割点

if(sum>=30000){

n=sum/30000

residue=sum%30000

}else{

residue=sum

}

System.out.println(n+" "+residue)

} catch (SQLException e) {

// TODO Auto-generated catch block

e.printStackTrace()

}

}

线程类

public MyThread(int start,int end) {

this.end=end

this.start=start

System.out.println("处理掉余数")

try {

System.out.println("--------"+Thread.currentThread().getName()+"------------")

Class.forName(SQLSERVERDRIVER)

System.out.println("加载sqlserver驱动...")

cons = DriverManager.getConnection(CONTENTS,UNS,UPS)

stas = cons.createStatement()

System.out.println("连接SQLServer数据库成功!!")

System.out.println("加载mysql驱动.....")

Class.forName(MYSQLDRIVER)

con = DriverManager.getConnection(CONTENT,UN,UP)

sta = con.createStatement()

// 关闭事务自动提交

con.setAutoCommit(false)

System.out.println("连接mysql数据库成功!!")

} catch (Exception e) {

e.printStackTrace()

}

// TODO Auto-generated constructor stub

}

public ArrayList<Member>getAll(){

Member member

String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +

" from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end

try {

System.out.println("正在获取数据...")

allmembers=new ArrayList()

rss=stas.executeQuery(sql1)

while(rss.next()){

member=new Member()

member.setAddress1(rss.getString("address1"))

member.setBnpoints(rss.getString("bnpoints"))

member.setDbno(rss.getString("dbno"))

member.setExpiry(rss.getString("expiry"))

member.setHispoints(rss.getString("hispoints"))

member.setKypoints(rss.getString("kypoints"))

member.setLevels(rss.getString("levels"))

member.setNames(rss.getString("names"))

member.setPmcode(rss.getString("pmcode"))

member.setRemark(rss.getString("remark"))

member.setSex(rss.getString("sex"))

member.setTelephone(rss.getString("telephone"))

member.setWxno(rss.getString("wxno"))

member.setPmdate(rss.getString("pmdate"))

allmembers.add(member)

// System.out.println(member.getNames())

}

System.out.println("成功获取sqlserver数据库数据!")

return allmembers

} catch (SQLException e) {

// TODO Auto-generated catch block

System.out.println("获取sqlserver数据库数据发送异常!")

e.printStackTrace()

}

try {

rss.close()

stas.close()

} catch (SQLException e) {

// TODO Auto-generated catch block

e.printStackTrace()

}

return null

}

public void inputAll(ArrayList<Member>allmembers){

System.out.println("开始向mysql中写入")

String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"

try {

ps=con.prepareStatement(sql2)

System.out.println("-------------------------等待写入数据条数: "+allmembers.size())

for(int i=0i<allmembers.size()i++){

ps.setString(1, allmembers.get(i).getPmcode())

ps.setString(2, allmembers.get(i).getNames())

//System.out.println(allmembers.get(i).getNames())

ps.setString(3, allmembers.get(i).getSex())

ps.setString(4, allmembers.get(i).getTelephone())

ps.setString(5, allmembers.get(i).getAddress1())

ps.setString(6, allmembers.get(i).getPmdate())

ps.setString(7, allmembers.get(i).getExpiry())

ps.setString(8, allmembers.get(i).getLevels())

ps.setString(9, allmembers.get(i).getDbno())

ps.setString(10, allmembers.get(i).getHispoints())

ps.setString(11, allmembers.get(i).getBnpoints())

ps.setString(12, allmembers.get(i).getKypoints())

ps.setString(13, allmembers.get(i).getWxno())

ps.setString(14, allmembers.get(i).getRemark())

//插入命令列表

//ps.addBatch()

ps.executeUpdate()

}

//ps.executeBatch()

con.commit()

ps.close()

con.close()

this.flag=false

System.out.println(Thread.currentThread().getName()+"--->OK")

} catch (SQLException e) {

// TODO Auto-generated catch block

System.out.println("向mysql中更新数据时发生异常!")

e.printStackTrace()

}

}

@Override

public void run() {

// TODO Auto-generated method stub

while(true&&flag){

this.inputAll(getAll())

}

}

以mysql为数据库写的一个粗陋的demo,你参考一下,希望不会因为代码过多被百度吞了——

import java.sql.Connection

import java.sql.DriverManager

import java.sql.PreparedStatement

import java.sql.ResultSet

import java.sql.SQLException

import java.util.ArrayList

import java.util.List

public class Test {

       

    public static void main(String[] args) {

        allotThread()

    }

       

    /**

     * 将100条数据分成10份并启动10个线程分别 *** 作

     */

    public static void allotThread() {

        List<String[]>datas = buildDatas()

        for (int i=0i<100i+=10) {

            List<String[]>tenDatas = datas.subList(i, i + 10)

            insertData(tenDatas)

        }

    }

       

    /**

     * 创建100条模拟数据

     * @return

     */

    public static List<String[]>buildDatas() {

        List<String[]>datas = new ArrayList<String[]>()

        for (int i=0i<100i++) {

            String[] data = {"id " + i, "name " + i}

            datas.add(data)

        }

        return datas

    }

       

    /**

     * 启动线程进行数据插入 *** 作

     * @param tenDatas

     */

    public static void insertData(final List<String[]>tenDatas) {

        new Thread(new Runnable() {

            public void run() {

                String sql = "insert into testtable (id, name) values (?, ?)"

                Connection conn = null

                PreparedStatement pstmt = null

                try {

                    conn = getConnection()

                    conn.setAutoCommit(false)

                    pstmt = getPstmt(conn, sql)

                    for (String[] data : tenDatas) {

                        pstmt.setString(1, data[0])

                        pstmt.setString(2, data[1])

                        pstmt.addBatch()

                    }

                    pstmt.executeBatch()

                    conn.commit()

                    conn.setAutoCommit(true)

                } catch (SQLException e) {

                    e.printStackTrace()

                    rollback(conn)

                } catch (ClassNotFoundException e) {

                    e.printStackTrace()

                } finally {

                    close(pstmt)

                    close(conn)

                }

            }

        }).start()

    }

       

    public static Connection getConnection() throws SQLException, ClassNotFoundException {

        Class.forName("com.mysql.jdbc.Driver")

        String dbUrl = "jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=UTF-8"

        Connection conn = DriverManager.getConnection(dbUrl, "root", "tooeasy")

        return conn

    }

       

    public static PreparedStatement getPstmt(Connection conn, String sql) throws SQLException, ClassNotFoundException {

        PreparedStatement pstmt = conn.prepareStatement(sql)

        return pstmt

    }

       

    public static void rollback(Connection conn) {

        try {

            if (null != conn) {

                conn.rollback()

            }

        } catch (SQLException e) {

            e.printStackTrace()

        }

    }

       

    public static void close(Connection conn) {

        try {

            if (null != conn) {

                conn.close()

            }

        } catch (SQLException e) {

            e.printStackTrace()

        }

    }

       

    public static void close(PreparedStatement pstmt) {

        try {

            if (null != pstmt) {

                pstmt.close()

            }

        } catch (SQLException e) {

            e.printStackTrace()

        }

    }

       

    public static void close(ResultSet rs) {

        try {

            if (null != rs) {

                rs.close()

            }

        } catch (SQLException e) {

            e.printStackTrace()

        }

    }

}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9972595.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存