JAVA实现Flink Table文件写入写出

JAVA实现Flink Table文件写入写出,第1张

JAVA实现Flink Table文件写入写出

JAVA实现Flink Table读取文件内容,通过SQL进行查询,在将结果写入到文件

package org.fenghua.example.table.connector;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;


public class FileConnector {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.connect(
                new FileSystem()
                        .path("D:\DevTool\IdeaProjects\HwFlinkKafka\src\main\resources\Sensor.txt")
        ).withFormat(
                new Csv()
                        .field("id", Types.STRING)
                        .field("name", Types.STRING)
                        .field("idCard", Types.STRING)
                        .field("addressCode", Types.STRING)
                        .field("age", Types.INT)
                        .fieldDelimiter(",")
                        .lineDelimiter("n")
                        .ignoreParseErrors()
        ).withSchema(
                new Schema()
                        .field("id", Types.STRING)
                        .field("name", Types.STRING)
                        .field("idCard", Types.STRING)
                        .field("addressCode", Types.STRING)
                        .field("age", Types.INT)
        )
                .inAppendMode()
                .registerTableSource("sensor");
        Table table = tEnv.sqlQuery("select id,name,idCard,addressCode,age from sensor");
        tEnv.toAppendStream(table, Row.class).print();

        tEnv.connect(
                new FileSystem()
                        .path("D:\DevTool\IdeaProjects\HwFlinkKafka\src\main\resources\SensorOut.txt")
        ).withFormat(
                new Csv()
                        .field("id", Types.STRING)
                        .field("name", Types.STRING)
                        .field("idCard", Types.STRING)
                        .field("addressCode", Types.STRING)
                        .field("age", Types.INT)
                        .fieldDelimiter(",")
        ).withSchema(
                new Schema()
                        .field("id", Types.STRING)
                        .field("name", Types.STRING)
                        .field("idCard", Types.STRING)
                        .field("addressCode", Types.STRING)
                        .field("age", Types.INT)
        )
                .inAppendMode()
                .registerTableSink("sensorOut");
        env.setParallelism(1);
        table.insertInto("sensorOut");
        env.execute(" test ");
    }
}
文件内容样例

11eeebfe94eaf3088285cba7feb67b20,刘谦娇,130683200709203040,520526000000,12

代码仓库https://gitee.com/xuguoxi/FlinkLearn.git

仓库里面有其他的样例代码 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575428.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存