使用 ES 的 bulk 接口导入批量数据(Python 进行格式化,curl 提交)

使用ES的bulk接口导入批量数据(python进行格式化,curl提交),第1张

概述:使用 ES 的 bulk 接口导入批量数据(Python 进行格式化,curl 提交)

下面是内存溢出 jb51.cc 通过网络收集整理的代码片段。

内存溢出小编现在分享给大家,也给大家做个参考。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Load delimiter-separated text data into Elasticsearch through the _bulk API.

Rows are formatted into bulk (NDJSON) requests in Python and each batch is
submitted with curl.

History:
    2015/9/25 ver.2: ver.1 used the official elasticsearch client library,
        which was inconvenient; this version uses the bulk endpoint and
        submits with curl.
    2015/9/30 ver.3: wrapped in a class for easier reuse.
'''
import sys
import os
from optparse import OptionParser
from datetime import datetime
import subprocess as sub
import json


class loadDataToES(object):
    '''Parse a delimited data file and bulk-index its rows into Elasticsearch.

    ``fIEld_desc`` lists the columns as ``name|type`` items separated by
    ``delimeter``, e.g. ``'@timestamp|date,process|str,mem|int'``.
    Recognised types are ``date`` (parsed with ``%Y-%m-%d %H:%M:%S`` and sent
    as ISO-8601), ``int`` and ``float``; any other type, or a column with no
    ``|type`` part, keeps the raw string.

    Rows are sent in batches of ``cut_off`` input lines. Each batch is written
    to ``tmp_file`` and POSTed to ``http://host:port/_bulk`` with curl.
    '''

    def __init__(self, fIEld_desc, data_file, host='127.0.0.1', port='9200',
                 index='test', DOCTYPE='others', delimeter=',',
                 tmp_file='/dev/shm/_tmp_data_to_es', cut_off=10000):
        # NOTE: the unusual keyword spellings (fIEld_desc, DOCTYPE, delimeter)
        # are kept unchanged because callers pass them by name.
        self.host = host
        self.port = port
        self.index = index
        self.DOCTYPE = DOCTYPE
        self.delimeter = delimeter
        self.tmp_file = tmp_file
        self.fIEld_desc = fIEld_desc
        self.data_file = data_file
        # Serialise the action line with json so index/type names are escaped.
        self.header = json.dumps(
            {'index': {'_index': self.index, '_type': self.DOCTYPE}},
            separators=(',', ':'))
        self.cut_off = cut_off
        self.url = 'http://%s:%s/_bulk' % (self.host, self.port)
        # Batch state; initialised here so do_line() also works on its own.
        self.body_List = []   # alternating action / source JSON lines
        self.line_num = 0     # input lines read since the last flush
        self.total_lines = 0  # input lines read overall (for error messages)

    def load_data(self):
        '''Read ``data_file`` and bulk-index every valid line.

        Example line for ``fIEld_desc='@timestamp|date,process|str,mem|int'``::

            2015-09-24 09:17:29,memory_11601,123988
        '''
        self.body_List = []
        self.line_num = 0
        self.total_lines = 0
        self.pretty_print('INFO: loadding data to es,host: %s,index: %s' % (self.host, self.index))
        self.parse_fIEld()
        with open(self.data_file, 'r') as f_desc:
            for line in f_desc:
                self.total_lines += 1
                self.do_line(line)
                self.line_num += 1
                if self.line_num >= self.cut_off:
                    self._flush()
            # Send whatever is left after the last full batch.
            self._flush()
        self.pretty_print('INFO: all lines parsed.')

    def _flush(self):
        '''Send the buffered documents, if any, then reset the batch state.'''
        if self.body_List:
            # The bulk API requires the request body to end with a newline.
            self.bulk_content = '\n'.join(self.body_List) + '\n'
            self._load_data()
        self.body_List = []
        self.line_num = 0

    def do_line(self, line):
        '''Convert one raw input line into an action/source pair in ``body_List``.

        Blank lines are ignored. Lines whose column count differs from the
        field description are reported and skipped.
        '''
        stripped = line.strip()
        if not stripped:
            return
        fIElds = stripped.split(self.delimeter)
        if len(fIElds) != self.fIEld_len:
            self.pretty_print("ERROR: line %d not match fIElds" % self.total_lines)
            return
        body = self.get_body(fIElds, self.fIElds_List)
        self.body_List.append(self.header)
        # json.dumps yields valid JSON for quotes, backslashes, None and
        # non-ASCII text; str(dict).replace("'", '"') did not.
        self.body_List.append(json.dumps(body, separators=(',', ':')))

    def parse_fIEld(self):
        '''Parse ``fIEld_desc`` into ``fIElds_List`` as ``[[name, type], ...]``.

        A column given without ``|type`` defaults to ``'str'``.
        '''
        fIElds_List = []
        for item in self.fIEld_desc.strip().split(self.delimeter):
            name, _, ftype = item.strip().partition('|')
            fIElds_List.append([name, ftype.strip() or 'str'])
        self.fIElds_List = fIElds_List
        self.fIEld_len = len(fIElds_List)

    def _load_data(self):
        '''Write ``bulk_content`` to ``tmp_file`` and POST it to ``url`` via curl.

        Reports success or failure of the batch through ``pretty_print``.
        '''
        with open(self.tmp_file, 'w') as tmp:
            tmp.write(self.bulk_content)
        p = sub.Popen(['curl', '-s', '-XPOST', self.url,
                       '-H', 'Content-Type: application/x-ndjson',
                       '--data-binary', '@' + self.tmp_file],
                      stdout=sub.PIPE)
        # communicate() drains stdout and reaps the child process.
        output = p.communicate()[0]
        if p.returncode != 0:
            self.pretty_print('ERROR: curl exited with code %d, batch not loaded.' % p.returncode)
            return
        for line in output.splitlines():
            if not line.strip():
                continue
            text = line.decode('utf-8')
            try:
                ret_dict = json.loads(text)
            except ValueError:
                self.pretty_print('ERROR: unexpected response from es: %s' % text)
                continue
            if 'items' not in ret_dict:
                # A request-level failure (e.g. malformed body) has no items.
                self.pretty_print('ERROR: bulk request rejected: %s' % text)
            elif not ret_dict.get('errors'):
                self.pretty_print("INFO: %6s lines parseed with no errors,total cost %d ms." % (len(ret_dict['items']), ret_dict['took']))
            else:
                self.pretty_print("ERROR: %6s lines parseed with some errors,total cost %d ms." % (len(ret_dict['items']), ret_dict['took']))

    def pretty_print(self, str):
        '''Print the message ``str`` prefixed with the current local time.'''
        # The parameter name shadows the builtin but is kept for callers.
        print('%s %s' % (datetime.now(), str))

    def get_body(self, fIElds, fIElds_List):
        '''Return the document dict for one row.

        ``fIElds`` are the raw column strings and ``fIElds_List`` the
        ``[name, type]`` pairs from parse_fIEld(). Values of type ``date``,
        ``int`` or ``float`` are converted; everything else stays a string.
        '''
        converters = {
            'date': self.translate_str_to_date,
            'int': self.translate_str_to_int,
            'float': self.translate_str_to_float,
        }
        body = {}
        for value, (name, ftype) in zip(fIElds, fIElds_List):
            convert = converters.get(ftype)
            body[name] = convert(value) if convert else value
        return body

    def translate_str_to_date(self, date_str):
        '''Return ``date_str`` (``%Y-%m-%d %H:%M:%S``) as ISO-8601, or None on failure.

        None serialises to JSON null, which the document can carry; the old
        False became an invalid boolean for a date field.
        '''
        try:
            return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').isoformat()
        except (ValueError, TypeError):
            self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0]))
            self.pretty_print("Failed to translate '%s' to date." % (date_str))
        return None

    def translate_str_to_int(self, num_str):
        '''Return ``num_str`` as an int, or None when it cannot be converted.'''
        try:
            return int(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
        return None

    def translate_str_to_float(self, num_str):
        '''Return ``num_str`` as a float, or None when it cannot be converted.'''
        try:
            return float(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to float." % (num_str))
        return None


if __name__ == '__main__':
    # Example fIEld_desc: '@timestamp|date,process|str,mem|int'
    # Matching lines in the data file:
    #     2015-09-24 09:17:29,memory_11601,123988
    #     2015-09-24 09:17:29,memory_11602,223112
    loader = loadDataToES(fIEld_desc='@timestamp|date,mem|int',
                          data_file='/root/scripts/in.data',
                          host='10.21.102.60', index='test')
    loader.load_data()

以上是内存溢出(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

总结

以上是内存溢出为你收集整理的使用ES的bulk接口导入批量数据(python进行格式化,curl提交)全部内容,希望文章能够帮你解决使用ES的bulk接口导入批量数据(python进行格式化,curl提交)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1199047.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存