Flink Table & SQL 行列转换

Flink Table & SQL 行列转换,第1张

Flink Table & SQL 行列转换

总结Flink Table & SQL 中,如何将一个数组(Array)或对象(Map)展开

测试数据

用datagen连接器构造测试数据。DDL如下:

CREATE TABLE source_table (
    userId INT,
    eventTime as '2021-10-01 08:08:08',
    eventType as 'click',
    productId INT,
    -- 数组(Array)类型
    productImages as ARRAY['image1','image2'],
    -- 对象(Map)类型
    pageInfo as MAP['pageId','1','pageName','yyds']
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '2',
    'fields.userId.kind' = 'random',
    'fields.userId.min' = '2',
    'fields.userId.max' = '2',
    'fields.productId.kind' = 'sequence',
    'fields.productId.start' = '1',
    'fields.productId.end' = '2'
);

测试数据如下:

userId      eventTime      			eventType      productId      productImages     	 	pageInfo
  2       2021-10-01 08:08:08     click           1          [image1, image2] 	{pageId=1, pageName=yyds}
  2       2021-10-01 08:08:08     click           2          [image1, image2] 	{pageId=1, pageName=yyds}   
展开Array 1. 单列多行

将数组展开为单列多行。

基于UNNEST
SELECt userId,eventTime,eventType,productId,productImage 
FROM source_table, UNNEST(productImages) as t(productImage);
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;

@FunctionHint(
        input = @DataTypeHint("ARRAY"),
        output = @DataTypeHint("String"))
public class ExpandArrayOneColumnMultRowUDTF extends TableFunction {
    public void eval(String... productImages) {
        for (String productImage : productImages) {
            collect(productImage);
        }
    }
}

// SQL
SELECt userId, eventTime, eventType, productId, productImage FROM source_table
  , LATERAL TABLE (ExpandArrayOneColumnMultRowUDTF(`productImages`)) AS t(productImage);
结果示例
userId      eventTime          	eventType      productId      productImage
2       2021-10-01 08:08:08       click           1              image1
2       2021-10-01 08:08:08       click           1              image2
2       2021-10-01 08:08:08       click           2              image1
2       2021-10-01 08:08:08       click           2              image2
展开Map 1. 多列单行

将Map展开为多列单行。

基于UNNEST
// 默认,用这种方式即可展开Map
SELECt userId,eventTime,eventType,productId,pageInfo['pageId'] as pageId,pageInfo['pageName'] as pageName 
FROM source_table;
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Map;

@FunctionHint(
        input = @DataTypeHint("MAP"),
        output = @DataTypeHint("ROW"))
public class ExpandMapMultColumnOneRowUDTF extends TableFunction {
    public void eval(Map pageInfo) {
        // 原来的一行,对应输出一行
        collect(Row.of(pageInfo.get("pageId"), pageInfo.get("pageName")));
    }
}

// SQL使用
SELECt userId,eventTime,eventType,productId,pageId,pageName FROM source_table
  , LATERAL TABLE (ExpandMapMultColumnOneRowUDTF(`pageInfo`)) AS t(pageId,pageName)
结果示例
userId      eventTime      			eventType      productId      pageId      pageName
2       2021-10-01 08:08:08       click           1             1            yyds
2       2021-10-01 08:08:08       click           2             1            yyds
2. 多列多行

将Map展开为多列多行。

基于UNNEST
SELECt userId,eventTime,eventType,productId,mapKey,mapValue 
FROM source_table, UNNEST(pageInfo) as t(mapKey,mapValue);
基于UDTF
package com.bigdata.flink.sql.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Map;

@FunctionHint(
        input = @DataTypeHint("MAP"),
        output = @DataTypeHint("ROW"))
public class ExpandMapMultColumnMultRowUDTF extends TableFunction {
    public void eval(Map pageInfo) {
        for (Map.Entry entry : pageInfo.entrySet()) {
            // 原来的一行,每个Key都输出一行
            collect(Row.of(entry.getKey(), entry.getValue()));
        }
    }
}

// SQL使用
SELECt userId,eventTime,eventType,productId,mapKey,mapValue FROM source_table
  , LATERAL TABLE (ExpandMapMultColumnMultRowUDTF(`pageInfo`)) AS t(mapKey,mapValue)
结果示例
userId      eventTime      			eventType      productId      	mapKey      	  mapValue
2       2021-10-01 08:08:08       click           1              pageId            1
2       2021-10-01 08:08:08       click           1              pageName         yyds
2       2021-10-01 08:08:08       click           2              pageId            1
2       2021-10-01 08:08:08       click           2              pageName         yyds

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5665587.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存