Python 函数链2 一个基于规则集串联的场景

Python 函数链2 一个基于规则集串联的场景,第1张

概述说明先看一个将多个规则集串联起来的场景。这是规则集刚完成,一个比较粗糙的集成调用,主要的目的是为了观察一下应用过程的流程,看看哪里有问题。在开发函数链的时候可以一并解决掉。内容1场景实现数据获取、数据预处理、数据处理和输出的功能。2数据获取从数据源获取数 说明

先看一个将多个规则集串联起来的场景。这是规则集刚完成,一个比较粗糙的集成调用,主要的目的是为了观察一下应用过程的流程,看看哪里有问题。在开发函数链的时候可以一并解决掉。

内容1 场景

实现数据获取、数据预处理、数据处理和输出的功能。

2 数据获取

从数据源获取数据。感觉比较麻烦的地方是换了一个环境,逐个的检查依赖的函数。之后可以写一个函数,自动检查依赖函数(fs.xxx),读入之后再检查是否还有依赖,自动载入。

import FuncDict as fdimport pandas as pd# 逐个导入依赖函数列表的函数# cur_func_dict.dynamic_load_func_db('path_traceback', method='force')# 依赖函数列表depend_List = ['run_ruleset_v2','WebMsg1','get_rule_input_kw_v2','run_a_rule_v2',              'rulev002_check_and_pass_dict_keys_in_sets','rulev002_gen_slice_List',              'slice_List_by_batch1','rulev002_read_log_simple','rulev002_real_slice_List',              'create_folder_if_notexist','rulev002_simple_get_iter_slice_List',              'web_get_by_Json','get_time_str','to_pickle','logging_str_a_row',              'path_traceback','rulev002_simple_get_report','parse_log1','amend_path_slash']
1 读入规则集
query_res_List =  func_lmongo.query_many(dbname='rules', cname='rules',kv_List = [{'rule_set':'BatchSimpleGet'}], max_recs=10000)[0]BatchSimpleGet_df = pd.DataFrame(query_res_List)
2 输入参数
# 自由定义folder_path = './test001_simple_get/'max_files_in_folder = 1000ID_min = 0ID_max = 2000read_step = 500url_template = 'http://xxx/solr/xxx/select?q=docID:%s'# 公网测试地址# 局域网地址target_url = 'http://192.168.1.146:20002/API/'# -- 封装cur_dict ={}cur_dict['input_dict'] = {}cur_dict['input_dict']['folder_path'] = folder_pathcur_dict['input_dict']['max_files_in_folder'] = max_files_in_foldercur_dict['input_dict']['ID_min'] = ID_mincur_dict['input_dict']['ID_max'] = ID_maxcur_dict['input_dict']['read_step'] = read_stepcur_dict['input_dict']['url_template'] = url_templatecur_dict['input_dict']['target_url'] = target_url
3 执行
fs.run_ruleset_v2('simple_get', BatchSimpleGet_df, cur_dict, fs)---{'name': 'simple_get', 'status': True, 'msg': 'ok', 'duration': 8, 'data': {'total_success_recs': 4,  'cur_success_recs': 0,  'cur_start_dt': None,  'cur_end_dt': None,  'cur_mean_seconds': None,  'data_path': './test001_simple_get/',  'log_path': './test001_simple_get/read.log',  'cur_slice_List': []}}
3 数据预处理1 导包
import FuncDict as fdimport pandas as pdcur_func_dict = fd.FuncDict1('ner_preprocessing', pack_fpath='./funcs/',lmongo=fd.func_lmongo)import funcs as fscur_func_dict.fs = fsfunc_lmongo = fd.func_lmongo
2 导入依赖函数
# 依赖函数列表depend_List = ['from_pickle','get_batch_file1','judge_doc_pull_data_get_results',              'flat_dict','parse_judge_doc_to_df',              'rulev002_get_docID_ss','md5_trans','txt_etl','set_interval_judge',              'interval_judge','rulev002_get_company_ss_hash','extract_str_base',              'str_contains_words','rulev002_for_ss_List','split_serIEs_by_interval',              'rulev002_ner_proprocessing_output']
3 将get到的响应Json转换
这一步没有纳入规则集里,因为这个会随着不同的任务变
get_input_folder = './test001_simple_get/'pkl_file_List  = fs.get_batch_file1(get_input_folder, '.pkl', 1000)response_List = []for pkl_file in pkl_file_List:    pkl_name = pkl_file.replace('.pkl','')    tem_List = fs.from_pickle(pkl_name, path = get_input_folder)    response_List+=tem_Listres_df = fs.parse_judge_doc_to_df(response_List, fs=fs)

将原始数据转为了只有doc_IDcontent两列的df。

4 读入规则集
query_res_List =  func_lmongo.query_many(dbname='rules', cname='rules',kv_List = [{'rule_set':'NerPreProcessing'}], max_recs=10000)[0]NerPreProcessing_df = pd.DataFrame(query_res_List)
5 定义输入类型
# 自由定义# 每个短句列表的最大长度,考虑内存/显存max_ss_List_len = 4000# 每个短句允许的最大长度,模型有处理的最大限制(认为公司名称最小不短于4个字)ss_len_min = 4ss_len_max = 100# 这个参数也可以不写(默认左闭右开)intever_type = 'left_close'# 数据部分# data = [{'doc_ID' : 'a001','content':'oppo公司竟然不支持鸿蒙'},#         {'doc_ID':'a002','content':'华为公司开源了鸿蒙,是个好东西'},#         {'doc_ID':'a003','content':'你觉得鸿蒙怎么样'}]data =data_dictdata_type = 'dict_List'# 一般性过滤文本default_pass_str = r'[\u4e00-\u9fa5\da-zA-Z\-()().,]'# 如果考虑人名再加两个点 default_pass_str = '[\u4e00-\u9fa5\da-zA-Z\-()().,·•]'# 输出模式 realtime/persistent# output_mode = 'realtime'output_mode = 'persistent'output_folder = './test001_pre_processed/'# 实体识别任务ner_task='company'# -- 封装cur_dict ={}cur_dict['input_dict'] = {}cur_dict['input_dict']['max_ss_List_len'] = max_ss_List_lencur_dict['input_dict']['ss_len_min'] = ss_len_mincur_dict['input_dict']['ss_len_max'] = ss_len_maxcur_dict['input_dict']['intever_type'] = intever_typecur_dict['input_dict']['data'] = datacur_dict['input_dict']['data_type'] = data_typecur_dict['input_dict']['default_pass_str'] = default_pass_strcur_dict['input_dict']['output_mode'] = output_modecur_dict['input_dict']['output_folder'] = output_foldercur_dict['input_dict']['ner_task'] = ner_task
5 调用规则集
fs.run_ruleset_v2('ner_preprocessing', NerPreProcessing_df, cur_dict, fs)---takes 0.22data save to pickle:  ./test001_pre_processed/7e65213bc8dea995441378335b14780d.pkl

关于数据处理的模式稍微有点乱:

1 如果是离线模式,是文件夹到文件夹还是独立处理的2 如果是在线模式,文件应该放哪4 数据处理1 传入模型数据
fs.unzip_a_folder('./model.zip', './model')

应该搭一个ftp,使用wget方式来获取文件。这块属于文件的IO。

2 模型的参数

这块可能需要做一些管理。一个模型应该有个名字,对应的参数可以封装好放在一起。

# 要载入模型# 模型初始设置path_name = './'model_checkpoint = path_name + 'model/model_v0/'label_List = ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-T', 'I-T']max_len = 200import torchimport transformersfrom transformers import autoTokenizer, autoModelForTokenClassification# Setting up the device for GPU usagefrom torch import cudadevice = 'cuda' if cuda.is_available() else 'cpu'print('device available', device)tokenizer = autoTokenizer.from_pretrained(model_checkpoint)assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast)# 预载入(device=cpu)model = autoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_List))from functools import partial tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt")
3 载入规则集
query_res_List =  func_lmongo.query_many(dbname='rules', cname='rules',kv_List = [{'rule_set':'NerModelPredict'}], max_recs=10000)[0]NerModelPredict_df = pd.DataFrame(query_res_List)
4 初始化

感觉处理数据的格式并不统一,有必要规定离线文件的标准格式

# 依赖函数列表depend_List = ['unzip_a_folder','rulev002_check_pkl_files','get_List_gap',              'rulev002_ner_predict_pkl_files','ner_batch_predict',              'inv_convert_chinese_word_tag_List2BIO','rulev002_ner_merge_predict_result']# 自由定义# -- 封装cur_dict ={}cur_dict['input_dict'] = {}# 模型参数cur_dict['input_dict']['model_checkpoint'] = model_checkpointcur_dict['input_dict']['label_List'] = label_Listcur_dict['input_dict']['max_len'] = max_lencur_dict['input_dict']['device'] = devicecur_dict['input_dict']['tokenizer'] = tokenizercur_dict['input_dict']['model'] = modelcur_dict['input_dict']['tencoder'] = tencoder# 输入数据cur_dict['input_dict']['input_folder'] = './test001_pre_processed/'# 允许的最大文件数(如果数据大的话就需要即时的把处理完成的文件删除掉)cur_dict['input_dict']['allow_fetch_num'] = 100000# 指定设备。只有cuda和cpu两种,如果device没有cuda,那么就自动转为cpucur_dict['input_dict']['user_device'] = 'cuda'---fs.run_ruleset_v2('ner_modelpredict', NerModelPredict_df, cur_dict, fs){'name': 'ner_modelpredict', 'status': True, 'msg': 'ok', 'duration': 4, 'data': {'cur_processed_files': 0,  'already_processed_files': 1,  'save_folder': './test001_pre_processed_predict/',  'cur_file_List': []}}
总结

整体效果比较符合预期:多付出一点时间在管理上,换来使用的可靠和方便。

这些规则集都是单独开发的,第一次在全新的环境下拼凑起来,整个过程还是很流畅的。一半的迁移时间花在导入依赖函数上,另外还有一些时间花在文件资源(模型)的导入上。

虽然在单独开发规则集的时候没有统一的IO考虑(这是个问题),但是并没有给集成使用造成太多麻烦,没有出现担心的BUG调试。

不过5原则。我发现一个规则集,不超过5条规则是比较舒服的。我在集成的时候也就分为拉取数据、预处理数据、处理数据和准备输出4部分。 这样在每一个层级都是非常清晰的,复杂度可以通过多层来实现。假设一个函数是5的复杂度(5个计算逻辑,有时候可以堆一二十个),一个规则集的复杂度就是25,一个函数链就是125。但通过分层,我们要面对的一直是5的复杂度,所以感觉比较轻松。在100复杂度下足够处理一个专门的业务问题了。

规则断点报错。这部分还不是很完善,有的时候断了没有报(例如函数的参数不对)。但是看到某个规则集的某条规则错了,定位起来很舒服。

设计函数链需要考虑的几个改进: 1 IO模式的规定与统一2 载入包的简化3 连接的规定 总结

以上是内存溢出为你收集整理的Python 函数链2 一个基于规则集串联的场景全部内容,希望文章能够帮你解决Python 函数链2 一个基于规则集串联的场景所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1185054.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存