Python 连接hive(Linux)

Python 连接hive(Linux),第1张

之所以选择基于Linux系统用Python连接hive,是因为在window下会出现Hadoop认证失败的问题。会出现执行python脚本的机器无目标hive的kerberos认证信息类似错误,也会出现sasl调用问题:

该错误我尝试多次,未能解决(有知道window下解决方案的欢迎留言),所以建议使用Linux系统。

VMware Workstation +Ubuntu

网上教程很多,本文推荐一个教程: https://blog.csdn.net/stpeace/article/details/78598333

主要是以下四个包:

在安装包sasl的过程会出现麻烦,主要是Ubuntu中缺乏sasl.h的问题,这里可以通过下面语句解决

这和centos有一些区别。

本文是基于本机虚拟机用Python连接的公司测试环境的hive(生产环境和测试环境是有隔离的,生产环境需要堡垒机才能连接)

因缺乏工程和计算机基础的知识,对很多的地方都了解的不够深入,欢迎大神指点,最后向以下两位大佬的帖子致谢:

[1] https://www.zhihu.com/question/269333988/answer/581126392

[2] https://mp.weixin.qq.com/s/cdFxkphMtJASQ7-nKt13mg

由于版本的不同,Python 连接 Hive 的方式也就不一样。

在网上搜索关键字 python hive 的时候可以找到一些解决方案。大部分是这样的,首先把hive 根目录下的$HIVE_HOME/lib/py拷贝到 python 的库中,也就是 site-package 中,或者干脆把新写的 python 代码和拷贝的 py 库放在同一个目录下,然后用这个目录下提供的 thrift 接口调用。示例也是非常简单的。类似这样:

import sys

from hive_service import ThriftHive

from hive_service.ttypes import HiveServerException

from thrift import Thrift

from thrift.transport import TSocket

from thrift.transport import TTransport

from thrift.protocol import TBinaryProtocol

def hiveExe(sql):

try:

transport = TSocket.TSocket('127.0.0.1', 10000)

transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = ThriftHive.Client(protocol)

transport.open()

client.execute(sql)

print "The return value is : "

print client.fetchAll()

print "............"

transport.close()

except Thrift.TException, tx:

print '%s' % (tx.message)

if __name__ == '__main__':

hiveExe("show tables")1234567891011121314151617181920212223242526272812345678910111213141516171819202122232425262728

或者是这样的:

#!/usr/bin/env python

import sys

from hive import ThriftHive

from hive.ttypes import HiveServerException

from thrift import Thrift

from thrift.transport import TSocket

from thrift.transport import TTransport

from thrift.protocol import TBinaryProtocol

try:

transport = TSocket.TSocket('14.18.154.188', 10000)

transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = ThriftHive.Client(protocol)

transport.open()

client.execute("CREATE TABLE r(a STRING, b INT, c DOUBLE)")

client.execute("LOAD TABLE LOCAL INPATH '/path' INTO TABLE r")

client.execute("SELECT * FROM test1")

while (1):

row = client.fetchOne()

if (row == None):

break

print rowve

client.execute("SELECT * FROM test1")

print client.fetchAll()

transport.close()

except Thrift.TException, tx:

print '%s' % (tx.message)

12345678910111213141516171819202122232425262728293031323334351234567891011121314151617181920212223242526272829303132333435

但是都解决不了问题,从 netstat 中查看可以发现 TCP 连接确实是建立了,但是不执行 hive 指令。也许就是版本的问题。

还是那句话,看各种中文博客不如看官方文档。

项目中使用的 hive 版本是0.13,此时此刻官网的最新版本都到了1.2.1了。中间间隔了1.2.0、1.1.0、1.0.0、0.14.0。但是还是参考一下官网的方法试试吧。

首先看官网的 setting up hiveserver2

可以看到启动 hiveserver2 可以配置最大最小线程数,绑定的 IP,绑定的端口,还可以设置认证方式。(之前一直不成功正式因为这个连接方式)然后还给了 python 示例代码。

import pyhs2

with pyhs2.connect(host='localhost',

port=10000,

authMechanism="PLAIN",

user='root',

password='test',

database='default') as conn:

with conn.cursor() as cur:

#Show databases

print cur.getDatabases()

#Execute query

cur.execute("select * from table")

#Return column info from query

print cur.getSchema()

#Fetch table results

for i in cur.fetch():

print i123456789101112131415161718192021123456789101112131415161718192021

在拿到这个代码的时候,自以为是的把认证信息给去掉了。然后运行发现跟之前博客里介绍的方法结果一样,建立了 TCP 连接,但是就是不执行,也不报错。这是几个意思?然后无意中尝试了一下原封不动的使用上面的代码。结果可以用。唉。。。

首先声明一下,hive-site.xml中默认关于 hiveserver2的配置我一个都没有修改,一直是默认配置启动 hiveserver2。没想到的是默认配置是有认证机制的。

然后再写一点,在安装 pyhs2的时候还是遇到了点问题,其实还是要看官方文档的,我只是没看官方文档直接用 pip安装导致了这个问题。安装 pyhs2需要确定已经安装了几个依赖包。直接看在 github 上的 wiki 吧。哪个没安装就补上哪一个就好了。

给定一个查询时间,找最近登录的用户

create temporary function row_number as "com.ai.hive.udf.util.RowNumberUDF"

select logint_time,username from 

( select ip,login_time,username from (

select ip ,select_time from a join select ip,login_time,username from b on(a.ip=b.ip and a.login_time

)t sort by login_time desc )p where row_number=1

import pandas as pd

import numpy as np

login_column_names = ['ip','dip','type','uri','time']

select_column_names = ['ip','dip','action','time']

df_login = pd.read_csv('login.txt',sep='\t',encoding='utf-8',header=None,names=login_column_names)

df_select = pd.read_csv('select.txt',sep='\t',encoding='utf-8',header=None,names=select_column_names)

df_login[['username','password','authPassword','submit']]= df_login['uri'].str.replace('j_username=','').str.replace('password=','').str.split('&',expand=True)

df_login

x=[1,2,3,6,7,8]

df_login.drop(df_login.columns[x], axis=1, inplace=True)

df_login

import time

# time.strptime(df_login['time'],"%Y-%m-%d %H:%M:%S")

df_login['time']

df_login['new_time']=0

a = 0

for x in df_login['time']:

    print(int(time.mktime(time.strptime(x,"%Y-%m-%d %H:%M:%S.%f"))))

    df_login['new_time'][a] =  int(time.mktime(time.strptime(x,"%Y-%m-%d %H:%M:%S.%f")))

    a=a+1

df_login['new_time']

import time

# time.strptime(df_login['time'],"%Y-%m-%d %H:%M:%S")

df_select['time']

df_select['new_time']=0

a = 0

for x in df_select['time']:

    print(int(time.mktime(time.strptime(x,"%Y-%m-%d %H:%M:%S.%f"))))

    df_select['new_time'][a] =  int(time.mktime(time.strptime(x,"%Y-%m-%d %H:%M:%S.%f")))

    a=a+1

df_select

df_on = df_select.merge(df_login,how='left',on=['ip'])

df_on

df_on['diff_time'] = df_on['new_time_x']-df_on['new_time_y']

df_on

df_on[df_on['diff_time']>=0]

#找时间最小的那个

df_on = df_on[df_on['diff_time']>=0]

df_on

df_on[['ip','time_x','username','diff_time']]

def min_time(df,n=3,column='diff_time'):

    return df.sort_index(by=column,ascending=False)[-n:]

df_on[['ip','time_x','username','diff_time']].groupby(['ip','time_x']).apply(min_time,n=1)


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/sjk/9246844.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-26
下一篇 2023-04-26

发表评论

登录后才能评论

评论列表(0条)

保存