UDF和GenericUDF区别

UDF和GenericUDF区别,第1张

UDF和GenericUDF区别

目录

UDF

GenericUDF


Java开发转了大数据,竟然被拉去做了非结构的ETL抽取,真的是比做后端伤脑筋,没有可借鉴的框架,只能根据数据进行抽取,第一份大数据实习,写完抽取代码后,需要写成UDF和UDTF进行使用。

简单意思:

UDF: 一对一,输入一笔数据输出一笔数据

UDTF:一对多,输入一笔数据输出多笔数据 (接受0个或多个输入然后产生多列或多行输出。)

UDAF:多对一,输入多笔数据输出一笔数据

记录一下UDF和GenericUDF的区别:

UDF属于基础的UDF:

简单的udf实现很简单,只需要继承udf,然后实现evaluate()方法就行了。evaluate()允许重载。

UDF

对于自定义函数现在需要进行总结一下:

pom文件:主要为打包文件:



    4.0.0

    org.example
    UDF
    1.0-SNAPSHOT
    jar


    
        
            org.apache.spark
            spark-core_2.11
            2.3.0
            provided
        
        
            org.apache.spark
            spark-hive_2.11
            2.3.0
            provided
        
    

    
        src
        compile
        
            
                maven-compiler-plugin
                3.5.1
                
                    1.8
                    1.8
                    UTF-8
                
            

            
                maven-assembly-plugin
                
                    
                        jar-with-dependencies
                    
                

                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                

            
        
    

package com.demo;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;


@Description(
        name = "wordUDF",
        value = "_FUNC_(String word) - Returns result",
        extended ="Example:\n  > SELECt _FUNC_(\'你好\') FROM src LIMIT 1;\n  \'2022新年快乐:你好\'""
)

public class WordSingleUDF extends UDF {

    public String evaluate(String args) {
        return "2022新年快乐:"+args;
    }

    public static void main(String[] args) {
        System.out.println(new WordSingleUDF().evaluate("你好"));
    }

}

进行打包上传:

1. add jar /home/zhaohai.li/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar

2. create temporary function udf_word as 'com.demo.WordSingleUDF';

3. select udf_word('hello') 

显示:

2022新年快乐:hello

GenericUDF

这个函数需要进行实现多个方法

 GenericUDF的有点 可以处理复杂的数据类型,所以它能处理更为复杂的数据类型场景。

 

 在进行继承GenericUDF 时需要进行实现三个方法:

必须实现的函数:

ObjectInspector initialize(ObjectInspector[] arguments)  //初始化 *** 作,在函数进行初始化的时候会执行,其他时间不执行
Object evaluate(DeferredObject[] arguments) //进行业务计算逻辑,处理具体的数据
String getDisplayString(String[] children)//进行函数描述结果的显示,只有当函数执行一场才会显示

其余的函数:

configure(MapredContext context) //在函数初始化之前,进行设置mapContext

package main.java.com.demo;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.Date;

public class WordUDF extends GenericUDF {

    private static int mapTasks = 0;
    private static String init = "";
    private transient ArrayList ret = new ArrayList();


    @Override
    public void configure(MapredContext context) {
        System.out.println(new Date() + "configure mapredContext");
        if (null != context) {
            //从jobConf中获取map数
            mapTasks = context.getJobConf().getNumMapTasks();
        }
        System.out.println(new Date() + "######## mapTasks [" + mapTasks + "] ..");
    }

    
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {

        System.out.println("1. init start:udfName" + this.getUdfName() + new Date());

        //初始化文件系统,可以在这里初始化读取文件等
        init = "init";

        //定义函数的返回类型为java的List
        ObjectInspector returnOI = PrimitiveObjectInspectorFactory
                .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);

        return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
    }

    
    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        System.out.println("2. deal with the data process " + new Date());
        ret.clear();
        if(args.length < 1) return ret;
        //获取第一个参数
        String str = args[0].get().toString();
        String[] s = str.split(",",-1);
        for(String word : s) {
            ret.add(word);
        }
        return ret;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "Usage: Lxw1234GenericUDF(String str)";
    }

    public static void main(String[] args) {

    }
}

UDTF
package main.java.com.demo;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

public class MyUDTF extends GenericUDTF {
    private ArrayList outList = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        //1.定义输出数据的列名和类型
        List fieldNames = new ArrayList<>();
        List fieldOIs = new ArrayList<>();

        //2.添加输出数据的列名和类型
        fieldNames.add("lineToWord");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {

        //1.获取原始数据
        String arg = args[0].toString();

        //2.获取数据传入的第二个参数,此处为分隔符
        String splitKey = args[1].toString();

        //3.将原始数据按照传入的分隔符进行切分
        String[] fields = arg.split(splitKey);

        //4.遍历切分后的结果,并写出
        for (String field : fields) {

            //集合为复用的,首先清空集合
            outList.clear();

            //将每一个单词添加至集合
            outList.add(field);

            //将集合内容写出
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {

    }

}

 

UDAF

 UDAF已经失效 需要去实现 implement 

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 或者 extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

能看到实际上

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver也是实现的org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2

 

那我们直接进行继承父类  AbstractGenericUDAFResolver (自己可做选择)

要先了解UDAF的四个阶段,定义在GenericUDAFevaluator的Mode枚举中:


COMPLETE:如果mapreduce只有map而没有reduce,就会进入这个阶段;
PARTIAL1:正常mapreduce的map阶段;
PARTIAL2:正常mapreduce的combiner阶段;
FINAL:正常mapreduce的reduce阶段;
 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701746.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存