Python查询MySQL以及ES,将数据写到文件中

python查询mysql以及es,将数据写到文件中,第1张

Python查询MySQL以及ES,将数据写到文件中
import datetime
import json
import time
import pymysql
import requests



def select_active_users(start_time, end_time):
    """Return the users whose last login falls inside a time window.

    Selects ``user_id`` from ``user_login_status`` for every row whose
    ``last_time`` lies between *start_time* and *end_time* (SQL ``BETWEEN``,
    so both bounds are inclusive).

    Args:
        start_time: Lower bound of the window. The caller in this file passes
            a ``"%Y-%m-%d %H:%M:%S"`` string.
        end_time: Upper bound of the window, same form as *start_time*.

    Returns:
        The result of ``fetchall()`` on a ``DictCursor``: a sequence of row
        dicts, each holding a ``"user_id"`` key.

    Raises:
        Any error raised by pymysql while connecting or querying propagates
        to the caller; the connection is closed either way.
    """
    # NOTE(review): connection settings (including an empty root password)
    # are hard-coded for one environment; consider moving them to config.
    conn = pymysql.connect(
        host='10.2.1.5',
        port=30306,
        user="root",
        passwd="",
        db="koal_audit_dev",
    )
    try:
        print("mysql connect succ!")
        # Placeholders are bound by the driver so the bounds are escaped
        # properly instead of being spliced into the statement text.
        sql = ('SELECT user_id FROM user_login_status '
               'WHERE last_time BETWEEN %s AND %s')
        params = (start_time, end_time)
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql, params)
            # mogrify renders the statement exactly as it is sent to MySQL.
            print(cursor.mogrify(sql, params))
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
    print("list", rows)
    return rows


def get_result(now_time, user_id, _30_ago_es_time, timeout=30):
    """Run a terms aggregation against Elasticsearch and return its counts.

    Sends the search request to the ``system*`` indices and flattens the
    ``app_aggs`` buckets of the response into a ``{key: doc_count}`` dict.

    NOTE(review): the query body below is hard-coded (fixed time range,
    ``host_name`` filter, aggregation on ``uptime``), so *now_time*,
    *user_id* and *_30_ago_es_time* are currently NOT used. An earlier,
    commented-out template filtered on ``user_id`` over a
    ``flink_time`` range and aggregated on ``app_id``; confirm which query
    is intended before relying on per-user results.

    Args:
        now_time: Upper bound of the time window (currently unused).
        user_id: User to aggregate for (currently unused).
        _30_ago_es_time: Lower bound of the time window (currently unused).
        timeout: Seconds to wait for Elasticsearch before giving up, passed
            to ``requests.get``. Defaults to 30.

    Returns:
        dict mapping each bucket ``key`` to its ``doc_count``.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        KeyError: If the response has no ``aggregations.app_aggs.buckets``.
    """
    base_url = 'http://10.2.0.5:9200/system*/_search'
    query_bady = '{"size":0,"query":{"bool":{"filter":[{"range":{"flink_time":{"gte":"2020-02-04T04:56:01.000Z","lte":"2021-12-04T12:56:01.000Z"}}}],"must":[{"term":{"host_name":"localhost.localdomain"}}]}},"aggs":{"app_aggs":{"terms":{"field":"uptime"}}}}'
    # NOTE(review): Basic-auth credentials are embedded in source; consider
    # loading them from configuration or the environment.
    headers = {'content-type': 'application/json', 'Authorization': 'Basic ZWxhc3RpYzoxMjM0NWE='}
    # Without a timeout, requests waits indefinitely on an unresponsive host.
    r = requests.get(url=base_url, data=query_bady, headers=headers, timeout=timeout)
    print("r: ", r)
    # Fail loudly on HTTP errors instead of with a confusing KeyError later.
    r.raise_for_status()
    payload = r.json()  # parse the body once and reuse it
    print("str: ", str(payload))
    buckets_list = payload["aggregations"]["app_aggs"]["buckets"]
    result_data = {bucket['key']: bucket["doc_count"] for bucket in buckets_list}
    print("result_data: ", result_data)
    return result_data


def write_result_csv():
    """Export per-user aggregation results to ``user_access_app_result.txt``.

    Looks up the users active during the last 180 days, calls
    ``get_result`` once per user, and appends one JSON object per line to
    the output file. Each object has the keys ``user_id`` and ``data``;
    ``data`` is itself a JSON-encoded string of the aggregation dict.
    Despite the function name, the output is JSON lines, not CSV.

    The function sleeps one second after every user.

    Returns:
        None.
    """
    days = 180
    sql_format = "%Y-%m-%d %H:%M:%S"
    # NOTE(review): the trailing "Z" denotes UTC, but the timestamps below
    # are local time — confirm which timezone the ES field expects.
    es_format = "%Y-%m-%dT%H:%M:%S.000Z"

    # Sample the clock once so the SQL and ES windows describe the same span.
    now = datetime.datetime.now()
    window_start = now - datetime.timedelta(days=days)

    now_time = now.strftime(es_format)
    print("now_time", now_time)
    window_start_es_time = window_start.strftime(es_format)

    active_users = select_active_users(
        window_start.strftime(sql_format),
        now.strftime(sql_format),
    )
    for row in active_users:
        user_id = row["user_id"]
        aggregated = get_result(now_time, user_id, window_start_es_time)
        record = {'user_id': user_id, 'data': json.dumps(aggregated)}
        # Re-open in append mode per record so each finished user is on disk
        # before the next request is made.
        with open('user_access_app_result.txt', 'a', encoding='utf-8') as f:
            f.write(json.dumps(record))
            # Terminate the record with a real newline (was the letter "n").
            f.write("\n")
        time.sleep(1)


if __name__ == '__main__':
    # Print a local wall-clock timestamp before and after the export so the
    # run duration can be read from the console output.
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now_time = time.strftime(stamp_format)
    print("start: ", now_time)
    write_result_csv()
    now_time = time.strftime(stamp_format)
    print("end: ", now_time)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5680234.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存