Implementing Custom Functions in Hive


Custom functions — contents
  • 0. Programming takeaways
  • 1. Objects used in Hive
  • 2. UDF
  • 3. UDAF
  • 4. UDTF
  • 5. UDTF, upgraded
  • 6. Creating a temporary function
  • 7. Creating a permanent function

0. Programming takeaways
  1. When writing a custom class for a framework, you almost always extend one of the framework's base classes.
  2. Framework objects are usually wrapped and can rarely be created directly with new; look for a class whose name ends in Factory to produce them.
  3. Reading the source is a great way to learn something new; even if it is confusing at first, you can still pick up the overall flow.
  4. For simple or personal-use functions, you only need to implement the methods the framework requires; the abstract methods are the ones the IDE prompts you to override.
1. Objects used in Hive
  1. Cloneable: an interface marking an object as cloneable.
  2. ObjectInspector: an interface (extending Cloneable) that describes how to read a value's type and contents.
  3. ObjectInspectorFactory: a final class that produces ObjectInspector instances.
  4. PrimitiveObjectInspectorFactory: a final class that produces inspectors for primitive types, bridging them to Java types.
  5. StructObjectInspector: an abstract class implementing ObjectInspector; it describes struct (row-like) values with named, typed fields (a small sketch of how these pieces fit together follows this list).
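
To see how the factories relate, here is a tiny sketch (the class name InspectorDemo and the field name "word" are mine, purely for illustration):

import java.util.Arrays;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class InspectorDemo {
    public static void main(String[] args) {
        // The primitive factory hands out a ready-made inspector for a Java String...
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        // ...and the struct factory combines field names and field inspectors
        // into a StructObjectInspector describing a one-column row.
        StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("word"), Arrays.asList(stringOI));
        System.out.println(rowOI.getTypeName()); // struct<word:string>
    }
}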
2. UDF

One row in, one value out.

package review.myudf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GetLengthStr extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. Check the argument count (test for null before dereferencing the array)
        if (arguments == null || arguments.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is required");
        }
        // 2. Check the argument type: only primitive types are accepted
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Only primitive types are accepted");
        }
        // 3. Return the inspector describing the result type (a Java int)
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // 1. Fetch the argument
        Object str = arguments[0].get();
        // 2. Handle SQL NULL (a real null reference, not the string "null")
        if (str == null) {
            return 0;
        }
        // 3. Compute and return the length
        return str.toString().length();
    }

    @Override
    public String getDisplayString(String[] children) {
        // Shown in EXPLAIN output
        return getStandardDisplayString("getLengthStr", children);
    }
}
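
Once registered (see section 6 below), it behaves like any built-in function; my_len is a hypothetical name chosen at registration time:

select my_len('hello');   -- 5
select my_len(null);      -- 0 (this implementation maps SQL NULL to 0)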

Compare with the built-in source, GenericUDFAbs:

public class GenericUDFAbs extends GenericUDF {
  private transient PrimitiveCategory inputType;
  private final DoubleWritable resultDouble = new DoubleWritable();
  private final LongWritable resultLong = new LongWritable();
  private final IntWritable resultInt = new IntWritable();
  private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
  private transient PrimitiveObjectInspector argumentOI;
  private transient Converter inputConverter;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 1) {
      throw new UDFArgumentLengthException(
          "ABS() requires 1 argument, got " + arguments.length);
    }

    if (arguments[0].getCategory() != Category.PRIMITIVE) {
      throw new UDFArgumentException(
          "ABS only takes primitive types, got " + arguments[0].getTypeName());
    }
    argumentOI = (PrimitiveObjectInspector) arguments[0];

    inputType = argumentOI.getPrimitiveCategory();
    ObjectInspector outputOI = null;
    switch (inputType) {
    case SHORT:
    case BYTE:
    case INT:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableIntObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
      break;
    case LONG:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableLongObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
      break;
    case FLOAT:
    case STRING:
    case DOUBLE:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
      break;
    case DECIMAL:
      outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
          ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          outputOI);
      break;
    default:
      throw new UDFArgumentException(
          "ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
    }
    return outputOI;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object valObject = arguments[0].get();
    if (valObject == null) {
      return null;
    }
    switch (inputType) {
    case SHORT:
    case BYTE:
    case INT:
      valObject = inputConverter.convert(valObject);
      resultInt.set(Math.abs(((IntWritable) valObject).get()));
      return resultInt;
    case LONG:
      valObject = inputConverter.convert(valObject);
      resultLong.set(Math.abs(((LongWritable) valObject).get()));
      return resultLong;
    case FLOAT:
    case STRING:
    case DOUBLE:
      valObject = inputConverter.convert(valObject);
      if (valObject == null) {
        return null;
      }
      resultDouble.set(Math.abs(((DoubleWritable) valObject).get()));
      return resultDouble;
    case DECIMAL:
      HiveDecimalObjectInspector decimalOI =
          (HiveDecimalObjectInspector) argumentOI;
      HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);

      if (val != null) {
        resultDecimal.set(val);
        resultDecimal.mutateAbs();
        val = resultDecimal;
      }
      return val;
    default:
      throw new UDFArgumentException(
          "ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
    }
  }

  @Override
  public String getDisplayString(String[] children) {
    return getStandardDisplayString("abs", children);
  }

}
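
Two habits worth copying from the built-in version: it returns null for a NULL input instead of fabricating a value, and it allocates its Writable result objects once as fields and reuses them on every call, avoiding per-row garbage; the ObjectInspectorConverters then handle whatever primitive type actually arrives.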

3. UDAF
  1. Many rows in, one value out.
  2. For some reason the classes for implementing one show up as deprecated in the IDE, so I did not dig deeper.
  3. The teacher never even mentioned it.
  4. Every base class I found was deprecated, so I approximated the behavior with a UDTF instead (a sketch of the non-deprecated GenericUDAFEvaluator API follows the workaround below).
  5. The downside is that it emits one output row per input row,
  6. presumably because process() is called once for every row.
  7. A single-row call such as select my_sum(1,2,3,4,5) does produce the expected result, though.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MySum extends GenericUDTF {
    private int sum = 0;
    private final ArrayList<Object> outRow = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count
        List<? extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
        if (structFieldRefs.size() < 1) {
            throw new UDFArgumentLengthException("At least one argument is required");
        }
        // 2. Check the argument types (the Category enum has no int member, only PRIMITIVE)
        for (int i = 0; i < structFieldRefs.size(); i++) {
            StructField structField = structFieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch");
            }
        }
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();

        fieldNames.add("sum");
        // The single output column is declared as a Java int
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        for (Object arg : args) {
            sum += Integer.parseInt(arg.toString());
        }
        // Clear the buffer so repeated process() calls don't keep widening the row
        outRow.clear();
        outRow.add(sum);
        forward(outRow);
    }

    @Override
    public void close() throws HiveException {
    }
}
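
For the record, the replacement for the deprecated UDAF classes is the AbstractGenericUDAFResolver / GenericUDAFEvaluator pair. Below is a minimal sketch of a long-valued my_sum written against that API — the class names and the null-skipping behavior are my own choices, so treat it as a starting point rather than a canonical implementation:

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.LongWritable;

public class MySumUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentLengthException("my_sum takes exactly one argument");
        }
        return new MySumEvaluator();
    }

    public static class MySumEvaluator extends GenericUDAFEvaluator {
        // Inspector for whatever comes in: the raw column in PARTIAL1/COMPLETE,
        // or the partial long result in PARTIAL2/FINAL.
        private PrimitiveObjectInspector inputOI;
        private final LongWritable result = new LongWritable(0);

        // Per-group accumulator
        static class SumAgg extends AbstractAggregationBuffer {
            long sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            // Both the partial and the final result are longs
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters[0] != null) { // skip SQL NULLs
                ((SumAgg) agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            // In PARTIAL2/FINAL mode, inputOI describes the partial long result
            if (partial != null) {
                ((SumAgg) agg).sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((SumAgg) agg).sum);
            return result;
        }
    }
}

Registered the same way as any UDF jar, it then aggregates across rows like the built-in sum(), e.g. select my_sum(score) from student group by clazz.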
4. UDTF

One row in, many rows out.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MySplitWords extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count
        List<? extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
        if (structFieldRefs.size() != 2) {
            throw new UDFArgumentLengthException("Exactly two arguments are required (string, separator)");
        }
        // 2. Check the argument types
        for (int i = 0; i < structFieldRefs.size(); i++) {
            StructField structField = structFieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch");
            }
        }
        // 3. Describe the output
        // A UDTF can emit several columns, so the schema is built from
        // two parallel lists: fieldNames (column names) and fieldOIs (column types).
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();

        // This function emits a single column with many rows, so no loop is needed
        fieldNames.add("word");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Fetch the arguments
        String words = args[0].toString();
        String split = args[1].toString();

        // Split the input
        String[] splitWords = words.split(split);
        // Emit one row per word: forward() expects an Object[] holding the row
        for (String word : splitWords) {
            Object[] obj = new Object[1];
            obj[0] = word;
            forward(obj);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
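
Assuming it is registered as my_split (a hypothetical name), a single value fans out into several rows:

select my_split('a,b,c', ',');
-- word
-- a
-- b
-- c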

Compare with the built-in source, GenericUDTFGetSplits (only the relevant methods are shown):

public class GenericUDTFGetSplits extends GenericUDTF {
  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);

  protected transient StringObjectInspector stringOI;
  protected transient IntObjectInspector intOI;
  protected transient JobConf jc;
  private boolean orderByQuery;
  private boolean forceSingleSplit;
  private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
  private DataOutput dos = new DataOutputStream(bos);

  @Override
  public StructObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentException {

    LOG.debug("initializing GenericUDFGetSplits");

    if (SessionState.get() == null || SessionState.get().getConf() == null) {
      throw new IllegalStateException("Cannot run get splits outside HS2");
    }

    LOG.debug("Initialized conf, jc and metastore connection");

    if (arguments.length != 2) {
      throw new UDFArgumentLengthException(
          "The function GET_SPLITS accepts 2 arguments.");
    } else if (!(arguments[0] instanceof StringObjectInspector)) {
      LOG.error("Got " + arguments[0].getTypeName() + " instead of string.");
      throw new UDFArgumentTypeException(0, "\""
          + "string\" is expected at function GET_SPLITS, " + "but \""
          + arguments[0].getTypeName() + "\" is found");
    } else if (!(arguments[1] instanceof IntObjectInspector)) {
      LOG.error("Got " + arguments[1].getTypeName() + " instead of int.");
      throw new UDFArgumentTypeException(1, "\""
          + "int\" is expected at function GET_SPLITS, " + "but \""
          + arguments[1].getTypeName() + "\" is found");
    }

    stringOI = (StringObjectInspector) arguments[0];
    intOI = (IntObjectInspector) arguments[1];

    List<String> names = Arrays.asList("split");
    List<ObjectInspector> fieldOIs = Arrays
        .<ObjectInspector> asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
    StructObjectInspector outputOI = ObjectInspectorFactory
        .getStandardStructObjectInspector(names, fieldOIs);

    LOG.debug("done initializing GenericUDFGetSplits");
    return outputOI;
  }
  @Override
  public void process(Object[] arguments) throws HiveException {

    String query = stringOI.getPrimitiveJavaObject(arguments[0]);
    int num = intOI.get(arguments[1]);

    // Generate applicationId for the LLAP splits
    LlapCoordinator coordinator = LlapCoordinator.getInstance();
    if (coordinator == null) {
      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
    }
    ApplicationId applicationId = coordinator.createExtClientAppId();
    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());

    PlanFragment fragment = createPlanFragment(query, num, applicationId);
    TezWork tezWork = fragment.work;
    Schema schema = fragment.schema;

    boolean generateSingleSplit = forceSingleSplit && orderByQuery;
    try {
      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
        orderByQuery, forceSingleSplit);
      if (generateSingleSplit && splits.length > 1) {
        throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
      }
      for (InputSplit s : splits) {
        Object[] os = new Object[1];
        bos.reset();
        s.write(dos);
        byte[] frozen = bos.toByteArray();
        os[0] = frozen;
        forward(os);
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }
  @Override
  public void close() throws IOException {
    try {
      LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
      driver.releaseLocksAndCommitOrRollback(true);
      driver.close();
      driver.destroy();
      txnManager.closeTxnManager();
    } catch (Exception err) {
      LOG.error("Error closing driver resources", err);
      throw new IOException(err);
    }
  }
5. UDTF, upgraded

This version emits multiple rows and multiple columns.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MySplitWordsUP extends GenericUDTF {

    private final ArrayList<Object> outs = new ArrayList<>(); // buffer holding one output row

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 3) {
            throw new UDFArgumentLengthException("Exactly three arguments are required (string, row separator, column separator)");
        }
        // 2. Check the argument types
        for (int i = 0; i < fieldRefs.size(); i++) {
            StructField structField = fieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch; only primitive types are accepted");
            }
        }
        // 3. Describe the output
        // structFieldNames declares the column names,
        // structFieldObjectInspectors declares the column types
        List<String> names = new ArrayList<>();
        List<ObjectInspector> types = new ArrayList<>();

        // This function emits two columns
        names.add("a1");
        names.add("a2");
        types.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        types.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(names, types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Fetch the arguments
        String str = args[0].toString();
        String split1 = args[1].toString(); // separator between rows
        String split2 = args[2].toString(); // separator between columns

        // Split into rows, then split each row into columns
        String[] rows = str.split(split1);
        for (String row : rows) {
            String[] cols = row.split(split2);
            outs.clear(); // reuse the buffer for each output row
            for (String col : cols) {
                outs.add(col);
            }
            forward(outs); // emit one row
        }
    }

    @Override
    public void close() throws HiveException {
    }
}

That completes the multi-column version.
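
Assuming it is registered as my_split_up (a hypothetical name), splitting on '|' between rows and ':' between columns looks like this (the '|' must be escaped because split() treats the separator as a regex):

select my_split_up('a:1|b:2', '\\|', ':');
-- a1    a2
-- a     1
-- b     2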

6. Creating a temporary function
  1. Open the Hive CLI.
  2. add jar <path to the jar>;
  3. create temporary function my_len as "<fully qualified class name>";

Note: the function is temporary; it disappears as soon as you reconnect.
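
For example, registering the UDF from section 2 might look like this (the jar path is a placeholder):

add jar /opt/hive/jars/myudf.jar;
create temporary function my_len as "review.myudf.GetLengthStr";
select my_len('hello');   -- 5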

7. Creating a permanent function
  1. First copy the jar into Hive's lib directory:
    cp <path to the jar> <lib directory path>
  2. Start Hive and create the function:
    create function func_name as "<fully qualified class name>";
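
A minimal sketch, assuming $HIVE_HOME points at the Hive installation and reusing the class from section 2:

cp myudf.jar $HIVE_HOME/lib/

create function my_len as "review.myudf.GetLengthStr";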
