Hive reporting jobs often need to resolve IP addresses, for example mapping each IP to country, province, and city so the result can drive a map visualization on a dashboard.
The approach here is a Hive UDF backed by the free GeoLite2 database. GeoLite2 download page: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data.
One thing to watch is where the GeoLite2-City.mmdb database file is stored. Other write-ups put it on the Hive classpath, but with Hive on Spark the generated Spark job then fails with a FileNotFoundException, so it is better to reference the file by an absolute path (and copy it to every node in the cluster).
package com.ms.hive.udf;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;

@Slf4j
public class ParseIpUDF extends GenericUDF {

    // Lazily initialized GeoLite2 reader, reused across rows.
    private static DatabaseReader reader = null;

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 1) {
            throw new UDFArgumentException("parse_ip expects exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects == null || deferredObjects.length == 0) {
            return null;
        }
        Object arg = deferredObjects[0].get();
        if (arg == null) {
            return null;
        }
        String ip = arg.toString();
        if (!StringUtils.isEmpty(ip)) {
            return parseIp(ip.replaceAll(" ", ""));
        }
        return null;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "parse_ip(ip)";
    }

    public String parseIp(String ip) {
        try {
            if (reader == null) {
                // The mmdb file must be copied to this path on every node of the cluster.
                File database = new File("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/etc/hive/conf.dist/GeoLite2-City.mmdb");
                try {
                    reader = new DatabaseReader.Builder(database).build();
                } catch (IOException e) {
                    log.error("ParseIpUDF init reader err, {}", e.getMessage());
                    return null;
                }
            }
            InetAddress ipAddress = InetAddress.getByName(ip);
            CityResponse response = reader.city(ipAddress);
            Subdivision subdivision = response.getMostSpecificSubdivision();
            City city = response.getCity();
            Country country = response.getCountry();
            String countryName = country.getNames().get("zh-CN");
            if (countryName != null && countryName.contains("台湾")) {
                countryName = "中国台湾";
            }
            String provinceName = subdivision.getNames().get("zh-CN");
            String cityName = city.getNames().get("zh-CN");
            return countryName + "," + provinceName + "," + cityName;
        } catch (Exception e) {
            log.error("parseIp err", e);
        }
        return null;
    }
}
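Before registering the UDF on the cluster, the lookup logic can be smoke-tested with a plain main method. The sketch below is only illustrative: it assumes GeoLite2-City.mmdb already sits at the absolute path hard-coded in the UDF, and ParseIpLocalTest and the sample IP are made-up names for the test.

package com.ms.hive.udf;

// Hypothetical local smoke test; not part of the deployed UDF jar.
public class ParseIpLocalTest {
    public static void main(String[] args) {
        ParseIpUDF udf = new ParseIpUDF();
        // Any public IP works here; replace with one relevant to your data.
        String result = udf.parseIp("114.114.114.114");
        // Expected format: "country,province,city" (zh-CN names), or null if the lookup fails.
        System.out.println(result);
    }
}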
Upload the jars and create the Hive function:
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/lombok-1.18.16.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/maxmind-db-2.0.0.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/geoip2-2.15.0.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar;

create temporary function parse_ip as 'com.ms.hive.udf.ParseIpUDF'
using jar 'hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar';
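Once the function is registered it can be called like any built-in UDF. The query below is just a usage sketch; test.access_log and client_ip are placeholder table and column names, and split() is used to break the returned "country,province,city" string into separate fields.

-- Illustrative query; adjust table and column names to your schema.
select
    client_ip,
    parse_ip(client_ip)                as ip_region,   -- "country,province,city"
    split(parse_ip(client_ip), ',')[0] as country,
    split(parse_ip(client_ip), ',')[1] as province,
    split(parse_ip(client_ip), ',')[2] as city
from test.access_log
limit 10;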