Contents
UDF
GenericUDF
Having moved from Java development into big data, I ended up assigned to unstructured ETL extraction, which honestly is harder on the brain than backend work: there is no framework to borrow from, you can only write extraction logic around the data itself. This was my first big-data internship, and after finishing the extraction code I had to package it as UDFs and UDTFs for actual use.
In simple terms (built-in examples follow the list):
UDF: one-to-one; one row of input produces one row of output.
UDTF: one-to-many; one row of input produces multiple rows of output (it accepts zero or more inputs and produces multiple columns or rows).
UDAF: many-to-one; multiple rows of input produce one row of output.
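Hive's built-ins fall into the same three categories, which is a quick way to sanity-check the definitions (the table name t and column name below are only placeholders):

-- UDF: one row in, one row out
SELECT upper(name) FROM t;
-- UDTF: one row in, many rows out
SELECT explode(array(1, 2, 3));
-- UDAF: many rows in, one row out
SELECT count(*) FROM t;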
A quick note on the difference between UDF and GenericUDF:
UDF
The plain UDF is the basic form. Implementing one is simple: extend UDF and implement an evaluate() method; evaluate() may be overloaded, as sketched below.
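A minimal sketch of that overloading (the class AddUDF and its behavior are illustrative, not from the original post):

import org.apache.hadoop.hive.ql.exec.UDF;

public class AddUDF extends UDF {
    // Overload 1: add two integers
    public Integer evaluate(Integer a, Integer b) {
        if (a == null || b == null) return null;
        return a + b;
    }

    // Overload 2: concatenate two strings
    public String evaluate(String a, String b) {
        if (a == null || b == null) return null;
        return a + b;
    }
}

Hive resolves which overload to call from the argument types at the call site.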
Now a summary of what building a custom function involves.
The pom file, mainly for packaging:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>UDF</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src</sourceDirectory>
        <defaultGoal>compile</defaultGoal>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
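With the assembly plugin bound to the package phase as above, a plain Maven build should produce the fat jar (the file name follows from the artifactId and version):

mvn clean package
# produces target/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar

Then the UDF class itself: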
package com.demo;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

@Description(
        name = "wordUDF",
        value = "_FUNC_(String word) - Returns result",
        extended = "Example:\n > SELECT _FUNC_('你好') FROM src LIMIT 1;\n '2022新年快乐:你好'"
)
public class WordSingleUDF extends UDF {
    public String evaluate(String args) {
        return "2022新年快乐:" + args;
    }

    public static void main(String[] args) {
        System.out.println(new WordSingleUDF().evaluate("你好"));
    }
}
Package the jar, upload it, then register it in Hive:
1. add jar /home/zhaohai.li/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar
2. create temporary function udf_word as 'com.demo.WordSingleUDF';
3. select udf_word('hello')
Output:
2022新年快乐:hello
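A temporary function only lasts for the current session. To drop it early, or to register the function permanently from a jar on HDFS (the HDFS path below is an assumption), Hive also supports:

DROP TEMPORARY FUNCTION IF EXISTS udf_word;
-- permanent, visible across sessions (Hive 0.13+):
CREATE FUNCTION udf_word AS 'com.demo.WordSingleUDF'
USING JAR 'hdfs:///path/to/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar';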
GenericUDF
This kind of function requires implementing several methods.
The advantage of GenericUDF is that it can handle complex data types, so it covers more complex scenarios.
When extending GenericUDF, three methods must be implemented:
Required methods:
ObjectInspector initialize(ObjectInspector[] arguments) // initialization; runs when the function is initialized and at no other time
Object evaluate(DeferredObject[] arguments) // the business logic that processes the actual data
String getDisplayString(String[] children) // the function's description string, shown only when execution throws an exception
Other optional methods:
configure(MapredContext context) // called before the function is initialized, to set up the MapredContext
package main.java.com.demo;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.Date;

public class WordUDF extends GenericUDF {
    private static int mapTasks = 0;
    private static String init = "";
    private transient ArrayList<String> ret = new ArrayList<>();

    @Override
    public void configure(MapredContext context) {
        System.out.println(new Date() + " configure MapredContext");
        if (null != context) {
            // Read the number of map tasks from the JobConf
            mapTasks = context.getJobConf().getNumMapTasks();
        }
        System.out.println(new Date() + " ######## mapTasks [" + mapTasks + "] ..");
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        System.out.println("1. init start: udfName " + this.getUdfName() + " " + new Date());
        // Initialize resources here, e.g. a file system or lookup files
        init = "init";
        // Declare the return type as a Java List of strings
        ObjectInspector returnOI = PrimitiveObjectInspectorFactory
                .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
        return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        System.out.println("2. deal with the data process " + new Date());
        ret.clear();
        if (args.length < 1) return ret;
        // Fetch the first argument
        String str = args[0].get().toString();
        String[] s = str.split(",", -1);
        for (String word : s) {
            ret.add(word);
        }
        return ret;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "Usage: Lxw1234GenericUDF(String str)";
    }

    public static void main(String[] args) {
    }
}

UDTF
The UDTF below splits an input string on a caller-supplied separator and emits each token as its own output row:
package main.java.com.demo;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

public class MyUDTF extends GenericUDTF {
    private ArrayList<String> outList = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Define the column names and types of the output
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        // 2. Register the output column name and type
        fieldNames.add("lineToWord");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // 1. Fetch the raw input
        String arg = args[0].toString();
        // 2. The second argument is the separator
        String splitKey = args[1].toString();
        // 3. Split the input on the separator
        String[] fields = arg.split(splitKey);
        // 4. Emit each token as one output row
        for (String field : fields) {
            // The collection is reused, so clear it first
            outList.clear();
            // Add the current word
            outList.add(field);
            // Forward the row to the output
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
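Registered the same way as the UDF above (the function name my_udtf, the jar path, and the table/column names are assumptions for illustration), it can be called on its own or via LATERAL VIEW:

add jar /path/to/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar;
create temporary function my_udtf as 'main.java.com.demo.MyUDTF';
select my_udtf('hello,world', ',');
-- two rows: hello / world
select word from t lateral view my_udtf(line, ',') tmp as word;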
UDAF
The old UDAF interface is deprecated. You now either implement
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 or extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver. Looking at the source,
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver itself implements org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2,
so we can simply extend the parent class AbstractGenericUDAFResolver directly (either option works).
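That relationship is visible in the Hive source; the declaration is essentially:

public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {
    // getEvaluator(GenericUDAFParameterInfo) delegates to getEvaluator(TypeInfo[]),
    // so subclasses only need to override the TypeInfo[] variant
}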
First you need to understand the four phases of a UDAF, defined in the GenericUDAFEvaluator.Mode enum:
COMPLETE: used when the job has only a map phase and no reduce phase;
PARTIAL1: the map phase of a normal MapReduce job;
PARTIAL2: the combiner phase of a normal MapReduce job;
FINAL: the reduce phase of a normal MapReduce job.
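The post stops before a concrete UDAF, so here is a minimal sketch against the API described above; the class names (MySumUDAF, SumEvaluator) are mine, and it implements a plain sum over BIGINT with argument type checking mostly elided:

package com.demo;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class MySumUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
        }
        return new SumEvaluator();
    }

    public static class SumEvaluator extends GenericUDAFEvaluator {
        // OI of the single argument: raw input in PARTIAL1/COMPLETE, partial sum in PARTIAL2/FINAL
        private PrimitiveObjectInspector inputOI;

        // The aggregation buffer carries the running sum between calls
        static class SumAgg extends AbstractAggregationBuffer {
            long sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            // Both the partial result and the final result are longs
            return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // PARTIAL1 / COMPLETE: consume one raw input row
            if (parameters[0] != null) {
                ((SumAgg) agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // End of PARTIAL1 / PARTIAL2: hand the partial sum to the next stage
            return ((SumAgg) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            // PARTIAL2 / FINAL: fold a partial sum into this buffer
            if (partial != null) {
                ((SumAgg) agg).sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            // FINAL / COMPLETE: the finished sum
            return ((SumAgg) agg).sum;
        }
    }
}

Mapping the methods onto the phases: PARTIAL1 runs iterate() and terminatePartial(), PARTIAL2 runs merge() and terminatePartial(), FINAL runs merge() and terminate(), and COMPLETE runs iterate() and terminate().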