可运行的代码,end部分需要优化??
def download_material_info2(param):
print('begin download_material_info')
# sql_conn = Oracle(JndiNames.EPO_DS)
try:
sql_conn = Oracle(JndiNames.EPO_DS)
param["buPlant"] = "1"
if not param['bu'] and not param['materialSource']:
param["buPlant"] = ""
# if '%' in param['specs']:
# param["specsLike"] = param["specs"]
# param["specs"] = ""
if '%' in param['makerNo']:
param["makerNoLIKE"] = param["makerNo"]
param["makerNo"] = ""
if '%' in param['makerName']:
param["makerNameLIKE"] = param["makerName"]
param["makerName"] = ""
if '%' in param['hhNo']:
param["hhNoLIKE"] = param["hhNo"]
param["hhNo"] = ""
if param['invalid'] == "invalid" and len(param["sourcer"]) == 0:
param["sourcerInvalid"] = "1"
t1 = time.time()
rsp_info_count = DalMaterialInfo(sql_conn).download_material_info_count(param)
rsp_info_count = rsp_info_count[0][0]
print('rsp_info_count', rsp_info_count)
l = []
step = 7000
for i in range(0, rsp_info_count, step):
l.append(i)
# print(l)
print(len(l))
start = 0
end = 5
#多线程 多进程查询SQL
# rsp_info = DalMaterialInfo(sql_conn).download_material_info_start_end(param, start, end)
class MyThread(Thread):
def __init__(self, func, *args): # 根据自身需求,将需要传出返回值的方法(func)和参数(args)传入,然后进行初始化
# super(MyThread, self).__init__() # 对父类属性初始化
Thread.__init__(self) # 这样写也可以,目的就是为了初始化父类属性
self.func = func
self.args = args
# 重写run方法进行 *** 作运算
def run(self):
self.result = self.func(*self.args) # 将参数(args)传入方法(func)中进行运算,并得到最终结果(result)
# 构造get_result方法传出返回值
def get_result(self):
try:
return self.result # 将结果return
except Exception:
return None
# l = [0, 5000, 10000]
threads = []
info_list = []
for index, value in enumerate(l):
start = value
end = value + step
sql_conn = Oracle(JndiNames.EPO_DS)
t = MyThread(DalMaterialInfo(sql_conn).download_material_info_start_end, param, start, end, sql_conn)
t.start()
threads.append(t)
for index, t in enumerate(threads):
info_dict = {}
t.join()
info_dict[f'{index}'] = t.get_result()
# print('info dict', info_dict)
info_list.append(info_dict)
# print('info_list',info_list)
print('len info list', len(info_list))
#按照字典key排序
def fun(d):
for k,v in d.items():
return k
#获取字典中的所有value
def get_value(d):
l = []
for k, v in d.items():
# print('v', v)
# print('len v', len(v))
for t in v:
l.append(t)
return l
info_list.sort(key=fun)
l = [get_value(x) for x in info_list]
# print(info_list)
result = []
[result.extend(x) for x in l]
print('len result', len(result))
# print(len(rsp_info) ,rsp_info)
t2 = time.time()
print('get rsp info last', t2 - t1)
time.sleep(1000)
#原始查询
# rsp_info = DalMaterialInfo(sql_conn).download_material_info(param)
# print('rsp info', str(rsp_info)[:200])
csv_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + '.csv'
xlsx_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + '.xlsx'
download_dir = r'E:\code\pypy3.9\gscm_pypy\apps\download'
csv_path = os.path.join(download_dir, csv_name)
xlsx_path = os.path.join(download_dir, xlsx_name)
try:
with open(csv_path, "w", encoding='utf_8_sig', newline='') as fp:
writer = csv.writer(fp, delimiter=",")
# writer.writerow(["your", "header", "foo"]) # write header
writer.writerows(rsp_info)
except:
pass
t6 = time.time()
print('write to csv', t6 - t2)
# head_info = get_excel_head("ppm_and_factory_download", "download")
# if not len(rsp_info) > 0:
# raise Exception('No Data!')
# base_path = head_info.path
# 5 定义excel
t3 = time.time()
head_info, workbook, worksheet, filename = create_workbook_sheet('material_info_download')
t7 = time.time()
print('get head info', t7 - t3, head_info, workbook, worksheet, filename)
# xlsx_path = os.path.join(download_dir, filename)
pyexe_path = r'E:\code\pypy3.9\pypy.exe'
py_path = r'E:\code\pypy3.9\gscm_pypy\apps\basic_information_mgt\view\t3.py'
a0 = "--csv_path"
a1 = "--xlsx_path"
p = subprocess.Popen([pyexe_path, py_path, a0,csv_path, a1, xlsx_path], shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, encoding='utf-8')
out = p.communicate()[0]
print('out-->', out)
print(type(out))
workbook.close()
# 返回下载路径
begin_path = get_download_beginning_by_sys_envir_new()
download_url = begin_path + head_info.response_path + filename
db.session.close()
sql_conn.close()
print('download_url', download_url)
t4 = time.time()
print('generate excel last', t4 - t3)
print('total last', t4 - t1)
return download_url
except Exception as e:
sql_conn.close()
db.session.close()
raise Exception("Download failed!" + str(e))
官方参考
import cx_Oracle
import threading
from urllib import urlopen
#subclass of threading.Thread
class AsyncBlobInsert(threading.Thread):
def __init__(self, cur, input):
threading.Thread.__init__(self)
self.cur = cur
self.input = input
def run(self):
blobdoc = self.input.read()
self.cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
self.input.close()
self.cur.close()
#main thread starts here
inputs = []
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen('http://localhost/_figure2.bmp', 'rb'))
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = True
for input in inputs:
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
th = AsyncBlobInsert(cur, input)
th.start()
https://www.oracle.com/technical-resources/articles/embedded/vasiliev-python-concurrency.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)