public void division(){
//获取要导入的总的数据条数
String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]"
try {
pss=cons.prepareStatement(sql3)
rss=pss.executeQuery()
while(rss.next()){
System.out.println("总记录条数:"+rss.getInt(1))
sum=rss.getInt(1)
}
//每30000条记录作为一个分割点
if(sum>=30000){
n=sum/30000
residue=sum%30000
}else{
residue=sum
}
System.out.println(n+" "+residue)
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace()
}
}
线程类
public MyThread(int start,int end) {
this.end=end
this.start=start
System.out.println("处理掉余数")
try {
System.out.println("--------"+Thread.currentThread().getName()+"------------")
Class.forName(SQLSERVERDRIVER)
System.out.println("加载sqlserver驱动...")
cons = DriverManager.getConnection(CONTENTS,UNS,UPS)
stas = cons.createStatement()
System.out.println("连接SQLServer数据库成功!!")
System.out.println("加载mysql驱动.....")
Class.forName(MYSQLDRIVER)
con = DriverManager.getConnection(CONTENT,UN,UP)
sta = con.createStatement()
// 关闭事务自动提交
con.setAutoCommit(false)
System.out.println("连接mysql数据库成功!!")
} catch (Exception e) {
e.printStackTrace()
}
// TODO Auto-generated constructor stub
}
public ArrayList<Member> getAll(){
Member member
String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +
" from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end
try {
System.out.println("正在获取数据...")
allmembers=new ArrayList()
rss=stas.executeQuery(sql1)
while(rss.next()){
member=new Member()
member.setAddress1(rss.getString("address1"))
member.setBnpoints(rss.getString("bnpoints"))
member.setDbno(rss.getString("dbno"))
member.setExpiry(rss.getString("expiry"))
member.setHispoints(rss.getString("hispoints"))
member.setKypoints(rss.getString("kypoints"))
member.setLevels(rss.getString("levels"))
member.setNames(rss.getString("names"))
member.setPmcode(rss.getString("pmcode"))
member.setRemark(rss.getString("remark"))
member.setSex(rss.getString("sex"))
member.setTelephone(rss.getString("telephone"))
member.setWxno(rss.getString("wxno"))
member.setPmdate(rss.getString("pmdate"))
allmembers.add(member)
// System.out.println(member.getNames())
}
System.out.println("成功获取sqlserver数据库数据!")
return allmembers
} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("获取sqlserver数据库数据发送异常!")
e.printStackTrace()
}
try {
rss.close()
stas.close()
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace()
}
return null
}
public void inputAll(ArrayList<Member> allmembers){
System.out.println("开始向mysql中写入")
String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
try {
ps=con.prepareStatement(sql2)
System.out.println("-------------------------等待写入数据条数: "+allmembers.size())
for(int i=0i<allmembers.size()i++){
ps.setString(1, allmembers.get(i).getPmcode())
ps.setString(2, allmembers.get(i).getNames())
//System.out.println(allmembers.get(i).getNames())
ps.setString(3, allmembers.get(i).getSex())
ps.setString(4, allmembers.get(i).getTelephone())
ps.setString(5, allmembers.get(i).getAddress1())
ps.setString(6, allmembers.get(i).getPmdate())
ps.setString(7, allmembers.get(i).getExpiry())
ps.setString(8, allmembers.get(i).getLevels())
ps.setString(9, allmembers.get(i).getDbno())
ps.setString(10, allmembers.get(i).getHispoints())
ps.setString(11, allmembers.get(i).getBnpoints())
ps.setString(12, allmembers.get(i).getKypoints())
ps.setString(13, allmembers.get(i).getWxno())
ps.setString(14, allmembers.get(i).getRemark())
//插入命令列表
//ps.addBatch()
ps.executeUpdate()
}
//ps.executeBatch()
con.commit()
ps.close()
con.close()
this.flag=false
System.out.println(Thread.currentThread().getName()+"--->OK")
} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("向mysql中更新数据时发生异常!")
e.printStackTrace()
}
}
@Override
public void run() {
// TODO Auto-generated method stub
while(true&&flag){
this.inputAll(getAll())
}
}
//将数据库中的数据条数分段public void division(){
//获取要导入的总的数据条数
String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]"
try {
pss=cons.prepareStatement(sql3)
rss=pss.executeQuery()
while(rss.next()){
System.out.println("总记录条数:"+rss.getInt(1))
sum=rss.getInt(1)
}
//每30000条记录作为一个分割点
if(sum>=30000){
n=sum/30000
residue=sum%30000
}else{
residue=sum
}
System.out.println(n+" "+residue)
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace()
}
}
线程类
public MyThread(int start,int end) {
this.end=end
this.start=start
System.out.println("处理掉余数")
try {
System.out.println("--------"+Thread.currentThread().getName()+"------------")
Class.forName(SQLSERVERDRIVER)
System.out.println("加载sqlserver驱动...")
cons = DriverManager.getConnection(CONTENTS,UNS,UPS)
stas = cons.createStatement()
System.out.println("连接SQLServer数据库成功!!")
System.out.println("加载mysql驱动.....")
Class.forName(MYSQLDRIVER)
con = DriverManager.getConnection(CONTENT,UN,UP)
sta = con.createStatement()
// 关闭事务自动提交
con.setAutoCommit(false)
System.out.println("连接mysql数据库成功!!")
} catch (Exception e) {
e.printStackTrace()
}
// TODO Auto-generated constructor stub
}
public ArrayList<Member>getAll(){
Member member
String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +
" from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end
try {
System.out.println("正在获取数据...")
allmembers=new ArrayList()
rss=stas.executeQuery(sql1)
while(rss.next()){
member=new Member()
member.setAddress1(rss.getString("address1"))
member.setBnpoints(rss.getString("bnpoints"))
member.setDbno(rss.getString("dbno"))
member.setExpiry(rss.getString("expiry"))
member.setHispoints(rss.getString("hispoints"))
member.setKypoints(rss.getString("kypoints"))
member.setLevels(rss.getString("levels"))
member.setNames(rss.getString("names"))
member.setPmcode(rss.getString("pmcode"))
member.setRemark(rss.getString("remark"))
member.setSex(rss.getString("sex"))
member.setTelephone(rss.getString("telephone"))
member.setWxno(rss.getString("wxno"))
member.setPmdate(rss.getString("pmdate"))
allmembers.add(member)
// System.out.println(member.getNames())
}
System.out.println("成功获取sqlserver数据库数据!")
return allmembers
} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("获取sqlserver数据库数据发送异常!")
e.printStackTrace()
}
try {
rss.close()
stas.close()
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace()
}
return null
}
public void inputAll(ArrayList<Member>allmembers){
System.out.println("开始向mysql中写入")
String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
try {
ps=con.prepareStatement(sql2)
System.out.println("-------------------------等待写入数据条数: "+allmembers.size())
for(int i=0i<allmembers.size()i++){
ps.setString(1, allmembers.get(i).getPmcode())
ps.setString(2, allmembers.get(i).getNames())
//System.out.println(allmembers.get(i).getNames())
ps.setString(3, allmembers.get(i).getSex())
ps.setString(4, allmembers.get(i).getTelephone())
ps.setString(5, allmembers.get(i).getAddress1())
ps.setString(6, allmembers.get(i).getPmdate())
ps.setString(7, allmembers.get(i).getExpiry())
ps.setString(8, allmembers.get(i).getLevels())
ps.setString(9, allmembers.get(i).getDbno())
ps.setString(10, allmembers.get(i).getHispoints())
ps.setString(11, allmembers.get(i).getBnpoints())
ps.setString(12, allmembers.get(i).getKypoints())
ps.setString(13, allmembers.get(i).getWxno())
ps.setString(14, allmembers.get(i).getRemark())
//插入命令列表
//ps.addBatch()
ps.executeUpdate()
}
//ps.executeBatch()
con.commit()
ps.close()
con.close()
this.flag=false
System.out.println(Thread.currentThread().getName()+"--->OK")
} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("向mysql中更新数据时发生异常!")
e.printStackTrace()
}
}
@Override
public void run() {
// TODO Auto-generated method stub
while(true&&flag){
this.inputAll(getAll())
}
}
以mysql为数据库写的一个粗陋的demo,你参考一下,希望不会因为代码过多被百度吞了——
import java.sql.Connectionimport java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.util.ArrayList
import java.util.List
public class Test {
public static void main(String[] args) {
allotThread()
}
/**
* 将100条数据分成10份并启动10个线程分别 *** 作
*/
public static void allotThread() {
List<String[]>datas = buildDatas()
for (int i=0i<100i+=10) {
List<String[]>tenDatas = datas.subList(i, i + 10)
insertData(tenDatas)
}
}
/**
* 创建100条模拟数据
* @return
*/
public static List<String[]>buildDatas() {
List<String[]>datas = new ArrayList<String[]>()
for (int i=0i<100i++) {
String[] data = {"id " + i, "name " + i}
datas.add(data)
}
return datas
}
/**
* 启动线程进行数据插入 *** 作
* @param tenDatas
*/
public static void insertData(final List<String[]>tenDatas) {
new Thread(new Runnable() {
public void run() {
String sql = "insert into testtable (id, name) values (?, ?)"
Connection conn = null
PreparedStatement pstmt = null
try {
conn = getConnection()
conn.setAutoCommit(false)
pstmt = getPstmt(conn, sql)
for (String[] data : tenDatas) {
pstmt.setString(1, data[0])
pstmt.setString(2, data[1])
pstmt.addBatch()
}
pstmt.executeBatch()
conn.commit()
conn.setAutoCommit(true)
} catch (SQLException e) {
e.printStackTrace()
rollback(conn)
} catch (ClassNotFoundException e) {
e.printStackTrace()
} finally {
close(pstmt)
close(conn)
}
}
}).start()
}
public static Connection getConnection() throws SQLException, ClassNotFoundException {
Class.forName("com.mysql.jdbc.Driver")
String dbUrl = "jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=UTF-8"
Connection conn = DriverManager.getConnection(dbUrl, "root", "tooeasy")
return conn
}
public static PreparedStatement getPstmt(Connection conn, String sql) throws SQLException, ClassNotFoundException {
PreparedStatement pstmt = conn.prepareStatement(sql)
return pstmt
}
public static void rollback(Connection conn) {
try {
if (null != conn) {
conn.rollback()
}
} catch (SQLException e) {
e.printStackTrace()
}
}
public static void close(Connection conn) {
try {
if (null != conn) {
conn.close()
}
} catch (SQLException e) {
e.printStackTrace()
}
}
public static void close(PreparedStatement pstmt) {
try {
if (null != pstmt) {
pstmt.close()
}
} catch (SQLException e) {
e.printStackTrace()
}
}
public static void close(ResultSet rs) {
try {
if (null != rs) {
rs.close()
}
} catch (SQLException e) {
e.printStackTrace()
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)