JAVA实现Flink Table读取文件内容,通过SQL进行查询,在将结果写入到文件
package org.fenghua.example.table.connector; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class FileConnector { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.connect( new FileSystem() .path("D:\DevTool\IdeaProjects\HwFlinkKafka\src\main\resources\Sensor.txt") ).withFormat( new Csv() .field("id", Types.STRING) .field("name", Types.STRING) .field("idCard", Types.STRING) .field("addressCode", Types.STRING) .field("age", Types.INT) .fieldDelimiter(",") .lineDelimiter("n") .ignoreParseErrors() ).withSchema( new Schema() .field("id", Types.STRING) .field("name", Types.STRING) .field("idCard", Types.STRING) .field("addressCode", Types.STRING) .field("age", Types.INT) ) .inAppendMode() .registerTableSource("sensor"); Table table = tEnv.sqlQuery("select id,name,idCard,addressCode,age from sensor"); tEnv.toAppendStream(table, Row.class).print(); tEnv.connect( new FileSystem() .path("D:\DevTool\IdeaProjects\HwFlinkKafka\src\main\resources\SensorOut.txt") ).withFormat( new Csv() .field("id", Types.STRING) .field("name", Types.STRING) .field("idCard", Types.STRING) .field("addressCode", Types.STRING) .field("age", Types.INT) .fieldDelimiter(",") ).withSchema( new Schema() .field("id", Types.STRING) .field("name", Types.STRING) .field("idCard", Types.STRING) .field("addressCode", Types.STRING) .field("age", Types.INT) ) .inAppendMode() .registerTableSink("sensorOut"); env.setParallelism(1); table.insertInto("sensorOut"); env.execute(" test "); } }文件内容样例
11eeebfe94eaf3088285cba7feb67b20,刘谦娇,130683200709203040,520526000000,12
代码仓库https://gitee.com/xuguoxi/FlinkLearn.git
仓库里面有其他的样例代码
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)