Hive UDF with GeoIP2/GeoLite2 for IP Geolocation, and an Analysis of an Exception Caused by the Static Database File's Location


Requirement

Hive reporting jobs frequently need to resolve IP addresses, e.g. mapping an IP to country, region, and province to drive the map panel of a dashboard.

Approach

Implement a Hive UDF backed by the free GeoLite2 database. GeoLite2 official site: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data.

Implementation

Pay attention to where the GeoLite2-City.mmdb database file is stored. Other write-ups place it on the Hive classpath, but with Hive on Spark, the Spark job generated from the query then fails with a FileNotFoundException, presumably because that classpath entry is not visible to the Spark executors. It is safer to reference the file by an absolute path that exists on every node.
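To make the failure mode concrete, here is a minimal sketch of the two lookup strategies (the class name `DbLocator` and the paths are hypothetical, for illustration only): a classpath `getResource` lookup finds the file only where that classpath entry actually exists, while an absolute-path check works identically on every node that has the file.

```java
import java.io.File;
import java.net.URL;

public class DbLocator {
    // Hypothetical helper: prefer an absolute path that is present on every
    // cluster node; fall back to the classpath only as a last resort.
    static String locate(String absolutePath) {
        File f = new File(absolutePath);
        if (f.isFile()) {
            return f.getAbsolutePath();
        }
        // Classpath lookup: may work for plain Hive (the file sits under the
        // Hive conf dir), but the resource is not visible to Spark executors,
        // which is consistent with the FileNotFoundException described above.
        URL url = DbLocator.class.getClassLoader().getResource("GeoLite2-City.mmdb");
        return url == null ? null : url.getPath();
    }

    public static void main(String[] args) {
        // Prints null unless the file exists at the path or on the classpath.
        System.out.println(locate("/nonexistent/GeoLite2-City.mmdb"));
    }
}
```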

package com.ms.hive.udf;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.File;
import java.net.InetAddress;

@Slf4j
public class ParseIpUDF extends GenericUDF {

    // Shared across rows and tasks in the same JVM; lazily initialized.
    private static volatile DatabaseReader reader = null;

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 1) {
            throw new UDFArgumentException("parse_ip expects exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects == null || deferredObjects.length == 0 || deferredObjects[0].get() == null) {
            return null;
        }
        String ip = deferredObjects[0].get().toString();
        if (!StringUtils.isEmpty(ip)) {
            return parseIp(ip.replaceAll(" ", ""));
        }
        return null;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "parse_ip(ip)";
    }

    public String parseIp(String ip) {
        try {
            if (reader == null) {
                synchronized (ParseIpUDF.class) {
                    if (reader == null) {
                        // The .mmdb file must exist at this absolute path on every node of the cluster.
                        File database = new File("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/etc/hive/conf.dist/GeoLite2-City.mmdb");
                        reader = new DatabaseReader.Builder(database).build();
                    }
                }
            }
            InetAddress ipAddress = InetAddress.getByName(ip);
            CityResponse response = reader.city(ipAddress);
            Subdivision subdivision = response.getMostSpecificSubdivision();
            City city = response.getCity();
            Country country = response.getCountry();
            String countryName = country.getNames().get("zh-CN");
            // Normalize the country name GeoLite2 returns for Taiwan;
            // the zh-CN name may be absent, so guard against null.
            if (countryName != null && countryName.contains("台湾")) {
                countryName = "中国台湾";
            }
            String provinceName = subdivision.getNames().get("zh-CN");
            String cityName = city.getNames().get("zh-CN");
            return countryName + "," + provinceName + "," + cityName;
        } catch (Exception e) {
            log.error("parseIp failed for ip {}", ip, e);
        }
        return null;
    }

}
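The Taiwan name normalization inside parseIp can be isolated and unit-tested without the .mmdb file. A minimal sketch (the class name `CountryNameNormalizer` is hypothetical, not part of the UDF above):

```java
public class CountryNameNormalizer {
    // Mirrors the normalization step in ParseIpUDF.parseIp: GeoLite2 returns
    // "台湾" as the zh-CN country name, which the UDF rewrites as "中国台湾".
    static String normalize(String countryName) {
        if (countryName == null) {
            return null; // the zh-CN localized name can be absent for some records
        }
        return countryName.contains("台湾") ? "中国台湾" : countryName;
    }

    public static void main(String[] args) {
        System.out.println(normalize("台湾")); // 中国台湾
        System.out.println(normalize("中国")); // unchanged
        System.out.println(normalize(null));  // null-safe
    }
}
```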

Upload the jars and create the Hive function

add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/lombok-1.18.16.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/maxmind-db-2.0.0.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/geoip2-2.15.0.jar;
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar;
create temporary function parse_ip as 'com.ms.hive.udf.ParseIpUDF' using jar 'hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar';

Original article: http://outofmemory.cn/zaji/5654784.html
