#!/usr/bin/perl
#
# Copy SQL Server tables into MySQL (multi-threaded data-copy step).
#
# Reads table names from alltablename_exportname_<N>.txt (one per line,
# produced by an earlier step that split the table list).  For every user
# table on the ODBC source that appears in that list and has an identity
# column, the rows MySQL does not have yet are copied.  The copy walks the
# identity column in ranges ("BETWEEN begin AND end").  Small tables are
# copied inline; larger tables get one thread per range, at most
# $threads_cnt at a time.
#
# Usage: perl <script> [threads=10] [ids_per_chunk=3400] <N>
#
# Progress and problems are appended to alltablename_<kind>_<N>.txt:
#   loseprimary - listed tables without an identity column (not copied)
#   bigcount    - tables with >= 10,000,000 source rows
#   geo         - listed tables that contain a geometry column
#   ok / line   - per-table completion records
#   errorlog    - failed range queries plus the error text
#   repair      - "table::select::" records of failed ranges (no newline
#                 between records; presumably read by a re-run step)
use strict;
use warnings;
use threads;
use Encode;
use Encode::CN;
use DBI;
use Net::HandlerSocket;
use Time::HiRes 'time';

# Target MySQL server.
# NOTE(review): the published copy of this script mangled the casing of
# "mysql"; confirm the real database name before running.
my $aim_ip      = '192.168.0.208';
my $aim_db_name = 'MysqLdb';
my $port        = '3306';
my $db_user     = 'zoe';
my $db_pass     = '123';
my $hs_port     = 9999;    # HandlerSocket port; not used by this script

# Source SQL Server, reached through an ODBC DSN.
my $source_name      = 'sqldb';
my $source_user_name = 'sa';
my $source_user_psd  = '123';

# Command line: thread count, IDs per range, and which name-list file to use.
my $threads_cnt = $ARGV[0] // 10;
my $per_records = $ARGV[1] // 3400;
my $var         = $ARGV[2];
die "usage: $0 [threads] [ids_per_chunk] list_number\n" unless defined $var;
$threads_cnt = 1 if $threads_cnt < 1;    # 0 would make the throttle wait forever

my $openname    = "alltablename_exportname_$var.txt";
my $losetxtname = "alltablename_loseprimary_$var.txt";
my $bigtxtname  = "alltablename_bigcount_$var.txt";
my $geotxtname  = "alltablename_geo_$var.txt";
my $okname      = "alltablename_ok_$var.txt";
my $logname     = "alltablename_errorlog_$var.txt";
my $repairname  = "alltablename_repair_$var.txt";
my $linename    = "alltablename_line_$var.txt";

append_lines($losetxtname, '缺少主键的表有:');
append_lines($bigtxtname,  '超过千万条的表有:');

# Set of table names this run is allowed to copy.
my %wanted;
open my $in, '<', $openname or die "IN: $!";
while (my $line = <$in>) {
    chomp $line;
    $line =~ s/\r\z//;    # tolerate CRLF-terminated name lists
    $wanted{$line} = 1;
}
close $in;

my $dbh = connect_mssql();
my $sth = $dbh->prepare(
    "select name,object_id from sys.all_objects where type='U' and is_ms_shipped=0 and name <>'sysdiagrams'");
$sth->execute();

# Ordinal of the current source table; written to several logs, including
# the bigcount log from export_data_in.
my $table_no = 0;

while (my ($table_name, $object_id) = $sth->fetchrow_array()) {
    my ($select_columns, $insert_columns, undef, $column_types,
        $sort_column, $has_geometry) = get_columns($table_name, $object_id);
    $table_no++;

    # Without an identity column there is nothing to range-split on.
    if ($sort_column eq '') {
        if ($wanted{$table_name}) {
            print '该表无主键'."\n";
            append_lines($losetxtname, $table_no, $table_name, now_string());
        }
        next;
    }

    print '该表有主键'."\n";
    if (!$wanted{$table_name}) {
        print '该表不在表名单内'."\n";
        next;
    }

    if ($has_geometry) {
        print '该表有地理值'."\n";
        append_lines($geotxtname, $table_no, $table_name, now_string());
    }
    else {
        print '该表在表名单内'."\n";
        append_lines($linename, $table_name.'开始复制表', now_string());
    }

    my $source_count = export_data_in($select_columns, $insert_columns,
        $table_name, $column_types, $sort_column);

    my $done_time = now_string();
    append_lines($okname, $table_no, $table_name, $source_count, $done_time);
    append_lines($linename, '数量:'.$source_count, '完成'.$table_name.'复制', $done_time);
}
$sth->finish;
$dbh->disconnect;
print '所有表的入库执行完毕!!!!'."\n";

# Copy the rows of one table that MySQL is missing.
# Resumes after MySQL's highest identity value, or starts at the source
# minimum when the MySQL table is empty.  Nothing is copied unless MySQL
# has fewer rows than the source.
# Returns the source row count.
sub export_data_in {
    my ($select_columns, $insert_columns, $table_name, $column_types, $sort_column) = @_;

    print '开始读取MysqL中的数量------------'."\n";
    my $dbh_mysql = connect_mysql();
    my $count_sql = "select count(1),max(`$sort_column`) from `$aim_db_name`.`$table_name`";
    print "$count_sql\n";
    my ($mysql_count, $mysql_max_id) = $dbh_mysql->selectrow_array($count_sql);
    $dbh_mysql->disconnect;
    print "MysqL中已有数量:$mysql_count\n目前ID:" . ($mysql_max_id // '') . "\n";

    print '开始读取sqlserver中的数量------------'."\n";
    my $dbh_src = connect_mssql();
    my ($source_count, $min_id, $max_id) = $dbh_src->selectrow_array(
        "select count(1),min([$sort_column]),max([$sort_column]) from [$table_name]");
    $dbh_src->disconnect;
    print 'sqlserver中数量为:'.$source_count."\n";

    append_lines($bigtxtname, $table_no, $table_name) if $source_count >= 10_000_000;

    return $source_count unless $mysql_count < $source_count;

    # BETWEEN is inclusive, so each range covers $per_records + 1 IDs and
    # the next one starts right after it.
    my $begin_cnt = $mysql_count ? $mysql_max_id + 1 : $min_id;
    while ($begin_cnt <= $max_id) {
        my $end_cnt    = $begin_cnt + $per_records;
        my $sql_select = "SELECT $select_columns FROM [$table_name] "
                       . "where [$sort_column] BETWEEN $begin_cnt and $end_cnt";
        my @job = ($table_name, $sql_select, $column_types, $source_count,
                   $max_id, $begin_cnt, $end_cnt, $insert_columns);

        if ($source_count < $per_records) {
            export_data(@job);
        }
        else {
            # Reap finished workers.  When still at the limit, block on the
            # oldest one instead of spinning.
            $_->join for threads->list(threads::joinable);
            while (scalar(threads->list(threads::all)) >= $threads_cnt) {
                (threads->list(threads::all))[0]->join;
            }
            threads->create(\&export_data, @job);
        }
        $begin_cnt = $end_cnt + 1;
    }
    $_->join for threads->list(threads::all);

    return $source_count;
}

# Copy one identity range: run $sql_select on SQL Server, then insert the
# rows into MySQL as a single multi-row INSERT.  Safe to run as a thread
# entry point, because it opens its own DBI handles.  Errors are caught,
# printed, and appended to the errorlog and repair files.
sub export_data {
    my ($table_name, $sql_select, $column_types, $datacount, $max_id,
        $begin_cnt, $end_cnt, $insert_columns) = @_;

    eval {
        my $start = time;
        print 'exporting data '.$table_name.'; total:'.$datacount.'; maxID:'.$max_id
            .'; NowID:'.$begin_cnt.' to '.$end_cnt."\n";
        print now_string()."\n";
        print '开始读取sqlserver数据------------'."\n";

        my $dbh_mssql = connect_mssql(LongReadLen => 1048576, LongTruncOk => 1);
        my $rows = $dbh_mssql->selectall_arrayref($sql_select);
        $dbh_mssql->disconnect;
        printf("读出时间%.1f seconds.\n", time - $start);
        print '提出行数为:'.scalar(@$rows)."\n";

        if (@$rows) {
            print '开始组合字符串------------'."\n";
            $start = time;
            # Values are quoted through the MySQL handle, so connect first.
            my $dbh_mysql = connect_mysql();
            my @tuples;
            for my $row (@$rows) {
                my @fields;
                for my $i (0 .. $#$row) {
                    push @fields, mysql_value($dbh_mysql, $column_types->[$i], $row->[$i]);
                }
                push @tuples, '(' . join(',', @fields) . ')';
            }
            printf("组合字符串时间%.1f seconds.\n", time - $start);

            $start = time;
            print '开始写入MysqL------------'."\n";
            $dbh_mysql->do("INSERT INTO `$table_name` ($insert_columns) VALUES "
                . join(",\n", @tuples));
            $dbh_mysql->disconnect;
            printf("写入时间%.1f seconds.\n", time - $start);
        }
        else {
            print '数目为0,不执行'."\n";
        }
        1;
    };
    if ($@) {
        my $err = $@;
        print "An error occurred: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
        append_lines($logname, $table_name, $sql_select, $err, now_string());
        append_text($repairname, $table_name . '::' . $sql_select . '::');
        print "An error occurred: $err";
    }
    return;
}

# Describe the columns of one SQL Server table.
# Returns:
#   - the SELECT list, with per-type NULL defaults and conversions;
#   - the backtick-quoted MySQL INSERT column list;
#   - the column count;
#   - an array ref of type names;
#   - the identity column name ('' if there is none);
#   - a flag that is true when any column is geometry.
sub get_columns {
    my ($table_name, $object_id) = @_;
    print "loading columns of $table_name \n";
    print now_string()."\n";

    my $dbh_src = connect_mssql();
    my $columns = $dbh_src->selectall_arrayref(
        'select col.name,tp.name,col.is_identity from sys.all_columns col'
      . ' inner join sys.types tp on col.system_type_id=tp.system_type_id'
      . ' and col.user_type_id=tp.user_type_id'
      . ' where col.object_id=? order by col.column_id',
        undef, $object_id);
    $dbh_src->disconnect;

    my (@select, @insert, @types);
    my $sort_column  = '';
    my $has_geometry = 0;
    for my $col (@$columns) {
        my ($name, $type, $is_identity) = @$col;
        push @types,  $type;
        push @insert, "`$name`";
        push @select, select_expression($name, $type);
        $has_geometry = 1     if $type eq 'geometry';
        $sort_column  = $name if $is_identity;
    }
    return (join(',', @select), join(',', @insert), scalar(@types), \@types,
            $sort_column, $has_geometry);
}

# SELECT-list expression for one source column, keyed on its sys.types name.
sub select_expression {
    my ($name, $type) = @_;
    my $col = "[$name]";
    return "$col.ToString() as $col" if $type eq 'hierarchyid';
    if ($type eq 'nvarchar' || $type eq 'varchar') {
        # Strip CR/LF and cast to TEXT.  The result is decoded as GBK in
        # mysql_value.
        return "CAST(ISNULL(REPLACE(REPLACE($col,CHAR(10),''),CHAR(13),''),'') AS TEXT) as $col";
    }
    return "ISNULL($col,0) as $col" if $type =~ /\A(?:int|numeric|bit|money)\z/;
    return "ISNULL(CONVERT(VARCHAR(24),$col,20),'1900-01-01 00:00:00') as $col"
        if $type eq 'datetime';
    return "ISNULL($col,'1900-01-01') as $col" if $type eq 'date';
    return "ISNULL($col,'POINT (0 0)').STAsText() as $col" if $type eq 'geometry';
    return $col;
}

# Render one fetched value as a MySQL SQL literal (undef becomes NULL).
# Values are converted from GBK to UTF-8 before quoting, so the escaping
# applies to the bytes MySQL actually receives.
sub mysql_value {
    my ($dbh, $type, $value) = @_;
    return 'NULL' unless defined $value;
    $value = encode('utf8', decode('gbk', $value));
    return 'GeomFromText(' . $dbh->quote($value) . ',4326)' if $type eq 'geometry';
    if ($type eq 'int' && $value > 4200000000) {
        # Treat as an unsigned 32-bit wraparound and map back to a negative.
        return '-' . (4294967295 - $value + 1);
    }
    # 1900-01-01 is the NULL substitute chosen in select_expression; map it
    # to MySQL's minimum supported date.
    $value = '1000-01-01'          if $type eq 'date'     && $value eq '1900-01-01';
    $value = '1000-01-01 00:00:00' if $type eq 'datetime' && $value eq '1900-01-01 00:00:00';
    return $dbh->quote($value);
}

# Open a RaiseError connection to the SQL Server ODBC source.
# Extra DBI attributes (e.g. LongReadLen) may be passed as key/value pairs.
sub connect_mssql {
    my (%extra) = @_;
    return DBI->connect("dbi:ODBC:$source_name", $source_user_name, $source_user_psd,
        { RaiseError => 1, PrintError => 0, %extra });
}

# Open a RaiseError connection to the target MySQL database, set up for
# UTF-8 input.
sub connect_mysql {
    my $dbh_mysql = DBI->connect("DBI:mysql:$aim_db_name:$aim_ip:$port",
        $db_user, $db_pass, { RaiseError => 1, PrintError => 0 });
    $dbh_mysql->do("SET character_set_client = 'utf8'");
    $dbh_mysql->do("SET character_set_connection = 'utf8'");
    return $dbh_mysql;
}

# Current local time as "Y-M-D h:m:s", without zero padding.
sub now_string {
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime();
    return sprintf('%d-%d-%d %d:%d:%d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}

# Append raw text to a file.
sub append_text {
    my ($file, $text) = @_;
    open my $fh, '>>', $file or die "open $file: $!";
    print {$fh} $text;
    close $fh or die "close $file: $!";
    return;
}

# Append each argument to a file as its own line.
sub append_lines {
    my ($file, @lines) = @_;
    append_text($file, join('', map { "$_\n" } @lines));
    return;
}

__END__
总结
以上是内存溢出为你收集整理的《SQL Server 数据导入 MySQL（五）：多线程导数据脚本（读取前面拆分的表名进行数据导入）》的全部内容，希望文章能够帮你解决将 SQL Server 数据导入 MySQL 时所遇到的程序开发问题。
如果觉得内存溢出网站的内容还不错，欢迎将内存溢出网站推荐给程序员好友。
欢迎分享，转载请注明来源：内存溢出
评论列表(0条)