获得gp的数据库连接池,使用连接池,不要使用单连接
package com.ysservice.dataStreamApi.utils; import com.alibaba.druid.pool.DruidDataSourceFactory; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.Properties; public class DruidConnectionPool { private transient static DataSource dataSource = null; private transient static Properties props = new Properties(); // 静态代码块 static { props.put("driverClassName", "org.postgresql.Driver"); props.put("url", SystemConstants.dataOutput_url_greenPlum_dwh); props.put("username", SystemConstants.dataOutput_username_greenPlum_dwh); props.put("password", SystemConstants.dataOutput_password_greenPlum_dwh); try { dataSource = DruidDataSourceFactory.createDataSource(props); } catch (Exception e) { e.printStackTrace(); } } private DruidConnectionPool() { } public static Connection getConnection() throws SQLException { return dataSource.getConnection(); } }
GreenPlumTwoPhaseCommitSink,使用两阶段提交,保证checkpoint和数据写入一同成功
package com.ysservice.dataStreamApi.sink; import com.ysservice.dataStreamApi.utils.DruidConnectionPool; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; public class GreenPlumTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction{ // 定义可用的构造函数 public GreenPlumTwoPhaseCommitSink() { super(new KryoSerializer<>(GreenPlumTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE); } @Override protected ConnectionState beginTransaction() throws Exception { //使用连接池,不使用单个连接 Connection connection = DruidConnectionPool.getConnection(); connection.setAutoCommit(false);//设定不自动提交 return new ConnectionState(connection); } @Override protected void invoke(ConnectionState transaction, String value, Context context) throws Exception { Connection connection = transaction.connection; String sql = value.split("<-tableNameSplitPoint->")[1]; PreparedStatement pstm = connection.prepareStatement(sql); int i = 0; try { i = pstm.executeUpdate(); } catch (SQLException throwables) { PreparedStatement pstm2 = connection.prepareStatement("insert into cdc_log.cdc_error_sql(error_sql) values ('错误sql:" + sql + "')"); i = pstm2.executeUpdate(); pstm2.close(); } if (i == 0) { PreparedStatement pstm3 = connection.prepareStatement("insert into cdc_log.cdc_error_sql(error_sql) values ('执行后未更新数据库的sql:" + sql + "')"); pstm3.executeUpdate(); pstm3.close(); } pstm.close(); } // 先不做处理 @Override protected void preCommit(ConnectionState transaction) throws Exception { } //提交事务 @Override protected void commit(ConnectionState transaction) { Connection connection = transaction.connection; try { connection.commit(); connection.close(); } catch (SQLException e) { throw new RuntimeException("提交事物异常"); } } //回滚事务 @Override protected void abort(ConnectionState transaction) { System.out.println("=====> abort... "); Connection connection = transaction.connection; try { connection.rollback(); connection.close(); } catch (SQLException e) { throw new RuntimeException("回滚事物异常"); } } //定义建立数据库连接的方法 public static class ConnectionState { private final transient Connection connection; public ConnectionState(Connection connection) { this.connection = connection; } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)