下面是内存溢出 jb51.cc 通过网络收集整理的代码片段。
内存溢出小编现在分享给大家,也给大家做个参考。
#!/bin/env python
# -*- coding: utf-8 -*-
'''
Bulk-load delimited text data into Elasticsearch.

Each input line is converted to a JSON document in Python and the batch is
submitted to the ``_bulk`` endpoint with ``curl``.

Change log:
    2015/9/25 ver.2 - ver.1 needed the official elasticsearch client
        library, which was inconvenient; this version uses the bulk
        interface and submits with curl.
    2015/9/30 ver.3 - wrapped in a class so it is easier to call.
'''
import sys
import os
from optparse import OptionParser
from datetime import datetime
import subprocess as sub
import json


class loadDataToES:
    '''
    Load a delimited text file into Elasticsearch through the bulk API.

    Parameters:
        fIEld_desc: field description, ``name|type`` entries joined by
            ``delimeter``, e.g. ``'@timestamp|date,process|str,mem|int'``.
            Recognised types are ``date``, ``int`` and ``float``; anything
            else (or a missing type) is kept as a string.
        data_file:  path of the input file, one record per line.
        host, port: address of the Elasticsearch HTTP endpoint.
        index:      target index name.
        DOCTYPE:    value written to ``_type`` in each bulk action line.
        delimeter:  separator used in both ``fIEld_desc`` and the data file.
        tmp_file:   scratch file that holds the bulk payload handed to curl.
        cut_off:    number of input lines read per bulk request.
    '''

    def __init__(self, fIEld_desc, data_file, host='127.0.0.1', port='9200',
                 index='test', DOCTYPE='others', delimeter=',',
                 tmp_file='/dev/shm/_tmp_data_to_es', cut_off=10000):
        self.host = host
        self.port = port
        self.index = index
        self.DOCTYPE = DOCTYPE
        self.delimeter = delimeter
        self.tmp_file = tmp_file
        self.fIEld_desc = fIEld_desc
        self.data_file = data_file
        # Action line that precedes every document in the bulk payload.
        self.header = '{"index":{"_index":"%s","_type":"%s"}}' % (
            self.index, self.DOCTYPE)
        self.cut_off = cut_off
        self.url = 'http://%s:%s/_bulk' % (self.host, self.port)
        # Batch state; initialised here so do_line() is usable on its own.
        self.body_List = []
        self.bulk = ''
        self.bulk_content = ''
        self.line_num = 0    # lines read since the last flush
        self.lines_read = 0  # lines read from the file in total

    def load_data(self):
        '''
        Read ``data_file`` and submit it in batches of ``cut_off`` lines.

        Example line from the file:
            2015-09-24 09:17:29,memory_11601,123988
        '''
        self.body_List = []
        self.bulk = ''
        self.line_num = 0
        self.lines_read = 0
        self.pretty_print('INFO: loadding data to es,host: %s,index: %s'
                          % (self.host, self.index))
        self.parse_fIEld()
        with open(self.data_file, 'r') as f_desc:
            for line in f_desc:
                self.lines_read += 1
                self.do_line(line)
                self.line_num += 1
                if self.line_num >= self.cut_off:
                    self._flush()
        if self.line_num > 0:
            self._flush()
        self.pretty_print('INFO: all lines parsed.')

    def _flush(self):
        '''Submit the pending batch (if it has documents) and reset state.'''
        # A batch can be empty when every line in it was rejected by
        # do_line(); posting an empty body would only produce an error.
        if self.body_List:
            self.bulk_content = '\n'.join(self.body_List)
            self._load_data()
        self.body_List = []
        self.bulk = ''
        self.line_num = 0

    def do_line(self, line):
        '''
        Parse one input line and append its action line and JSON document
        to the pending batch. Lines whose field count does not match the
        field description are reported and skipped.
        '''
        fIElds = line.strip().split(self.delimeter)
        if len(fIElds) != self.fIEld_len:
            self.pretty_print("ERROR: line %d not match fIElds"
                              % self.lines_read)
            return
        body = self.get_body(fIElds, self.fIElds_List)
        self.body_List.append(self.header)
        # json.dumps escapes quotes inside values and emits valid JSON
        # literals, which a str(dict) + quote replacement cannot do.
        self.body_List.append(json.dumps(body))

    def parse_fIEld(self):
        '''
        Split ``fIEld_desc`` into ``[name, type]`` pairs and store them in
        ``self.fIElds_List`` (with the count in ``self.fIEld_len``).
        An entry without ``|type`` is treated as ``str``.
        '''
        fIElds_List = []
        for item in self.fIEld_desc.strip().split(self.delimeter):
            items = item.split('|')
            type_name = items[1] if len(items) > 1 else 'str'
            fIElds_List.append([items[0], type_name])
        self.fIElds_List = fIElds_List
        self.fIEld_len = len(fIElds_List)

    def _load_data(self):
        '''
        Write ``self.bulk_content`` to ``tmp_file`` and POST it to the bulk
        endpoint with curl, then report the outcome.

        Raises:
            OSError: if the curl executable cannot be started.
        '''
        content = self.bulk_content
        # The bulk API requires the last line to be newline-terminated.
        if not content.endswith('\n'):
            content += '\n'
        with open(self.tmp_file, 'w') as tmp:
            tmp.write(content)

        cmd = ['curl', '-s', '-XPOST', self.url,
               # Elasticsearch 6+ rejects bulk bodies without a content type.
               '-H', 'Content-Type: application/x-ndjson',
               '--data-binary', '@' + self.tmp_file]
        p = sub.Popen(cmd, stdout=sub.PIPE)
        output, _ = p.communicate()
        if p.returncode != 0:
            self.pretty_print("ERROR: curl exited with code %d"
                              % p.returncode)
            return

        for raw in output.splitlines():
            if not raw.strip():
                continue
            text = raw.decode('utf-8', 'replace')
            try:
                ret_dict = json.loads(text)
            except ValueError:
                self.pretty_print("ERROR: unexpected response: %s" % text)
                continue
            if not isinstance(ret_dict, dict) or 'items' not in ret_dict:
                # Not a bulk result, e.g. a request-level error object.
                self.pretty_print("ERROR: bulk request rejected: %s" % text)
                continue
            summary = (len(ret_dict['items']), ret_dict.get('took', 0))
            if not ret_dict.get('errors'):
                self.pretty_print(
                    "INFO: %6s lines parseed with no errors,"
                    "total cost %d ms." % summary)
            else:
                self.pretty_print(
                    "ERROR: %6s lines parseed with some errors,"
                    "total cost %d ms." % summary)

    def pretty_print(self, str):
        '''Print ``str`` prefixed with the current local timestamp.'''
        # The parameter name shadows the builtin but is kept so keyword
        # callers keep working.
        print('%s %s' % (datetime.now(), str))

    def get_body(self, fIElds, fIElds_List):
        '''
        Build the document dict for one record.

        ``fIElds`` holds the raw string values and ``fIElds_List`` the
        matching ``[name, type]`` pairs. Values typed ``date``, ``int`` or
        ``float`` are converted; every other type is kept as a string.
        '''
        converters = {
            'date': self.translate_str_to_date,
            'int': self.translate_str_to_int,
            'float': self.translate_str_to_float,
        }
        body = {}
        for value, spec in zip(fIElds, fIElds_List):
            convert = converters.get(spec[1])
            body[spec[0]] = convert(value) if convert else value
        return body

    def translate_str_to_date(self, date_str):
        '''
        Convert ``'YYYY-MM-DD HH:MM:SS'`` to an ISO-8601 string.
        Returns False (after logging) when the text does not parse.
        '''
        try:
            date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
        except (ValueError, TypeError):
            self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0]))
            self.pretty_print("Failed to translate '%s' to date."
                              % (date_str))
            return False
        return date.isoformat()

    def translate_str_to_int(self, num_str):
        '''Convert to int; returns False (after logging) on failure.'''
        try:
            return int(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to int."
                              % (num_str))
            return False

    def translate_str_to_float(self, num_str):
        '''Convert to float; returns False (after logging) on failure.'''
        try:
            return float(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to float."
                              % (num_str))
            return False


if __name__ == '__main__':
    # Example field description:
    #     @timestamp|date,process|str,mem|int
    # Example lines in the data file:
    #     2015-09-24 09:17:29,203532
    #     2015-09-24 09:17:29,memory_11602,223112
    loader = loadDataToES(fIEld_desc='@timestamp|date,mem|int',
                          data_file='/root/scripts/in.data',
                          host='10.21.102.60',
                          index='test')
    loader.load_data()
以上是内存溢出(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
总结:以上是内存溢出为你收集整理的《使用ES的bulk接口导入批量数据(python进行格式化,curl提交)》全部内容,希望文章能够帮你解决使用ES的bulk接口导入批量数据(python进行格式化,curl提交)所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)