# -* coding:utf8 *- import sys, os, django os.environ["DJANGO_SETTINGS_MODULE"] = "IFS.settings" django.setup() import json from utils.ERP.erp_database import ERPDB sys.path.append('./') from kafka import KafkaConsumer from script.syn_procurement_data import MyPymysqlPool, send_msg from script import setlog from django.db import connection from IFS.settings import KAFKA_SERVER WARNING_URL = "https://oapi.dingtalk.com/robot/send?access_token=a1d786b94b339f21dbb3ab92ad5f896ffcec03788b954dc07f8fd4b4553f56b5" def getlist(result): list_id = [] for i in result: list_id.append(i['id']) return list_id def syn_fund_order(): """ 监控 采购应付单列表 finance_payable_order :return: """ logger = setlog.set_log('logs/synerp/syn_fund_order', 'erp_syn_fund_order.log') try: print("开始同步erp的finance_payable_order数据") consumer = KafkaConsumer( bootstrap_servers=KAFKA_SERVER, # kafka集群地址 group_id="erp_finance_payable_order", # 消费组id enable_auto_commit=True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交) auto_commit_interval_ms=5000, # 自动提交的周期(毫秒) ) consumer.subscribe(topics=["erp_finance_payable_order"]) # 消息的主题,可以指定多个 # 找it制订 for message in consumer: print('开始 *** 作') if message: value_b = message.value if len(value_b) < 10: # ??? continue global value_str value_str = value_b.decode(encoding='utf-8') value = json.loads(value_str) if not isinstance(value, dict): logger.info('获取异常消息是: {}'.format(value)) continue table = value.get('table', '') if table not in ['finance_payable_order', ]: continue data = value.get('data', {}) print('data', data) operation = value.get('operation', '') status = data.get('status', '') if not data: continue mysql = ERPDB() # erp的数据库 cur = connection.cursor() # 备货数据库 if operation == 'DELETe': sql_group = """ select * from procurement_order where purchase_id = (select id from purchase_order where name = (select po from purchase_fund_order_line where fund_order_id = (select request_id from finance_payable_order where id ={} ) ));""".format(data.get('id')) logger.info("sql_group:%s" % sql_group) results = mysql.get_all(sql_group) if results: erp_id = getlist(results) if len(erp_id) == 1: erp_id = erp_id[0] sql_del = "update StockApplicationState set state = null where erp_id ={}".format( erp_id) else: erp_id = tuple(erp_id) # 更改备货单 sql_del = "update StockApplicationState set state = null where erp_id in {}".format( erp_id) # 更改备货单 cur.update(sql_del) else: # 查询要改的备货单的erp_id sql_group = """ select * from procurement_order where purchase_id = (select id from purchase_order where name = (select po from purchase_fund_order_line where fund_order_id = (select request_id from finance_payable_order where id ={} ) ));""".format(data.get('id')) logger.info("sql_group:%s" % sql_group) results = mysql.getAll(sql_group) if results: erp_id = getlist(results) if len(erp_id) == 1: erp_id = erp_id[0] sql_del = "update StockApplicationState set fund_state = '{}' where erp_id ={}".format( status, erp_id) else: erp_id = tuple(erp_id) # 更改备货单 sql_del = "update StockApplicationState set fund_state = '{}' where erp_id in {}".format( status, erp_id) logger.info('更新stockApplication表的sql语句是:{}'.format(sql_del)) cur.execute(sql_del) mysql.dispose() logger.info("表:{} 更新完毕!id: {}".format(table, data.get('id'))) else: continue except Exception as e: logger.error("topic:ifs_stockapplciation同步erp数据异常: {}---获取队列消息是:{}".format(e, value_str)) send_msg(WARNING_URL, msg="kafka-topic:ifs_stockapplciation同步erp数据库supplier数据异常报警:{}".format(e), reminders=None) def syn_supplier_data(): """ 同步erp系统数据: 监控 supplier供应商表 :return: """ logger = setlog.set_log('logs/synerp/supplier', 'erp_supplier.log') # logger = logging.getLogger('django') try: print("开始同步supplier数据") consumer = KafkaConsumer( bootstrap_servers=KAFKA_SERVER, # kafka集群地址 group_id="erp_supplier0011", # 消费组id enable_auto_commit=True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交) auto_commit_interval_ms=5000, # 自动提交的周期(毫秒) ) consumer.subscribe(topics=["ifs_supplier"]) # 消息的主题,可以指定多个 # 找it制订 for message in consumer: print('开始操作') if message: value_b = message.value if len(value_b) < 10: # ??? continue global value_str value_str = value_b.decode(encoding='utf-8') value = json.loads(value_str) if not isinstance(value, dict): logger.info('获取异常消息是: {}'.format(value)) continue table = value.get('table', '') if table not in ['supplier', ]: continue data = value.get('data', {}) print('data', data) operation = value.get('operation', '') is_account = data.get('is_account', '') is_overdue = data.get('is_overdue', '') if not data: continue mysql = ERPDB() # erp的数据库 cur = connection.cursor() # 备货数据库 if operation == 'DELETE': sql_group = """SELECT procurement_order.id FROM procurement_order LEFT JOIN purchase_order ON procurement_order.purchase_id = purchase_order.id left join supplier on purchase_order.partner_id = supplier.erp_id where supplier.id = {};""".format( data.get('id')) logger.info("sql_group:%s" % sql_group) results = mysql.get_all(sql_group) if results: erp_id = getlist(results) if len(erp_id) == 1: erp_id = erp_id[0] sql_del = "update StockApplicationState set is_account = null,is_overdue = null, where erp_id ={}".format( erp_id) else: erp_id = tuple(erp_id) # 更改备货单 sql_del = "update StockApplicationState set is_account = null,is_overdue = null where erp_id in {}".format( erp_id) # 更改备货单 cur.update(sql_del) else: # 查询要改的备货单的erp_id sql_group = """SELECT procurement_order.id FROM procurement_order LEFT JOIN purchase_order ON procurement_order.purchase_id = purchase_order.id left join supplier on purchase_order.partner_id = supplier.erp_id where supplier.id = {};""".format( data.get('id')) logger.info("sql_group:%s" % sql_group) results = mysql.getAll(sql_group) if results: erp_id = getlist(results) if len(erp_id) == 1: erp_id = erp_id[0] sql_del = "update StockApplicationState set is_account = {},is_overdue = {} where erp_id = {}".format( is_account, is_overdue, erp_id) else: erp_id = tuple(erp_id) # 更改备货单 sql_del = "update StockApplicationState set is_account = {},is_overdue = {} where erp_id in {}".format( is_account, is_overdue, erp_id) logger.info('更新stockApplication表的sql语句是:{}'.format(sql_del)) cur.execute(sql_del) mysql.dispose() logger.info("表:{} 更新完毕!id: {}".format(table, data.get('id'))) else: continue except Exception as e: logger.error("topic:ifs_stockapplciation同步erp数据异常: {}---获取队列消息是:{}".format(e, value_str)) send_msg(WARNING_URL, msg="kafka-topic:ifs_stockapplciation同步erp数据库supplier数据异常报警:{}".format(e), reminders=None) # if __name__ == '__main__': # syn_supplier_data() # syn_fund_order()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)