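This post summarizes how to expand an array (ARRAY) or an object (MAP) in Flink Table & SQL.

Test data

The test data is generated with the datagen connector. The DDL is as follows: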
    CREATE TABLE source_table (
        userId INT,
        eventTime AS '2021-10-01 08:08:08',
        eventType AS 'click',
        productId INT,
        -- Array (ARRAY) type
        productImages AS ARRAY['image1', 'image2'],
        -- Object (MAP) type
        pageInfo AS MAP['pageId', '1', 'pageName', 'yyds']
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '2',
        'fields.userId.kind' = 'random',
        'fields.userId.min' = '2',
        'fields.userId.max' = '2',
        'fields.productId.kind' = 'sequence',
        'fields.productId.start' = '1',
        'fields.productId.end' = '2'
    );
The test data looks like this:

    userId  eventTime            eventType  productId  productImages     pageInfo
    2       2021-10-01 08:08:08  click      1          [image1, image2]  {pageId=1, pageName=yyds}
    2       2021-10-01 08:08:08  click      2          [image1, image2]  {pageId=1, pageName=yyds}

Expanding an Array

1. Single column, multiple rows
Expand the array into a single column, with each array element becoming its own row.

Using UNNEST

    SELECT userId, eventTime, eventType, productId, productImage
    FROM source_table, UNNEST(productImages) AS t(productImage);

Using a UDTF
    package com.bigdata.flink.sql.udtf;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;

    @FunctionHint(
            input = @DataTypeHint("ARRAY<STRING>"),
            output = @DataTypeHint("STRING"))
    public class ExpandArrayOneColumnMultRowUDTF extends TableFunction<String> {

        public void eval(String... productImages) {
            // Emit one output row per array element
            for (String productImage : productImages) {
                collect(productImage);
            }
        }
    }

    -- SQL usage
    -- Assumes the UDTF is already registered under this name
    -- (for example via CREATE TEMPORARY FUNCTION).
    SELECT userId, eventTime, eventType, productId, productImage
    FROM source_table,
         LATERAL TABLE (ExpandArrayOneColumnMultRowUDTF(`productImages`)) AS t(productImage);
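To make the row multiplication concrete, here is a minimal plain-Java sketch of what the two queries above compute: every source row is paired with each element of its own array. The class `UnnestArraySketch` and its `expand` method are hypothetical names used only for illustration; the sketch does not use Flink and is not how Flink implements UNNEST.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it has no dependency on Flink.
class UnnestArraySketch {

    // Pairs one source row (represented here only by productId) with each
    // element of its array, yielding one "productId,productImage" line per element.
    static List<String> expand(int productId, String[] productImages) {
        List<String> out = new ArrayList<>();
        for (String productImage : productImages) {
            out.add(productId + "," + productImage);
        }
        // An empty array adds nothing, so that source row yields no output rows.
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the two generated test rows (productId 1 and 2).
        for (int productId = 1; productId <= 2; productId++) {
            for (String row : expand(productId, new String[] {"image1", "image2"})) {
                System.out.println(row);
            }
        }
    }
}
```

In this sketch a source row with an empty array disappears from the output, which matches the usual behavior of a comma (cross) join against an expanded array.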
Example result:

    userId  eventTime            eventType  productId  productImage
    2       2021-10-01 08:08:08  click      1          image1
    2       2021-10-01 08:08:08  click      1          image2
    2       2021-10-01 08:08:08  click      2          image1
    2       2021-10-01 08:08:08  click      2          image2

Expanding a Map

1. Multiple columns, single row
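Expand the Map into multiple columns within a single row, one column per key.

Using built-in map access

    -- By default this is all that is needed to expand the Map:
    -- look up each key directly (no UNNEST is involved).
    SELECT userId, eventTime, eventType, productId,
           pageInfo['pageId'] AS pageId,
           pageInfo['pageName'] AS pageName
    FROM source_table;

Using a UDTF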
    package com.bigdata.flink.sql.udtf;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    import java.util.Map;

    @FunctionHint(
            input = @DataTypeHint("MAP<STRING, STRING>"),
            output = @DataTypeHint("ROW<pageId STRING, pageName STRING>"))
    public class ExpandMapMultColumnOneRowUDTF extends TableFunction<Row> {

        public void eval(Map<String, String> pageInfo) {
            // One input row produces exactly one output row
            collect(Row.of(pageInfo.get("pageId"), pageInfo.get("pageName")));
        }
    }

    -- SQL usage
    SELECT userId, eventTime, eventType, productId, pageId, pageName
    FROM source_table,
         LATERAL TABLE (ExpandMapMultColumnOneRowUDTF(`pageInfo`)) AS t(pageId, pageName);
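The key-lookup logic above can be tried outside Flink. The following is a minimal plain-Java sketch, assuming string keys and values; `MapToColumnsSketch` and `toColumns` are hypothetical names for illustration and are not part of any Flink API. It shows that one map always yields exactly one row, and that a missing key surfaces as `null` in its column.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; it has no dependency on Flink.
class MapToColumnsSketch {

    // Turns one map into one fixed-width row: [pageId, pageName].
    // Map.get returns null for an absent key, so the column is null in that case.
    static String[] toColumns(Map<String, String> pageInfo) {
        return new String[] {pageInfo.get("pageId"), pageInfo.get("pageName")};
    }

    public static void main(String[] args) {
        Map<String, String> pageInfo = new HashMap<>();
        pageInfo.put("pageId", "1");
        pageInfo.put("pageName", "yyds");

        String[] row = toColumns(pageInfo);
        System.out.println(row[0] + "," + row[1]);
    }
}
```

Because the set of output columns is fixed in advance, this style fits maps whose keys are known when the query is written.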
Example result:

    userId  eventTime            eventType  productId  pageId  pageName
    2       2021-10-01 08:08:08  click      1          1       yyds
    2       2021-10-01 08:08:08  click      2          1       yyds

2. Multiple columns, multiple rows
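Expand the Map into multiple rows, one per entry, with the key and the value in separate columns.

Using UNNEST

    SELECT userId, eventTime, eventType, productId, mapKey, mapValue
    FROM source_table, UNNEST(pageInfo) AS t(mapKey, mapValue);

Using a UDTF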
    package com.bigdata.flink.sql.udtf;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    import java.util.Map;

    @FunctionHint(
            input = @DataTypeHint("MAP<STRING, STRING>"),
            output = @DataTypeHint("ROW<mapKey STRING, mapValue STRING>"))
    public class ExpandMapMultColumnMultRowUDTF extends TableFunction<Row> {

        public void eval(Map<String, String> pageInfo) {
            for (Map.Entry<String, String> entry : pageInfo.entrySet()) {
                // One input row produces one output row per key
                collect(Row.of(entry.getKey(), entry.getValue()));
            }
        }
    }

    -- SQL usage
    SELECT userId, eventTime, eventType, productId, mapKey, mapValue
    FROM source_table,
         LATERAL TABLE (ExpandMapMultColumnMultRowUDTF(`pageInfo`)) AS t(mapKey, mapValue);
Example result:

    userId  eventTime            eventType  productId  mapKey    mapValue
    2       2021-10-01 08:08:08  click      1          pageId    1
    2       2021-10-01 08:08:08  click      1          pageName  yyds
    2       2021-10-01 08:08:08  click      2          pageId    1
    2       2021-10-01 08:08:08  click      2          pageName  yyds
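As a rough illustration of the entry-per-row expansion, here is a minimal plain-Java sketch that does not depend on Flink. `UnnestMapSketch` and `expand` are hypothetical names. The sketch uses a `LinkedHashMap` only to keep its own output deterministic; it makes no claim about the order in which Flink emits map entries, which is best not relied on.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; it has no dependency on Flink.
class UnnestMapSketch {

    // Pairs one source row (represented here only by productId) with each
    // map entry, yielding one "productId,mapKey,mapValue" line per entry.
    static List<String> expand(int productId, Map<String, String> pageInfo) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> entry : pageInfo.entrySet()) {
            out.add(productId + "," + entry.getKey() + "," + entry.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so the printed order is stable.
        Map<String, String> pageInfo = new LinkedHashMap<>();
        pageInfo.put("pageId", "1");
        pageInfo.put("pageName", "yyds");

        for (int productId = 1; productId <= 2; productId++) {
            for (String row : expand(productId, pageInfo)) {
                System.out.println(row);
            }
        }
    }
}
```

Unlike the fixed-column variant, this shape needs no knowledge of the keys in advance, at the cost of producing a variable number of rows per source row.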