sqlserver数据导入mysql五:多线程导数据脚本(读取前面拆分的表名进行数据导入)

sqlserver数据导入mysql五:多线程导数据脚本(读取前面拆分的表名进行数据导入),第1张

概述：下面的 Perl 脚本读取前一步拆分出的表名清单，按自增主键区间分批、多线程地把 SQL Server（ODBC 数据源 sqldb）中的数据导入 MySQL（服务器 192.168.0.208，数据库 mysqldb）。
#!/usr/bin/perl
# Multi-threaded SQL Server -> MySQL copier (step 5 of the series): copies the
# tables listed in the table-name file produced by the earlier split step.
#
# Usage: perl <script> [threads] [per_records] part_no
#   threads      maximum worker threads per table (default 10)
#   per_records  identity values per batch; every batch selects the inclusive
#                range [begin, begin + per_records] (default 3400)
#   part_no      suffix of the input list alltablename_exportname_<part_no>.txt
#                (one table name per line) and of every log file written
#
# Only tables that have an identity column are copied. The identity column is
# the resume key: rows whose id is above MySQL's current max(id) are copied.
# Assumes identity values are integers and that the ODBC driver returns
# character data as GBK-encoded bytes (the original script assumed the same)
# - confirm for your environment.
#
# The original's separate "geometry table" branch was unreachable (its flag
# assignment was commented out); geometry columns are still converted below.
use strict;
use warnings;
use threads;
use DBI;
use Encode;
use Encode::CN;
use Time::HiRes 'time';

# Target MySQL server. MySQL database names are case-sensitive on most
# platforms; "mysqldb" matches the page's own summary line.
my $aim_ip      = '192.168.0.208';
my $aim_db_name = 'mysqldb';
my $port        = '3306';
my $db_user     = 'zoe';
my $db_pass     = '123';

# Source SQL Server, reached through this ODBC DSN.
my $source_name      = 'sqldb';
my $source_user_name = 'sa';
my $source_user_psd  = '123';

my $threads_cnt = $ARGV[0] // 10;
my $per_records = $ARGV[1] // 3400;
my $var         = $ARGV[2];
die "usage: $0 [threads] [per_records] part_no\n"
    unless defined $var
    && $threads_cnt =~ /\A[1-9][0-9]*\z/
    && $per_records =~ /\A[0-9]+\z/;

my $openname    = "alltablename_exportname_$var.txt";
my $losetxtname = "alltablename_loseprimary_$var.txt";
my $bigtxtname  = "alltablename_bigcount_$var.txt";
my $okname      = "alltablename_ok_$var.txt";
my $logname     = "alltablename_errorlog_$var.txt";
my $repairname  = "alltablename_repair_$var.txt";
my $linename    = "alltablename_line_$var.txt";

append_lines($losetxtname, '缺少主键的表有:');
append_lines($bigtxtname,  '超过千万条的表有:');

# Whitelist of table names to copy (a hash lookup replaces the deprecated ~~).
my %in_list;
open my $list_fh, '<', $openname or die "IN: $!";
while (my $line = <$list_fh>) {
    $line =~ s/\s+\z//;    # chomp, and drop a stray "\r" from CRLF files
    $in_list{$line} = 1 if length $line;
}
close $list_fh;

# Fetch the table list up front so no DBI handle is open while worker
# threads are created (DBI handles must not be shared between threads).
my $tables = do {
    my $dbh  = connect_mssql();
    my $rows = $dbh->selectall_arrayref(
        "select name,object_id from sys.all_objects where type='U' and is_ms_shipped=0 and name <>'sysdiagrams'");
    $dbh->disconnect;
    $rows;
};

my $n = 0;
for my $table (@$tables) {
    my ($table_name, $object_id) = @$table;
    $n++;
    my $done = eval { process_table($n, $table_name, $object_id); 1 };
    next if $done;

    # Log the failure and continue with the next table instead of aborting.
    my $err = $@ || "unknown error\n";
    print "An error occurred: $err";
    append_lines($logname, $table_name, $err, now_str());
}
$_->join for threads->list(threads::all);
print '所有表的入库执行完毕!!!!' . "\n";

# Classify one source table and copy it when it has an identity column and is
# on the whitelist. $table_no is the running table counter used in the logs.
sub process_table {
    my ($table_no, $table_name, $object_id) = @_;
    my ($select_columns, $insert_columns, undef, $sort_column, $column_types)
        = get_columns($table_name, $object_id);

    if ($sort_column eq '') {
        if ($in_list{$table_name}) {
            print '该表无主键' . "\n";
            append_lines($losetxtname, $table_no, $table_name, now_str());
        }
        return;
    }

    print '该表有主键' . "\n";
    if (!$in_list{$table_name}) {
        print '该表不在表名单内' . "\n";
        return;
    }

    print '该表在表名单内' . "\n";
    append_lines($linename, $table_name . '开始复制表', now_str());
    my $datacount = export_data_in($table_no, $table_name, $select_columns,
                                   $insert_columns, $sort_column, $column_types);
    append_lines($okname, $table_no, $table_name, $datacount, now_str());
    append_lines($linename, '数量:' . $datacount, '完成' . $table_name . '复制', now_str());
    return;
}

# Copy the rows of $table_name that MySQL does not have yet, in identity
# ranges of ($per_records + 1) values. Small tables (fewer rows than
# $per_records) are copied on the current thread; larger ones use up to
# $threads_cnt worker threads. Returns the SQL Server row count.
sub export_data_in {
    my ($table_no, $table_name, $select_columns, $insert_columns,
        $sort_column, $column_types) = @_;

    my ($mysql_count, $my_max_id)     = mysql_progress($table_name, $sort_column);
    my ($datacount, $min_id, $max_id) = mssql_range($table_name, $sort_column);

    append_lines($bigtxtname, $table_no, $table_name) if $datacount >= 10_000_000;

    # Nothing left to copy (this also covers an empty source table).
    return $datacount if $mysql_count >= $datacount;

    # Resume just after the highest id MySQL already has.
    my $begin_cnt = $mysql_count == 0 ? $min_id : $my_max_id + 1;
    my $threaded  = $datacount >= $per_records;

    # Stop once a batch would start past the last source id; the original
    # condition ran a whole extra batch of empty ranges.
    while ($begin_cnt <= $max_id) {
        my $end_cnt    = $begin_cnt + $per_records;
        my $sql_select = "SELECT $select_columns FROM " . ms_ident($table_name)
                       . ' where ' . ms_ident($sort_column)
                       . " BETWEEN $begin_cnt and $end_cnt";
        my @job = ($table_name, $sql_select, $insert_columns, $column_types,
                   $datacount, $max_id, $begin_cnt, $end_cnt);
        if ($threaded) {
            wait_for_slot($threads_cnt);
            threads->create(\&export_data, @job)
                or die "cannot start worker thread for $table_name\n";
        }
        else {
            export_data(@job);
        }
        $begin_cnt = $end_cnt + 1;
    }
    $_->join for threads->list(threads::all);
    return $datacount;
}

# Row count and max(identity) of the target table in MySQL.
sub mysql_progress {
    my ($table_name, $sort_column) = @_;
    print '开始读取MysqL中的数量------------' . "\n";
    my $dbh = connect_mysql();
    my $sql = 'select count(1),max(' . my_ident($sort_column) . ') from '
            . my_ident($aim_db_name) . '.' . my_ident($table_name);
    print "$sql\n";
    my ($count, $max_id) = $dbh->selectrow_array($sql);
    $dbh->disconnect;
    print "MysqL中已有数量:$count\n目前ID:" . ($max_id // '') . "\n";
    return ($count, $max_id);
}

# Row count and min/max identity value of the source table in SQL Server.
sub mssql_range {
    my ($table_name, $sort_column) = @_;
    print '开始读取sqlserver中的数量------------' . "\n";
    my $dbh = connect_mssql();
    my $col = ms_ident($sort_column);
    my ($count, $min_id, $max_id) = $dbh->selectrow_array(
        "select count(1),min($col),max($col) from " . ms_ident($table_name));
    $dbh->disconnect;
    print 'sqlserver中数量为:' . $count . "\n";
    return ($count, $min_id, $max_id);
}

# Reap finished workers and block until fewer than $max threads are alive.
# Sleeps between checks instead of spinning the CPU as the original did.
sub wait_for_slot {
    my ($max) = @_;
    while (1) {
        $_->join for threads->list(threads::joinable);
        return if scalar(threads->list(threads::all)) < $max;
        Time::HiRes::sleep(0.05);
    }
}

# Copy one identity range ($sql_select) into MySQL. Runs either inline or
# as a thread body. Failures are logged to $logname, and the range is
# recorded in $repairname for a later retry; they never propagate.
sub export_data {
    my ($table_name, $sql_select, $insert_columns, $column_types,
        $datacount, $max_id, $begin_cnt, $end_cnt) = @_;

    my $ok = eval {
        my $start_time = time;
        print 'exporting data ' . $table_name . '; total:' . $datacount
            . '; maxID:' . $max_id . '; NowID:' . $begin_cnt . ' to ' . $end_cnt . "\n";
        print now_str() . "\n";
        print '开始读取sqlserver数据------------' . "\n";

        my $dbh_ms = connect_mssql();
        my $rows   = $dbh_ms->selectall_arrayref($sql_select);
        $dbh_ms->disconnect;    # always released, even for an empty range
        printf("读出时间%.1f seconds.\n", time - $start_time);
        print '提出行数为:' . scalar(@$rows) . "\n";

        if (@$rows) {
            insert_rows($table_name, $insert_columns, $column_types, $rows);
        }
        else {
            print '数目为0,不执行' . "\n";
        }
        1;
    };
    if (!$ok) {
        my $err = $@ || "unknown error\n";
        print "An error occurred: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
        append_lines($logname, $table_name, $sql_select, $err, now_str());
        append_file($repairname, $table_name . '::' . $sql_select . '::');
        print "An error occurred: $err";
    }
    return;
}

# Build one multi-row INSERT for $rows (array of row arrayrefs, in the same
# column order as $insert_columns) and execute it on MySQL.
sub insert_rows {
    my ($table_name, $insert_columns, $column_types, $rows) = @_;

    print '开始组合字符串------------' . "\n";
    my $start_time = time;
    my $dbh = connect_mysql();    # also needed for ->quote
    my @tuples;
    for my $row (@$rows) {
        my @values = map { format_value($dbh, $column_types->[$_], $row->[$_]) } 0 .. $#$row;
        push @tuples, '(' . join(',', @values) . ")\n";
    }
    my $data_str = encode('utf8', join(',', @tuples));
    printf("组合字符串时间%.1f seconds.\n", time - $start_time);

    $start_time = time;
    print '开始写入MysqL------------' . "\n";
    my $insert_sql = 'INSERT INTO ' . my_ident($table_name)
                   . '(' . $insert_columns . ') values ' . $data_str . ';';
    $dbh->do($insert_sql);
    $dbh->disconnect;
    printf("写入时间%.1f seconds.\n", time - $start_time);
    return;
}

# Render one source value as a MySQL literal. Each value is decoded from
# GBK (assumed driver encoding) *before* escaping, so GBK trail bytes equal
# to 0x5C are not mistaken for backslashes. DBI's quote() escapes quotes and
# backslashes, which the original string concatenation did not do; undef is
# rendered as NULL.
sub format_value {
    my ($dbh, $type, $value) = @_;
    return 'NULL' unless defined $value;
    $type //= '';
    my $text = decode('gbk', $value);

    if ($type eq 'geometry') {
        # NOTE(review): GeomFromText was removed in MySQL 8 (use ST_GeomFromText there).
        return 'GeomFromText(' . $dbh->quote($text) . ',4326)';
    }
    if ($type eq 'int' && $value > 4200000000) {
        # Kept from the original: map a wrapped unsigned 32-bit value back to
        # its negative signed form.
        return '-' . (4294967295 - $value + 1);
    }
    # The SELECT turns NULL dates into 1900-01-01; map them to MySQL's minimum.
    return "'1000-01-01'"          if $type eq 'date'     && $text eq '1900-01-01';
    return "'1000-01-01 00:00:00'" if $type eq 'datetime' && $text eq '1900-01-01 00:00:00';
    return $dbh->quote($text);
}

# Describe the columns of table $table_name (sys object id $object_id).
# Returns ($select_list, $insert_list, $column_count, $identity_column,
# \@lower_case_type_names); $identity_column is '' when there is none.
sub get_columns {
    my ($table_name, $object_id) = @_;
    print "loading columns of $table_name \n";
    print now_str() . "\n";

    my $dbh  = connect_mssql();
    my $cols = $dbh->selectall_arrayref(
        'select col.name, tp.name, col.is_identity from sys.all_columns col'
      . ' inner join sys.types tp on col.system_type_id = tp.system_type_id'
      . ' and col.user_type_id = tp.user_type_id'
      . ' where col.object_id = ? order by col.column_id',
        undef, $object_id);
    $dbh->disconnect;

    my (@select_parts, @insert_parts, @types);
    my $sort_column = '';
    for my $col (@$cols) {
        my ($col_name, $type_name, $is_identity) = @$col;
        my $type = lc $type_name;
        push @types,        $type;
        push @select_parts, select_expr($col_name, $type);
        push @insert_parts, my_ident($col_name);
        $sort_column = $col_name if $is_identity;
    }
    return (join(',', @select_parts), join(',', @insert_parts),
            scalar(@types), $sort_column, \@types);
}

# SQL Server select-list expression for one column. NULLs become
# type-specific defaults; character data is cast to TEXT so the driver hands
# back code-page bytes. Escaping now happens in format_value, so the
# original's replace() chains are no longer needed.
sub select_expr {
    my ($col_name, $type) = @_;
    my $c = ms_ident($col_name);
    return "$c.ToString() as $c"                       if $type eq 'hierarchyid';
    return "CAST(ISNULL($c,'') AS TEXT) as $c"         if $type eq 'nvarchar' || $type eq 'varchar';
    return "ISNULL($c,0) as $c"                        if $type =~ /\A(?:int|numeric|bit|money)\z/;
    return "ISNULL(CONVERT(VARCHAR(24),$c,20),'1900-01-01 00:00:00') as $c"
                                                       if $type eq 'datetime';
    return "ISNULL($c,'1900-01-01') as $c"             if $type eq 'date';
    return "ISNULL($c,'POINT (0 0)').STAsText() as $c" if $type eq 'geometry';
    return $c;
}

# Open a new SQL Server connection (dies on failure).
sub connect_mssql {
    return DBI->connect("dbi:ODBC:$source_name", $source_user_name, $source_user_psd,
        { RaiseError => 1, PrintError => 0, LongTruncOk => 1, LongReadLen => 1048576 });
}

# Open a new MySQL connection with a UTF-8 client charset (dies on failure).
sub connect_mysql {
    my $dbh = DBI->connect("DBI:mysql:$aim_db_name:$aim_ip:$port", $db_user, $db_pass,
        { RaiseError => 1, PrintError => 0 });
    $dbh->do("SET character_set_client = 'utf8'");
    $dbh->do("SET character_set_connection = 'utf8'");
    return $dbh;
}

# Bracket-quote a SQL Server identifier.
sub ms_ident {
    my ($name) = @_;
    (my $quoted = $name) =~ s/\]/]]/g;
    return "[$quoted]";
}

# Backtick-quote a MySQL identifier.
sub my_ident {
    my ($name) = @_;
    (my $quoted = $name) =~ s/`/``/g;
    return "`$quoted`";
}

# Current local time as "Y-M-D h:m:s" (unpadded, like the original logs).
sub now_str {
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime();
    return sprintf('%d-%d-%d %d:%d:%d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}

# Append @chunks to $file with a single syswrite, so records written by
# concurrent threads do not interleave within a record.
sub append_file {
    my ($file, @chunks) = @_;
    open my $fh, '>>', $file or die "cannot open $file: $!";
    defined syswrite($fh, join('', @chunks)) or die "cannot write $file: $!";
    close $fh or die "cannot close $file: $!";
    return;
}

# Append each element of @lines to $file as its own newline-terminated line.
sub append_lines {
    my ($file, @lines) = @_;
    append_file($file, map { "$_\n" } @lines);
    return;
}
总结

以上是内存溢出为你收集整理的sqlserver数据导入mysql五:多线程导数据脚本(读取前面拆分的表名进行数据导入)全部内容,希望文章能够帮你解决sqlserver数据导入mysql五:多线程导数据脚本(读取前面拆分的表名进行数据导入)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/1170224.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-02
下一篇 2022-06-02

发表评论

登录后才能评论

评论列表(0条)

保存