本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Daily migration of historical MySQL rows into an archive database.

Rows of the source table whose date column falls on the day that lies
``--time_interval`` days before today are copied, in primary-key chunks, to
the destination table.  The copy is verified with a count/min/max checksum
and the source rows are then removed, either by chunked DELETE or by dropping
the partition that holds them.

All progress messages end in ``<br>`` because the original script emits them
that way; the suffix is kept so existing log consumers see the same output.
"""
__author__ = 'John'

import sys
import datetime
import time

USAGE = """
usage():
    python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \
        --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \
        --delete_strategy=delete --primary_key=auto_ID --date_col=ut --time_interval=180
"""


class ClassMigrate(object):
    """Copy one day of rows from a source table to an archive table.

    Configuration comes from ``sys.argv`` (see ``USAGE``).  Constructing an
    instance parses the arguments and opens both database connections;
    ``do()`` runs the whole create / copy / verify / purge sequence and ends
    the process with ``sys.exit``.
    """

    # ------------------------------------------------------------------
    # Argument handling
    # ------------------------------------------------------------------
    def _usage_exit(self, code=1):
        """Print the usage text and terminate with exit status *code*."""
        print(self.usage)
        sys.exit(code)

    @staticmethod
    def _parse_endpoint(spec):
        """Split ``host:port/db:table/user/password`` into a 6-tuple.

        The password is everything after the third ``/`` so it may itself
        contain ``/``.  Raises ValueError when the spec is malformed.
        """
        address, target, user, password = spec.split('/', 3)
        host, port = address.split(':')
        db_name, table = target.split(':')
        return host, int(port), db_name, table, user, password

    def _get_argv(self):
        """Parse ``sys.argv`` into instance attributes.

        Sets ``source_*`` / ``dest_*`` (host, port, db, tab, user, password),
        ``deleteStrategy`` ('delete' or 'drop'), ``pk``, ``date_col`` and
        ``interval`` (int, days).  Prints the usage text and exits with
        status 1 on any missing, unknown or malformed option, and with
        status 0 for ``--help`` / ``-h``.
        """
        self.usage = USAGE
        args = sys.argv[1:]
        if not args:
            self._usage_exit(1)
        if args[0] in ('--help', '-h'):
            self._usage_exit(0)

        for arg in args:
            # partition() splits on the first '=' only, so values (notably
            # passwords) may contain '=' themselves.
            key, _, value = arg.partition('=')
            if not value:
                self._usage_exit(1)
            try:
                if key == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif key == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif key == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        self._usage_exit(1)
                    self.deleteStrategy = value
                elif key == '--primary_key':
                    self.pk = value
                elif key == '--date_col':
                    self.date_col = value
                elif key == '--time_interval':
                    # The interval is spliced into SQL, so insist on a
                    # non-negative integer.
                    self.interval = int(value)
                    if self.interval < 0:
                        self._usage_exit(1)
                else:
                    self._usage_exit(1)
            except ValueError:
                self._usage_exit(1)

        # Fail before any connection is opened rather than with an
        # AttributeError halfway through a migration.
        required = ('source_host', 'dest_host', 'deleteStrategy',
                    'pk', 'date_col', 'interval')
        if not all(hasattr(self, name) for name in required):
            self._usage_exit(1)

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------
    @staticmethod
    def _connect(host, port, user, password, db_name):
        """Open an autocommit, utf8 MySQLdb connection."""
        # Imported here so that --help and argument errors work even when
        # the driver is not installed.
        import MySQLdb
        conn = MySQLdb.connect(host=host, port=port, user=user,
                               passwd=password, db=db_name, charset='utf8')
        conn.autocommit(True)
        return conn

    def __init__(self):
        self._get_argv()
        self.sourcedb_conn_str = self._connect(
            self.source_host, self.source_port, self.source_user,
            self.source_password, self.source_db)
        self.destdb_conn_str = self._connect(
            self.dest_host, self.dest_port, self.dest_user,
            self.dest_password, self.dest_db)
        # Destination tables are created "LIKE <source_tab>_template".
        self.template_tab = self.source_tab + '_template'
        # Width of the primary-key range handled per SELECT/INSERT/DELETE.
        self.step_size = 20000
        self._migCompleteState = False
        self._deleteCompleteState = False
        self.source_cnt = ''
        self.source_min_ID = ''
        self.source_max_ID = ''
        self.source_checksum = ''
        # Initialised so the summary in do() can always print it.
        self.dest_cnt = ''
        self.today = time.strftime("%Y-%m-%d")

    # ------------------------------------------------------------------
    # SQL helpers
    # ------------------------------------------------------------------
    def _date_window(self):
        """Return the WHERE fragment selecting the day being migrated.

        The day is ``self.today`` minus ``self.interval`` days, from
        00:00:00 to 23:59:59 inclusive, compared against ``self.date_col``.
        """
        day = ("DATE_FORMAT(DATE_ADD('%s',INTERVAL -%s day),'%%Y-%%m-%%d')"
               % (self.today, self.interval))
        return ("%s >= CONCAT(%s,' 00:00:00') and %s <= CONCAT(%s,' 23:59:59')"
                % (self.date_col, day, self.date_col, day))

    def _summary_sql(self, table):
        """Return SQL yielding (count, min pk, max pk) for the day in *table*.

        min/max are -1 when no row matches.
        """
        return ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) "
                "from %s where %s"
                % (self.pk, self.pk, table, self._date_window()))

    def _chunk_where(self, start):
        """Return the WHERE fragment for pk in [start, start + step_size)."""
        return ("%s >= %d and %s< %d and %s"
                % (self.pk, start, self.pk, start + self.step_size,
                   self._date_window()))

    @staticmethod
    def _checksum(count, min_id, max_id):
        """Join count, min id and max id into the comparison string."""
        return '_'.join(str(part) for part in (count, min_id, max_id))

    @staticmethod
    def _print_elapsed(started):
        """Print the wall-clock time since *started* (a datetime)."""
        elapsed = datetime.datetime.now() - started
        # total_seconds() keeps the microseconds zero-padded; concatenating
        # seconds + '.' + microseconds would show 5000us as "0.5000".
        print("Elapsed :%.6f seconds <br>" % elapsed.total_seconds())

    @staticmethod
    def _run_query(conn, sql, sql_type, values=None):
        """Execute *sql* on *conn* and shape the result by *sql_type*.

        'select' returns ``fetchall()``; 'dml' and 'insertmany' return the
        connection's affected-row count ('insertmany' runs ``executemany``
        with *values*); anything else returns True.  Any exception is
        printed and reported as ``False`` -- callers must therefore test
        ``is False``, because 0 affected rows is a valid result.
        """
        cursor = None
        try:
            cursor = conn.cursor()
            if sql_type == 'insertmany':
                cursor.executemany(sql, values)
                return conn.affected_rows()
            cursor.execute(sql)
            if sql_type == 'select':
                return cursor.fetchall()
            if sql_type == 'dml':
                return conn.affected_rows()
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            # cursor stays None when conn.cursor() itself raised.
            if cursor is not None:
                cursor.close()

    def sourcedb_query(self, sql, sql_type):
        """Run *sql* against the source database (see ``_run_query``)."""
        return self._run_query(self.sourcedb_conn_str, sql, sql_type)

    def destdb_query(self, sql, sql_type, values=''):
        """Run *sql* against the destination database (see ``_run_query``).

        *values* is the row sequence for ``sql_type == 'insertmany'``.
        """
        return self._run_query(self.destdb_conn_str, sql, sql_type, values)

    # ------------------------------------------------------------------
    # Destination table creation
    # ------------------------------------------------------------------
    def create_table_from_source(self):
        """Create the destination table from the source table's DDL.

        Not used by ``do()``: the data is meant to land in an ARCHIVE-engine
        table, so copying the source definition is unsuitable.  Kept for
        other uses.  Returns True on success, False otherwise.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            create_str = self.sourcedb_query(sql, 'select')[0][1]
            # SHOW CREATE TABLE emits upper-case "CREATE TABLE".
            create_str = create_str.replace(
                'CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)
            return bool(self.destdb_query(create_str, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        """Create the destination table LIKE ``<source_tab>_template``.

        Returns True when the statement succeeded (or the table already
        exists), False otherwise.
        """
        try:
            sql = ('CREATE TABLE IF NOT EXISTS %s like %s;'
                   % (self.dest_tab, self.template_tab))
            return bool(self.destdb_query(sql, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    # ------------------------------------------------------------------
    # Migration steps
    # ------------------------------------------------------------------
    def get_min_max(self):
        """Read count, min id and max id of the source rows to migrate.

        Stores them in ``source_cnt`` / ``source_min_ID`` /
        ``source_max_ID`` / ``source_checksum``.  Returns False when the
        query fails or no row matches, True otherwise.
        """
        try:
            print("\nStarting Migrate at -- %s <br>" % datetime.datetime.now())
            q = self.sourcedb_query(self._summary_sql(self.source_tab),
                                    'select')
            if not q:
                return False
            self.source_cnt, self.source_min_ID, self.source_max_ID = q[0][:3]
            self.source_checksum = self._checksum(
                self.source_cnt, self.source_min_ID, self.source_max_ID)
            if (self.source_cnt == 0 or self.source_min_ID == -1
                    or self.source_max_ID == -1):
                print("There is 0 record in source table been matched! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        """Copy the day's rows to the destination in primary-key chunks.

        Returns False when there is nothing to migrate or an unexpected
        error occurs.  A failed chunk is reported but does not abort the
        loop; ``verify_total_cnt`` is what decides whether the copy is
        complete.
        """
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            if not cols:
                return False
            # One %s placeholder per source column for the INSERT statement.
            placeholders = ','.join(['%s'] * len(cols))
            insert_sql = ("insert into " + self.dest_tab
                          + " values (%s)" % placeholders)

            k = self.source_min_ID
            while k <= self.source_max_ID:
                sql = ("select * from %s where %s"
                       % (self.source_tab, self._chunk_where(k)))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    rows = False
                elif not results:
                    # Gap in the id range: nothing to insert for this chunk.
                    rows = 0
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                if rows is False:
                    print("Insert Failed!! <br>")
                else:
                    print("Inserted %s rows. <br>" % rows)
                self._print_elapsed(starttime)
                k += self.step_size
            print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        """Compare the destination count/min/max with the source checksum.

        Sets ``_migCompleteState`` on a match; exits the process with
        status 77 on a mismatch.
        """
        try:
            dest_result = self.destdb_query(self._summary_sql(self.dest_tab),
                                            'select')
            if not dest_result:
                print("Verify Failed !!<br>")
                sys.exit(77)
            self.dest_cnt = dest_result[0][0]
            dest_checksum = self._checksum(*dest_result[0][:3])
            print("source_checksum: %s,dest_checksum: %s <br>"
                  % (self.source_checksum, dest_checksum))
            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully !!<br>")
            else:
                print("Verify Failed !!<br>")
                sys.exit(77)
        except Exception as e:
            # SystemExit is not an Exception subclass, so exit(77) above
            # propagates past this handler.
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        """Drop the source partition holding the migrated day.

        The partition is dropped only when its own count/min/max equals the
        source checksum, i.e. it contains exactly the migrated rows.  Sets
        ``_deleteCompleteState`` on success; exits with status 77 when the
        partition checksum differs.
        """
        try:
            if not self._migCompleteState:
                return
            # NOTE(review): relies on the partition list being column 3 of
            # EXPLAIN PARTITIONS output -- confirm for the server version.
            sql = ("explain partitions select * from %s where %s"
                   % (self.source_tab, self._date_window()))
            plan = self.sourcedb_query(sql, 'select')
            partition_name = plan[0][3] if plan else None
            if not partition_name:
                print("There is 0 record in source PARTITION been matched! <br>")
                return

            sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) "
                   "from %s partition (%s)"
                   % (self.pk, self.pk, self.source_tab, partition_name))
            q = self.sourcedb_query(sql, 'select')
            source_cnt, source_min_ID, source_max_ID = q[0][:3]
            checksum = self._checksum(source_cnt, source_min_ID, source_max_ID)
            if source_cnt == 0 or source_min_ID == -1 or source_max_ID == -1:
                print("There is 0 record in source PARTITION been matched! <br>")
                return
            if checksum != self.source_checksum:
                print("The partition %s checksum Failed !! Drop Failed !!"
                      % partition_name)
                sys.exit(77)

            drop_par_sql = ("alter table %s drop partition %s;"
                            % (self.source_tab, partition_name))
            droped = self.sourcedb_query(drop_par_sql, 'ddl')
            print(drop_par_sql + " <br>")
            if droped:
                print("\nDrop partition complete at -- %s <br>"
                      % datetime.datetime.now())
                self._deleteCompleteState = True
            else:
                print("Drop partition Failed.. <br>")
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        """Delete the migrated rows from the source in primary-key chunks.

        Sleeps one second between chunks.  ``_deleteCompleteState`` is set
        only when every chunk's DELETE succeeded.
        """
        try:
            if not self._migCompleteState:
                return
            all_ok = True
            k = self.source_min_ID
            while k <= self.source_max_ID:
                sql = ("delete from %s where %s"
                       % (self.source_tab, self._chunk_where(k)))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                # 0 deleted rows is fine (gap in the id range); only an
                # actual query error counts as a failure.
                if rows is False:
                    all_ok = False
                    print("Delete Failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)
                self._print_elapsed(starttime)
                time.sleep(1)
                k += self.step_size
            print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
            self._deleteCompleteState = all_ok
        except Exception as e:
            print(str(e) + "<br>")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def do(self):
        """Run create -> migrate -> verify -> purge and exit the process.

        Exit status: 88 when the purge completed, 254 when any later step
        failed, 255 when the destination table could not be created (77 is
        raised from the verification steps).
        """
        if not self.create_table_from_template():
            print("Create table Failed ! Exiting. . .")
            sys.exit(255)

        if self.migrate_2_destdb():
            self.verify_total_cnt()
            if self._migCompleteState:
                if self.deleteStrategy == 'drop':
                    self.drop_daily_partition()
                else:
                    self.delete_data()

        print("\n<br>")
        print("=====" * 5 + '<br>')
        print("source_total_cnt: %s <br>" % self.source_cnt)
        print("dest_total_cnt: %s <br>" % self.dest_cnt)
        print("=====" * 5 + '<br>')
        if self._deleteCompleteState:
            print("\nFinal result: Successfully !! <br>")
            sys.exit(88)
        else:
            print("\nFinal result: Failed !! <br>")
            sys.exit(254)


if __name__ == '__main__':
    f = ClassMigrate()
    f.do()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。
您可能感兴趣的文章:python实现数据库跨服务器迁移;使用python和Django完成博客数据库的迁移方法;Python的Flask框架中使用Flask-Migrate扩展迁移数据库的教程;Python中MySQL数据迁移到MongoDB脚本的方法;在Python中利用Into包整洁地进行数据迁移的教程。 总结:以上是内存溢出为你收集整理的《每天迁移MySQL历史数据到历史库Python脚本》全部内容,希望文章能够帮你解决《每天迁移MySQL历史数据到历史库Python脚本》所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)