- Programming mindset, elevated
- 0. Objects used in Hive
- 1. UDF
- 2. UDAF
- 3. UDTF
- 4. UDTF, upgraded
- 5. Creating a temporary function
- 6. Creating a permanent function
0. Objects used in Hive
- When writing custom classes for a framework like this, you mostly have to extend classes the framework provides.
- The framework's objects are almost all wrapped again, so you generally can't create them directly with new; when looking for a way to build one, try appending Factory to the class name.
- Reading the source is a very good way to learn something new; a lot of it still goes over my head, but following the overall flow first is enough.
- When implementing a class for a simple function, or one just for your own use, you only need to write the methods the API prescribes - usually the abstract ones, which the IDE will prompt you to implement.
1. UDF
- Cloneable: an interface marking a class as cloneable.
- ObjectInspector: the object-inspector interface; extends Cloneable.
- ObjectInspectorFactory: a final class, a factory that produces inspector objects.
- PrimitiveObjectInspectorFactory: a final factory class for primitive-type inspectors; it is the bridge to Java types.
- StructObjectInspector: an abstract class implementing the ObjectInspector interface for struct (row) types; what exactly it does I couldn't tell from the comments! (A small sketch of these factories in use follows this list.)
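A tiny illustration (my addition, not from the course) of how the two factories above hand out inspectors - note that no inspector is ever created with new:

```java
import java.util.Arrays;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class InspectorDemo {
    public static void main(String[] args) {
        // Primitive inspectors come from the primitive factory...
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        // ...and composite (struct) inspectors from the general factory.
        StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("word"),     // column names
                Arrays.asList(stringOI));  // column types
        System.out.println(rowOI.getTypeName()); // struct<word:string>
    }
}
```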
One in, one out
```java
package review.myudf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GetLengthStr extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. Check the argument count (the null check must come before the length check).
        if (arguments == null || arguments.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is required");
        }
        // 2. Check the argument type: only primitive types are accepted.
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Argument must be a primitive type");
        }
        // 3. Declare the return type.
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // 1. Fetch the argument.
        Object str = arguments[0].get();
        // 2. Handle NULL explicitly: Hive's NULL is a real null, not the string "null".
        if (str == null) {
            return 0;
        }
        // 3. Compute and return the length.
        return str.toString().length();
    }

    @Override
    public String getDisplayString(String[] children) {
        // Shown in explain output; left empty here.
        return "";
    }
}
```
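Assuming the class has been registered as my_len (registration is covered in sections 5 and 6 below; the name is a placeholder), a quick check of the NULL convention:

```sql
select my_len('hello');  -- 5
select my_len(null);     -- 0: this UDF maps NULL to 0 instead of returning NULL
```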
Compare with the built-in source:
```java
public class GenericUDFAbs extends GenericUDF {
  private transient PrimitiveCategory inputType;
  private final DoubleWritable resultDouble = new DoubleWritable();
  private final LongWritable resultLong = new LongWritable();
  private final IntWritable resultInt = new IntWritable();
  private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
  private transient PrimitiveObjectInspector argumentOI;
  private transient Converter inputConverter;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 1) {
      throw new UDFArgumentLengthException(
          "ABS() requires 1 argument, got " + arguments.length);
    }

    if (arguments[0].getCategory() != Category.PRIMITIVE) {
      throw new UDFArgumentException(
          "ABS only takes primitive types, got " + arguments[0].getTypeName());
    }

    argumentOI = (PrimitiveObjectInspector) arguments[0];
    inputType = argumentOI.getPrimitiveCategory();
    ObjectInspector outputOI = null;
    switch (inputType) {
    case SHORT:
    case BYTE:
    case INT:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableIntObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
      break;
    case LONG:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableLongObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
      break;
    case FLOAT:
    case STRING:
    case DOUBLE:
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
          PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
      outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
      break;
    case DECIMAL:
      outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
          ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
      inputConverter = ObjectInspectorConverters.getConverter(arguments[0], outputOI);
      break;
    default:
      throw new UDFArgumentException(
          "ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
    }
    return outputOI;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object valObject = arguments[0].get();
    if (valObject == null) {
      return null;
    }
    switch (inputType) {
    case SHORT:
    case BYTE:
    case INT:
      valObject = inputConverter.convert(valObject);
      resultInt.set(Math.abs(((IntWritable) valObject).get()));
      return resultInt;
    case LONG:
      valObject = inputConverter.convert(valObject);
      resultLong.set(Math.abs(((LongWritable) valObject).get()));
      return resultLong;
    case FLOAT:
    case STRING:
    case DOUBLE:
      valObject = inputConverter.convert(valObject);
      if (valObject == null) {
        return null;
      }
      resultDouble.set(Math.abs(((DoubleWritable) valObject).get()));
      return resultDouble;
    case DECIMAL:
      HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) argumentOI;
      HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
      if (val != null) {
        resultDecimal.set(val);
        resultDecimal.mutateAbs();
        val = resultDecimal;
      }
      return val;
    default:
      throw new UDFArgumentException(
          "ABS only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
    }
  }

  @Override
  public String getDisplayString(String[] children) {
    return getStandardDisplayString("abs", children);
  }
}
```
2. UDAF
- Many rows in, one row out.
- For some reason the class showed as deprecated as soon as I implemented it, so I didn't dig deeper.
- The instructor never even mentioned it.
- Every parent class I found was deprecated, so I approximated the aggregation with a UDTF instead (a sketch of the non-deprecated route follows the code below).
- Applied to a table column, though, it emits multiple rows,
- presumably because process() is invoked once per input row.
- Still, a constant call like select my_sum(1,2,3,4,5) achieves the intended effect.
```java
public class MySum extends GenericUDTF {

    private int sum = 0;
    private final ArrayList<Object> outRow = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count.
        List<? extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
        if (structFieldRefs.size() < 1) {
            throw new UDFArgumentLengthException("At least one argument is required");
        }
        // 2. Check the argument types. Category has no INT member,
        //    so all we can require here is PRIMITIVE.
        for (int i = 0; i < structFieldRefs.size(); i++) {
            StructField structField = structFieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch");
            }
        }
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("sum");
        // The output column is declared as int.
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        for (Object arg : args) {
            sum += Integer.parseInt(arg.toString());
        }
        // With only one output row there is no real need to clear the list.
        outRow.add(sum);
        forward(outRow);
    }

    @Override
    public void close() throws HiveException {
    }
}
```
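Since the notes stop at the deprecation wall, here is a minimal sketch of the non-deprecated aggregate route - AbstractGenericUDAFResolver plus a GenericUDAFEvaluator - for a sum over one numeric column. This is my own illustration, not code from the post; treat it as an untested sketch:

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class MySumUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is required");
        }
        return new SumEvaluator();
    }

    public static class SumEvaluator extends GenericUDAFEvaluator {

        // Holds the running total for one group.
        static class SumBuffer extends AbstractAggregationBuffer {
            long sum;
        }

        // Original column OI in map-side modes; partial-result OI in merge modes.
        private PrimitiveObjectInspector inputOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() {
            return new SumBuffer();
        }

        @Override
        public void reset(AggregationBuffer agg) {
            ((SumBuffer) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters[0] != null) {
                ((SumBuffer) agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) {
            return ((SumBuffer) agg).sum; // autoboxed Long, matching the OI returned by init()
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ((SumBuffer) agg).sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) {
            return ((SumBuffer) agg).sum;
        }
    }
}
```

Unlike the UDTF workaround, this aggregates correctly across rows and partitions: iterate() accumulates per row, terminatePartial()/merge() handle the map/reduce hand-off, and terminate() emits one row per group.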
3. UDTF
1. Many in, many out (many rows, single column)
```java
public class MySplitWords extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count.
        List<? extends StructField> structFieldRefs = argOIs.getAllStructFieldRefs();
        if (structFieldRefs.size() != 2) {
            throw new UDFArgumentLengthException("Exactly two arguments are required: (string, delimiter)");
        }
        // 2. Check the argument types.
        for (int i = 0; i < structFieldRefs.size(); i++) {
            StructField structField = structFieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch");
            }
        }
        // 3. Declare the output schema.
        //    Because a UDTF can emit several columns, names and types are wrapped in lists:
        //    fieldNames - column names
        //    fieldOIs   - column types
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        // Single column, many rows - no loop needed here.
        fieldNames.add("word");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Fetch the arguments.
        String words = args[0].toString();
        String split = args[1].toString();
        // Split the input.
        String[] splitWords = words.split(split);
        // Emit one row per word: forward() expects the row as an array.
        for (String word : splitWords) {
            Object[] obj = new Object[1];
            obj[0] = word;
            forward(obj);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
```
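A UDTF is typically invoked through lateral view. A hypothetical call, assuming the class above was registered as my_split (table and column names are placeholders):

```sql
-- Explode a comma-separated string column into one row per word.
select t.id, w.word
from my_table t
lateral view my_split(t.tags, ',') w as word;
```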
Compare with the built-in source (only the relevant methods are excerpted):
```java
public class GenericUDTFGetSplits extends GenericUDTF {
  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);

  protected transient StringObjectInspector stringOI;
  protected transient IntObjectInspector intOI;
  protected transient JobConf jc;
  private boolean orderByQuery;
  private boolean forceSingleSplit;
  private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
  private DataOutput dos = new DataOutputStream(bos);

  @Override
  public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    LOG.debug("initializing GenericUDFGetSplits");
    if (SessionState.get() == null || SessionState.get().getConf() == null) {
      throw new IllegalStateException("Cannot run get splits outside HS2");
    }
    LOG.debug("Initialized conf, jc and metastore connection");

    if (arguments.length != 2) {
      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
    } else if (!(arguments[0] instanceof StringObjectInspector)) {
      LOG.error("Got " + arguments[0].getTypeName() + " instead of string.");
      throw new UDFArgumentTypeException(0, "\"string\" is expected at function GET_SPLITS, "
          + "but \"" + arguments[0].getTypeName() + "\" is found");
    } else if (!(arguments[1] instanceof IntObjectInspector)) {
      LOG.error("Got " + arguments[1].getTypeName() + " instead of int.");
      throw new UDFArgumentTypeException(1, "\"int\" is expected at function GET_SPLITS, "
          + "but \"" + arguments[1].getTypeName() + "\" is found");
    }

    stringOI = (StringObjectInspector) arguments[0];
    intOI = (IntObjectInspector) arguments[1];

    List<String> names = Arrays.asList("split");
    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
        PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);

    LOG.debug("done initializing GenericUDFGetSplits");
    return outputOI;
  }

  @Override
  public void process(Object[] arguments) throws HiveException {
    String query = stringOI.getPrimitiveJavaObject(arguments[0]);
    int num = intOI.get(arguments[1]);

    // Generate applicationId for the LLAP splits
    LlapCoordinator coordinator = LlapCoordinator.getInstance();
    if (coordinator == null) {
      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
    }
    ApplicationId applicationId = coordinator.createExtClientAppId();
    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());

    PlanFragment fragment = createPlanFragment(query, num, applicationId);
    TezWork tezWork = fragment.work;
    Schema schema = fragment.schema;
    boolean generateSingleSplit = forceSingleSplit && orderByQuery;
    try {
      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}",
          splits.length, query, orderByQuery, forceSingleSplit);
      if (generateSingleSplit && splits.length > 1) {
        throw new HiveException("Got more than one split (Got: " + splits.length
            + ") for order by query: " + query);
      }
      for (InputSplit s : splits) {
        Object[] os = new Object[1];
        bos.reset();
        s.write(dos);
        byte[] frozen = bos.toByteArray();
        os[0] = frozen;
        forward(os);
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }

  @Override
  public void close() throws IOException {
    try {
      LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
      driver.releaseLocksAndCommitOrRollback(true);
      driver.close();
      driver.destroy();
      txnManager.closeTxnManager();
    } catch (Exception err) {
      LOG.error("Error closing driver resources", err);
      throw new IOException(err);
    }
  }
}
```
4. UDTF, upgraded
1. This function produces multiple rows and multiple columns
```java
public class MySplitWordsUP extends GenericUDTF {

    // Holds one output row.
    private final ArrayList<Object> outs = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Check the argument count.
        List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
        if (fieldRefs.size() != 3) {
            throw new UDFArgumentLengthException("Exactly three arguments are required: (string, row delimiter, column delimiter)");
        }
        // 2. Check the argument types.
        for (int i = 0; i < fieldRefs.size(); i++) {
            StructField structField = fieldRefs.get(i);
            if (structField.getFieldObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i, "Argument type mismatch; only primitive types are accepted");
            }
        }
        // 3. Declare the output schema.
        //    names - column names
        //    types - column types
        List<String> names = new ArrayList<>();
        List<ObjectInspector> types = new ArrayList<>();
        // This function emits two columns.
        names.add("a1");
        names.add("a2");
        types.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        types.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Fetch the arguments.
        String str = args[0].toString();
        String split1 = args[1].toString(); // splits into rows
        String split2 = args[2].toString(); // splits into columns

        // Split into rows, then each row into columns.
        String[] rows = str.split(split1);
        for (String row : rows) {
            String[] cols = row.split(split2);
            outs.clear();
            for (String col : cols) {
                outs.add(col);
            }
            forward(outs); // emit one row
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
```
Usage follows the same lateral view pattern as above.
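For completeness, a hypothetical multi-column call (function name my_split_up is a placeholder):

```sql
-- Split 'a,b;c,d' into two rows of two columns.
select v.a1, v.a2
from (select 'a,b;c,d' as s) t
lateral view my_split_up(t.s, ';', ',') v as a1, a2;
```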
5. Creating a temporary function
- Open the Hive client
- add jar <path to jar>;
- create temporary function my_len as '<fully qualified main class>';
Note: this registration is temporary - the function disappears once you reconnect. For example:
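```sql
-- The jar path is made up; the class is the GetLengthStr UDF from section 1.
add jar /opt/hive/udfs/my-udfs.jar;
create temporary function my_len as 'review.myudf.GetLengthStr';
select my_len('hello');  -- 5
```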
6. Creating a permanent function
- First put the jar into Hive's lib directory:
cp <jar path> <hive lib directory>
- Start Hive
create function func_name as '<fully qualified main class>';
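A concrete sketch (paths are made up):

```sql
-- From a shell first:
--   cp /opt/hive/udfs/my-udfs.jar $HIVE_HOME/lib/
-- Then, inside Hive:
create function my_len as 'review.myudf.GetLengthStr';
```

If your Hive version supports it, create function ... using jar 'hdfs:///path/my-udfs.jar'; loads the jar from HDFS directly and avoids copying it into lib.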