1.需要的依赖
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; 、
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
使用JDBCAppendTableSink方法创建mysqlsink写入,数据是追加到数据库的
//创建流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 environment.setParallelism(2); //加载自定义数据形成数据流 DataStreamSourcepersonSource = environment.addSource(new DataDB()); //转换为Row类型才能写入mysql DataStream personRow = personSource.map(new MapFunction
() { @Override public Row map(Person person) throws Exception { Row row = new Row(4); row.setField(0, person.getPid()); row.setField(1, person.getPname()); row.setField(2, person.getPsex()); row.setField(3, person.getPage()); return row; } }); //定义调用JDBCAppendTableSink创建mysqlSink JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder() //驱动程序 .setDrivername("com.mysql.cj.jdbc.Driver") //连接URL地址 .setDBUrl("jdbc:mysql://localhost:3306/school?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC") //用户名 .setUsername("root") //密码 .setPassword("1234") //写入类型 .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) //写入语句 .setQuery("insert into person values (?,?,?,?);") //一次批量写入条数 .setBatchSize(10) .build(); //调用mysqlsink写入mysql数据 tableSink.emitDataStream(personRow); //执行程序 environment.execute();
如果出现以下报错,
The server time zone value ‘�й���ʱ��’ is unrecognized or represents
more than one time zone. You must configure either the server or JDBC
driver (via the serverTimezone configuration property) to use a more
specifc time zone value if you want to utilize time zone support.
需要在url地址里面添加如下代码
&serverTimezone=UTC
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)