1. Business background
The business needs to write data into ClickHouse and also support primary-key updates. The open-source Flink 1.11 we use does not support ClickHouse out of the box, and the project is written in Flink SQL, so the connector source code has to be modified so that real-time data can be written to the ClickHouse cluster over JDBC. Flink's built-in org.apache.flink.connector.jdbc.dialect.JdbcDialects only registers the Derby, MySQL, and Postgres dialects:
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS = Arrays.asList(
            new DerbyDialect(),
            new MySQLDialect(),
            new PostgresDialect()
    );

    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
2. Implement a custom JdbcDialect, modeled on MySQLDialect
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.flink.connector.jdbc.internal.converter.ClickhouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

public class ClickhouseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // MAX/MIN precision of TIMESTAMP type, copied from MySQLDialect:
    // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // MAX/MIN precision of DECIMAL type, copied from MySQLDialect:
    // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
    private static final int MAX_DECIMAL_PRECISION = 65;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickhouseRowConverter(rowType);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return identifier;
    }

    @Override
    public int maxDecimalPrecision() {
        return MAX_DECIMAL_PRECISION;
    }

    @Override
    public int minDecimalPrecision() {
        return MIN_DECIMAL_PRECISION;
    }

    @Override
    public int maxTimestampPrecision() {
        return MAX_TIMESTAMP_PRECISION;
    }

    @Override
    public int minTimestampPrecision() {
        return MIN_TIMESTAMP_PRECISION;
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        return Arrays.asList(
                LogicalTypeRoot.BINARY,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
                LogicalTypeRoot.INTERVAL_DAY_TIME,
                LogicalTypeRoot.ARRAY,
                LogicalTypeRoot.MULTISET,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.ROW,
                LogicalTypeRoot.DISTINCT_TYPE,
                LogicalTypeRoot.STRUCTURED_TYPE,
                LogicalTypeRoot.NULL,
                LogicalTypeRoot.RAW,
                LogicalTypeRoot.SYMBOL,
                LogicalTypeRoot.UNRESOLVED
        );
    }

    @Override
    public String dialectName() {
        return "clickhouse";
    }
}
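One thing worth noting: ClickhouseDialect does not override getUpsertStatement, so it keeps the interface default, which returns Optional.empty(). The snippet below is a sketch of that default as I recall it from the Flink 1.11 JdbcDialect interface; the exact signature should be verified against the actual source.

    // Sketch of the default in org.apache.flink.connector.jdbc.dialect.JdbcDialect (Flink 1.11),
    // reproduced from memory for illustration only.
    default Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

Because no dialect-specific upsert statement is provided, a sink table that declares a primary key falls back (as far as I understand the 1.11 code path) to the exist-check / insert-or-update logic in InsertOrUpdateJdbcExecutor, which is exactly the code that has to be adjusted for ClickHouse in step 5 below.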
3. Implement a custom JdbcRowConverter, modeled on MySQLRowConverter
package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.RowType;

public class ClickhouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public ClickhouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "clickhouse";
    }
}
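The converter only has to supply its name: AbstractJdbcRowConverter already provides the conversions for the common SQL types derived from the RowType, and since the business table below uses only STRING columns, no ClickHouse-specific conversion overrides are needed here.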
4. Register the new ClickhouseDialect in JdbcDialects
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS = Arrays.asList(
            new DerbyDialect(),
            new MySQLDialect(),
            new PostgresDialect(),
            new ClickhouseDialect()
    );

    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
This completes the modification of the flink-jdbc-connector to support ClickHouse.
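As a quick sanity check that the registration works, the dialect lookup can be exercised directly. This is a minimal sketch; the host and database in the URL are placeholders (8123 is ClickHouse's default HTTP port), not values from the actual environment.

    import java.util.Optional;

    import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
    import org.apache.flink.connector.jdbc.dialect.JdbcDialects;

    public class DialectResolutionCheck {
        public static void main(String[] args) {
            // canHandle() matches on the "jdbc:clickhouse:" prefix, so the new dialect should be returned
            Optional<JdbcDialect> dialect = JdbcDialects.get("jdbc:clickhouse://host:8123/database");
            System.out.println(dialect.isPresent());                              // expected: true
            System.out.println(dialect.map(JdbcDialect::dialectName).orElse("")); // expected: clickhouse
        }
    }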
5. Business SQL
create table tableA (
    client_time string,
    user_id string,
    client_ip string,
    session_id string,
    query string,
    dayno string
) COMMENT 'tableA' WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_topic',
    'properties.bootstrap.servers' = 'kafka_servers',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";',
    'properties.group.id' = 'kafka_group_id',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1638962488170',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true',
    'csv.field-delimiter' = U&'\0009'
);

create table tableB (
    client_time string,
    user_id string,
    session_id string,
    query string,
    dayno string,
    primary key (session_id) NOT ENFORCED
) COMMENT 'data aggregated by session_id' WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:clickhouse://host:port/database',
    'connector.table' = 'clickhouse_table',
    'connector.username' = 'xxx',
    'connector.password' = 'xxx',
    'connector.write.flush.max-rows' = '5000',
    'connector.write.flush.interval' = '5'
);

insert into tableB
select
    client_time,
    user_id,
    session_id,
    query,
    dayno
from (
    select
        LISTAGG(client_time, ',') client_time,
        max(user_id) user_id,
        session_id,
        LISTAGG(query, ',') query,
        min(dayno) dayno,
        count(1) cnt
    from tableA
    where REGEXP(dayno, '[0-9]{4}-[0-9]{2}-[0-9]{2}')
        and session_id is not null
        and session_id <> ''
    group by session_id
) x
where cnt <= 10;
The SQL above is a continuously updating aggregation, so the JDBC sink generates UPDATE statements for rows that change. ClickHouse, however, does not support UPDATE statements this way and requires plain INSERTs. Based on the error message, the corresponding executor code was located and modified to execute only INSERT statements, after which the job ran successfully. The modified InsertOrUpdateJdbcExecutor is shown below:
package org.apache.flink.connector.jdbc.internal.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;

import javax.annotation.Nonnull;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {

    private static final Logger LOG = LoggerFactory.getLogger(InsertOrUpdateJdbcExecutor.class);

    private final String existSQL;
    private final String insertSQL;
    private final String updateSQL;

    private final JdbcStatementBuilder<K> existSetter;
    private final JdbcStatementBuilder<V> insertSetter;
    private final JdbcStatementBuilder<V> updateSetter;

    private final Function<R, K> keyExtractor;
    private final Function<R, V> valueMapper;

    private final Map<K, V> batch;

    private transient PreparedStatement existStatement;
    private transient PreparedStatement insertStatement;
    private transient PreparedStatement updateStatement;

    public InsertOrUpdateJdbcExecutor(
            @Nonnull String existSQL,
            @Nonnull String insertSQL,
            @Nonnull String updateSQL,
            @Nonnull JdbcStatementBuilder<K> existSetter,
            @Nonnull JdbcStatementBuilder<V> insertSetter,
            @Nonnull JdbcStatementBuilder<V> updateSetter,
            @Nonnull Function<R, K> keyExtractor,
            @Nonnull Function<R, V> valueExtractor) {
        this.existSQL = checkNotNull(existSQL);
        this.insertSQL = checkNotNull(insertSQL);
        this.updateSQL = checkNotNull(updateSQL);
        this.existSetter = checkNotNull(existSetter);
        this.insertSetter = checkNotNull(insertSetter);
        this.updateSetter = checkNotNull(updateSetter);
        this.keyExtractor = checkNotNull(keyExtractor);
        this.valueMapper = checkNotNull(valueExtractor);
        this.batch = new HashMap<>();
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        existStatement = connection.prepareStatement(existSQL);
        insertStatement = connection.prepareStatement(insertSQL);
        updateStatement = connection.prepareStatement(updateSQL);
    }

    @Override
    public void addToBatch(R record) {
        batch.put(keyExtractor.apply(record), valueMapper.apply(record));
    }

    @Override
    public void executeBatch() throws SQLException {
        if (!batch.isEmpty()) {
            for (Map.Entry<K, V> entry : batch.entrySet()) {
                processOneRowInBatch(entry.getKey(), entry.getValue());
            }
            if (updateStatement instanceof ClickHousePreparedStatementImpl) {
                // ClickHouse: only the insert batch is executed
                insertStatement.executeBatch();
            } else {
                updateStatement.executeBatch();
                insertStatement.executeBatch();
            }
            batch.clear();
        }
    }

    private void processOneRowInBatch(K pk, V row) throws SQLException {
        if (updateStatement instanceof ClickHousePreparedStatementImpl) {
            // ClickHouse: always add the row to the insert batch, never check existence or update
            insertSetter.accept(insertStatement, row);
            insertStatement.addBatch();
        } else if (exist(pk)) {
            updateSetter.accept(updateStatement, row);
            updateStatement.addBatch();
        } else {
            insertSetter.accept(insertStatement, row);
            insertStatement.addBatch();
        }
    }

    private boolean exist(K pk) throws SQLException {
        existSetter.accept(existStatement, pk);
        try (ResultSet resultSet = existStatement.executeQuery()) {
            return resultSet.next();
        }
    }

    @Override
    public void closeStatements() throws SQLException {
        for (PreparedStatement s : Arrays.asList(existStatement, insertStatement, updateStatement)) {
            if (s != null) {
                s.close();
            }
        }
    }
}
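The key change is the instanceof ClickHousePreparedStatementImpl check: when the prepared statements come from the ClickHouse JDBC driver, every buffered row is routed straight into the insert batch, and both the existence query and the update batch are skipped. For other databases the original behavior is preserved, so the same executor still performs exist-check plus insert-or-update against MySQL, Postgres, and Derby. Note that this relies on ClickHouse's append-only semantics; deduplication or merging of rows with the same primary key has to be handled on the ClickHouse side rather than by the sink.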