每天迁移MySQL历史数据到历史库Python脚本

每天迁移MySQL历史数据到历史库Python脚本,第1张

每天迁移MySQL历史数据到历史库Python脚本

本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下

#!/usr/bin/env python 
# coding:utf-8 
__author__ = 'John' 
 

import MySQLdb 
import sys 
import datetime 
import time 
 
class ClassMigrate(object): 
  def _get_argv(self): 
    self.usage = """ 
      usage(): 
      python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \ 
      --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \ 
      --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180 
      """ 
    if len(sys.argv) == 1: 
      print self.usage 
      sys.exit(1) 
    elif sys.argv[1] == '--help' or sys.argv[1] == '-h': 
 print self.usage 
 sys.exit() 
    elif len(sys.argv) > 2: 
      for i in sys.argv[1:]: 
 _argv = i.split('=') 
 if _argv[0] == '--source': 
   _list = _argv[1].split('/') 
   self.source_host = _list[0].split(':')[0] 
   self.source_port = int(_list[0].split(':')[1]) 
   self.source_db = _list[1].split(':')[0] 
   self.source_tab = _list[1].split(':')[1] 
   self.source_user = _list[2] 
   self.source_password = _list[3] 
 elif _argv[0] == '--dest': 
   _list = _argv[1].split('/') 
   self.dest_host = _list[0].split(':')[0] 
   self.dest_port = int(_list[0].split(':')[1]) 
   self.dest_db = _list[1].split(':')[0] 
   self.dest_tab = _list[1].split(':')[1] 
   self.dest_user = _list[2] 
   self.dest_password = _list[3] 
 elif _argv[0] == '--delete_strategy': 
   self.deleteStrategy = _argv[1] 
   if self.deleteStrategy not in ('delete', 'drop'): 
     print (self.usage) 
     sys.exit(1) 
 elif _argv[0] == '--primary_key': 
   self.pk = _argv[1] 
 elif _argv[0] == '--date_col': 
   self.date_col = _argv[1] 
 elif _argv[0] == '--time_interval': 
   self.interval = _argv[1] 
 else: 
   print (self.usage) 
   sys.exit(1) 
 
  def __init__(self): 
    self._get_argv() 
## -------------------------------------------------------------------- 
    self.sourcedb_conn_str = MySQLdb.connect(host=self.source_host, port=self.source_port, user=self.source_user, passwd=self.source_password, db=self.source_db, charset='utf8') 
    self.sourcedb_conn_str.autocommit(True) 
    self.destdb_conn_str = MySQLdb.connect(host=self.dest_host, port=self.dest_port, user=self.dest_user, passwd=self.dest_password, db=self.dest_db, charset='utf8') 
    self.destdb_conn_str.autocommit(True) 
## -------------------------------------------------------------------- 
    self.template_tab = self.source_tab + '_template' 
    self.step_size = 20000 
## -------------------------------------------------------------------- 
    self._migCompleteState = False 
    self._deleteCompleteState = False 
## -------------------------------------------------------------------- 
    self.source_cnt = '' 
    self.source_min_id = '' 
    self.source_max_id = '' 
    self.source_checksum = '' 
    self.dest_cn = '' 
## -------------------------------------------------------------------- 
    self.today = time.strftime("%Y-%m-%d") 
    # self.today = '2016-05-30 09:59:40' 

def sourcedb_query(self, sql, sql_type): 
    try: 
      cr = self.sourcedb_conn_str.cursor() 
      cr.execute(sql) 
      if sql_type == 'select': 
 return cr.fetchall() 
      elif sql_type == 'dml': 
 rows = self.sourcedb_conn_str.affected_rows() 
 return rows 
      else: 
 return True 
    except Exception, e: 
      print (str(e) + "
") return False finally: cr.close() def destdb_query(self, sql, sql_type, values=''): try: cr = self.destdb_conn_str.cursor() if sql_type == 'select': cr.execute(sql) return cr.fetchall() elif sql_type == 'insertmany': cr.executemany(sql, values) rows = self.destdb_conn_str.affected_rows() return rows else: cr.execute(sql) return True except Exception, e: print (str(e) + "
") return False finally: cr.close() def create_table_from_source(self): '''''因为tab_name表的数据需要迁移到archive引擎表,所以不适合使用这种方式。 预留作其他用途。''' try: sql = "show create table %s;" % self.source_tab create_str = self.sourcedb_query(sql, 'select')[0][1] create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS') self.destdb_query(create_str, 'ddl') return True except Exception, e: print (str(e) + "
") return False def create_table_from_template(self): try: sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab) state = self.destdb_query(sql, 'ddl') if state: return True else: return False except Exception, e: print (str(e + "
") + "
") return False def get_min_max(self): """ 创建目标表、并获取源表需要迁移的总条数、最小id、最大id """ try: print ("nStarting Migrate at -- %s
") % (datetime.datetime.now().__str__()) sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') and %s <= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ % (self.pk, self.pk, self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval) q = self.sourcedb_query(sql, 'select') self.source_cnt = q[0][0] self.source_min_id = q[0][1] self.source_max_id = q[0][2] self.source_checksum = str(self.source_cnt) + '_' + str(self.source_min_id) + '_' + str(self.source_max_id) if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1: print ("There is 0 record in source table been matched!
") return False else: return True except Exception, e: print (str(e) + "
") return False def migrate_2_destdb(self): try: get_min_max_id = self.get_min_max() if get_min_max_id: k = self.source_min_id desc_sql = "desc %s;" % self.source_tab # self.filed = [] cols = self.sourcedb_query(desc_sql, 'select') # for j in cols: # self.filed.append(j[0]) fileds = "%s," * len(cols) # 源表有多少个字段,就拼凑多少个%s,拼接到insert语句 fileds = fileds.rstrip(',') while k <= self.source_max_id: sql = """select * from %s where %s >= %d and %s< %d and %s >= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') and %s <= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval) print ("n%s
") % sql starttime = datetime.datetime.now() results = self.sourcedb_query(sql, 'select') insert_sql = "insert into " + self.dest_tab + " values (%s)" % fileds rows = self.destdb_query(insert_sql, 'insertmany', results) if rows == False: print ("Insert failed!!
") else: print ("Inserted %s rows.
") % rows endtime = datetime.datetime.now() timeinterval = endtime - starttime print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds
") k += self.step_size print ("nInsert complete at -- %s
") % (datetime.datetime.now().__str__()) return True else: return False except Exception, e: print (str(e) + "
") return False def verify_total_cnt(self): try: sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') and %s <= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ % (self.pk, self.pk, self.dest_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval) dest_result = self.destdb_query(sql, 'select') self.dest_cnt = dest_result[0][0] dest_checksum = str(self.dest_cnt) + '_' + str(dest_result[0][1]) + '_' + str(dest_result[0][2]) print ("source_checksum: %s, dest_checksum: %s
") % (self.source_checksum, dest_checksum) if self.source_cnt == dest_result[0][0] and dest_result[0][0] != 0 and self.source_checksum == dest_checksum: self._migCompleteState = True print ("Verify successfully !!
") else: print ("Verify failed !!
") sys.exit(77) except Exception, e: print (str(e) + "
") def drop_daily_partition(self): try: if self._migCompleteState: sql = """explain partitions select * from %s where %s >= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') and %s <= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ % (self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval) partition_name = self.sourcedb_query(sql, 'select') partition_name = partition_name[0][3] sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)""" % (self.pk, self.pk, self.source_tab, partition_name) q = self.sourcedb_query(sql, 'select') source_cnt = q[0][0] source_min_id = q[0][1] source_max_id = q[0][2] checksum = str(source_cnt) + '_' + str(source_min_id) + '_' + str(source_max_id) if source_cnt == 0 or source_min_id == -1 or source_max_id == -1: print ("There is 0 record in source PARTITION been matched!
") else: if checksum == self.source_checksum: drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name) droped = self.sourcedb_query(drop_par_sql, 'ddl') if droped: print (drop_par_sql + "
") print ("nDrop partition complete at -- %s
") % (datetime.datetime.now().__str__()) self._deleteCompleteState = True else: print (drop_par_sql + "
") print ("Drop partition failed..
") else: print ("The partition %s checksum failed !! Drop failed !!") % partition_name sys.exit(77) except Exception, e: print (str(e) + "
") def delete_data(self): try: if self._migCompleteState: k = self.source_min_id while k <= self.source_max_id: sql = """delete from %s where %s >= %d and %s< %d and %s >= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') and %s <= CONCAt(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval) print ("n%s
") % sql starttime = datetime.datetime.now() rows = self.sourcedb_query(sql, 'dml') if rows == False: print ("Delete failed!!
") else: print ("Deleted %s rows.
") % rows endtime = datetime.datetime.now() timeinterval = endtime - starttime print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds
") time.sleep(1) k += self.step_size print ("nDelete complete at -- %s
") % (datetime.datetime.now().__str__()) self._deleteCompleteState = True except Exception, e: print (str(e) + "
") def do(self): tab_create = self.create_table_from_template() if tab_create: migration = self.migrate_2_destdb() if migration: self.verify_total_cnt() if self._migCompleteState: if self.deleteStrategy == 'drop': self.drop_daily_partition() else: self.delete_data() print ("n
") print ("====="*5 + '
') print ("source_total_cnt: %s
") % self.source_cnt print ("dest_total_cnt: %s
") % self.dest_cnt print ("====="*5 + '
') if self._deleteCompleteState: print ("nFinal result: Successfully !!
") sys.exit(88) else: print ("nFinal result: Failed !!
") sys.exit(254) else: print ("Create table failed ! Exiting. . .") sys.exit(255) f = ClassMigrate() f.do()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/3299777.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-05
下一篇 2022-10-05

发表评论

登录后才能评论

评论列表(0条)

保存