flink jdbc connector支持clickhouse

flink jdbc connector支持clickhouse,第1张

flink jdbc connector支持clickhouse

1、业务背景

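业务需求是把数据写入 ClickHouse，同时还需要支持主键更新。目前使用的开源 Flink 1.11 版本的 JDBC connector 不支持 ClickHouse，而项目中使用的是 Flink SQL，所以需要对源代码进行改造，支持以 JDBC 的方式把实时数据写入 ClickHouse 集群。Flink 通过 JdbcDialects 按 JDBC URL 匹配方言，原始代码如下，只注册了 Derby、MySQL、Postgres 三种方言：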

package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;


/**
 * Registry of the {@link JdbcDialect} implementations known to the JDBC connector.
 *
 * <p>This listing registers the Derby, MySQL and Postgres dialects only.
 */
public final class JdbcDialects {

	// Typed as List<JdbcDialect> so the lookup loop below type-checks without casts.
	private static final List<JdbcDialect> DIALECTS = Arrays.asList(
		new DerbyDialect(),
		new MySQLDialect(),
		new PostgresDialect()
	);

	/**
	 * Looks up the dialect responsible for a JDBC URL.
	 *
	 * <p>Dialects are probed in registration order and the first one whose
	 * {@code canHandle(url)} returns {@code true} wins.
	 *
	 * @param url the JDBC connection URL to match
	 * @return the first matching dialect, or {@link Optional#empty()} when none matches
	 */
	public static Optional<JdbcDialect> get(String url) {
		for (JdbcDialect dialect : DIALECTS) {
			if (dialect.canHandle(url)) {
				return Optional.of(dialect);
			}
		}
		return Optional.empty();
	}
}

2、自定义JdbcDialect,参考MySQLDialect来实现

package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.flink.connector.jdbc.internal.converter.ClickhouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

/**
 * JDBC dialect that lets the connector talk to ClickHouse.
 *
 * <p>The implementation is modelled on the MySQL dialect; the precision limits below were
 * carried over from it unchanged.
 */
public class ClickhouseDialect extends AbstractDialect {

	private static final long serialVersionUID = 1L;

	// TIMESTAMP precision bounds, copied from the MySQL dialect
	// (https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html).
	// NOTE(review): these are MySQL limits, not ClickHouse ones - confirm against the
	// ClickHouse DateTime64 documentation before relying on them.
	private static final int MAX_TIMESTAMP_PRECISION = 6;
	private static final int MIN_TIMESTAMP_PRECISION = 1;

	// DECIMAL precision bounds, copied from the MySQL dialect
	// (https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html).
	// NOTE(review): these are MySQL limits, not ClickHouse ones - confirm against the
	// ClickHouse Decimal documentation before relying on them.
	private static final int MAX_DECIMAL_PRECISION = 65;
	private static final int MIN_DECIMAL_PRECISION = 1;

	/**
	 * Accepts every URL that starts with {@code jdbc:clickhouse:}.
	 *
	 * @param url the JDBC connection URL
	 * @return {@code true} when the URL carries the ClickHouse prefix
	 */
	@Override
	public boolean canHandle(String url) {
		return url.startsWith("jdbc:clickhouse:");
	}

	/**
	 * Creates the row converter used for ClickHouse.
	 *
	 * @param rowType the logical row type handed to the converter
	 * @return a new {@link ClickhouseRowConverter} for {@code rowType}
	 */
	@Override
	public JdbcRowConverter getRowConverter(RowType rowType) {
		return new ClickhouseRowConverter(rowType);
	}

	/**
	 * @return the driver class name used when none is configured explicitly
	 */
	@Override
	public Optional<String> defaultDriverName() {
		return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
	}

	/**
	 * Returns the identifier unchanged, i.e. identifiers are emitted without any quoting.
	 *
	 * @param identifier the table or column name
	 * @return {@code identifier} as-is
	 */
	@Override
	public String quoteIdentifier(String identifier) {
		return identifier;
	}

	/** @return the largest DECIMAL precision this dialect reports as supported */
	@Override
	public int maxDecimalPrecision() {
		return MAX_DECIMAL_PRECISION;
	}

	/** @return the smallest DECIMAL precision this dialect reports as supported */
	@Override
	public int minDecimalPrecision() {
		return MIN_DECIMAL_PRECISION;
	}

	/** @return the largest TIMESTAMP precision this dialect reports as supported */
	@Override
	public int maxTimestampPrecision() {
		return MAX_TIMESTAMP_PRECISION;
	}

	/** @return the smallest TIMESTAMP precision this dialect reports as supported */
	@Override
	public int minTimestampPrecision() {
		return MIN_TIMESTAMP_PRECISION;
	}

	/**
	 * @return the logical type roots this dialect declares as unsupported
	 */
	@Override
	public List<LogicalTypeRoot> unsupportedTypes() {
		return Arrays.asList(
			LogicalTypeRoot.BINARY,
			LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
			LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
			LogicalTypeRoot.INTERVAL_YEAR_MONTH,
			LogicalTypeRoot.INTERVAL_DAY_TIME,
			LogicalTypeRoot.ARRAY,
			LogicalTypeRoot.MULTISET,
			LogicalTypeRoot.MAP,
			LogicalTypeRoot.ROW,
			LogicalTypeRoot.DISTINCT_TYPE,
			LogicalTypeRoot.STRUCTURED_TYPE,
			LogicalTypeRoot.NULL,
			LogicalTypeRoot.RAW,
			LogicalTypeRoot.SYMBOL,
			LogicalTypeRoot.UNRESOLVED
		);
	}

	/** @return the dialect's display name, {@code "clickhouse"} */
	@Override
	public String dialectName() {
		return "clickhouse";
	}
}

3、自定义实现JdbcRowConverter,参考MySQLRowConverter

package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.RowType;

/**
 * Row converter for ClickHouse.
 *
 * <p>Adds nothing beyond what {@link AbstractJdbcRowConverter} provides except the
 * converter's name.
 */
public class ClickhouseRowConverter extends AbstractJdbcRowConverter {

	private static final long serialVersionUID = 1L;

	/**
	 * Builds a converter for the given row type by delegating to the base class.
	 *
	 * @param rowType the logical row type passed on to the superclass constructor
	 */
	public ClickhouseRowConverter(RowType rowType) {
		super(rowType);
	}

	/** @return the converter's name, {@code "clickhouse"} */
	@Override
	public String converterName() {
		return "clickhouse";
	}
}

4、现在我们把创建好的ClickhouseDialect 放入到JdbcDialects 代码中


package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;


/**
 * Registry of the {@link JdbcDialect} implementations known to the JDBC connector.
 *
 * <p>Besides the Derby, MySQL and Postgres dialects, this version also registers
 * {@link ClickhouseDialect}.
 */
public final class JdbcDialects {

	// Typed as List<JdbcDialect> so the lookup loop below type-checks without casts.
	private static final List<JdbcDialect> DIALECTS = Arrays.asList(
		new DerbyDialect(),
		new MySQLDialect(),
		new PostgresDialect(),
		new ClickhouseDialect()
	);

	/**
	 * Looks up the dialect responsible for a JDBC URL.
	 *
	 * <p>Dialects are probed in registration order and the first one whose
	 * {@code canHandle(url)} returns {@code true} wins.
	 *
	 * @param url the JDBC connection URL to match
	 * @return the first matching dialect, or {@link Optional#empty()} when none matches
	 */
	public static Optional<JdbcDialect> get(String url) {
		for (JdbcDialect dialect : DIALECTS) {
			if (dialect.canHandle(url)) {
				return Optional.of(dialect);
			}
		}
		return Optional.empty();
	}
}

这样就完成了 flink-jdbc-connector 支持 ClickHouse 的改造。

5、业务sql

-- Kafka source table; records are parsed with the CSV format.
create table tableA(
    client_time string,
    user_id string,
    client_ip string,
    session_id string,
    query string,
    dayno string
) COMMENT 'tableA'
WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_topic',
    'properties.bootstrap.servers' = 'kafka_servers',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";',
    'properties.group.id' = 'kafka_groupUd',
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis'='1638962488170',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true',
    -- U&'\0009' is the Unicode-escaped TAB character used as the field delimiter.
    'csv.field-delimiter' = U&'\0009'
);

-- JDBC sink table pointing at ClickHouse; session_id is declared as the (not enforced) primary key.
-- NOTE(review): 'host:ip' in the URL looks like a placeholder for host:port - confirm before use.
CREATE TABLE tableB (
    client_time STRING,
    user_id     STRING,
    session_id  STRING,
    query       STRING,
    dayno       STRING,
    PRIMARY KEY (session_id) NOT ENFORCED
) COMMENT 'session_id维度汇总数据'
WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:clickhouse://host:ip/database',
    'connector.table' = 'clickhouse_table',
    'connector.username' = 'xxx',
    'connector.password' = 'xxx',
    'connector.write.flush.max-rows' = '5000',
    'connector.write.flush.interval' = '5'
);

-- Aggregates tableA per session_id and writes the groups having at most 10 rows into tableB.
insert into  tableB
select
    client_time,
    user_id,
    session_id,
    query,
    dayno
from(
    select
        LISTAGG(client_time,',') client_time,
        max(user_id) user_id,
        session_id,
        LISTAGG(query,',') query,
        min(dayno) dayno,
        count(1) cnt
    from tableA
    -- keep only rows whose dayno contains a yyyy-MM-dd shaped date and whose session_id is non-empty
    where REGEXP(dayno,'[0-9]{4}-[0-9]{2}-[0-9]{2}') and session_id is not null and session_id <> ''
    group by session_id
)x where cnt <= 10;

上面的 SQL 按 session_id 分组聚合，结果会持续更新，所以 Flink 会生成 update 语句；但是 ClickHouse 不支持这种 update 语句，要求写入 insert 语句。根据报错信息找到对应的代码（InsertOrUpdateJdbcExecutor）进行改造，对 ClickHouse 只执行 insert 语句，业务就成功运行了。


package org.apache.flink.connector.jdbc.internal.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;

import javax.annotation.Nonnull;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;


/**
 * {@link JdbcBatchStatementExecutor} that buffers records by key and, on flush, writes each
 * buffered row with either an INSERT or an UPDATE statement.
 *
 * <p>For every buffered row an existence query decides between UPDATE (key found) and INSERT
 * (key not found). When the prepared statements come from the ClickHouse driver the existence
 * query and the UPDATE are skipped and every row is written with INSERT only.
 *
 * <p>Rows are buffered in a {@link HashMap}: a later record with the same key replaces the
 * earlier one within a batch, and rows are flushed in the map's iteration order.
 *
 * @param <R> type of the incoming records
 * @param <K> type of the key extracted from a record
 * @param <V> type of the value written to the database
 */
@Internal
public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {

	private static final Logger LOG = LoggerFactory.getLogger(InsertOrUpdateJdbcExecutor.class);

	private final String existSQL;
	private final String insertSQL;
	private final String updateSQL;

	private final JdbcStatementBuilder<K> existSetter;
	private final JdbcStatementBuilder<V> insertSetter;
	private final JdbcStatementBuilder<V> updateSetter;

	private final Function<R, K> keyExtractor;
	private final Function<R, V> valueMapper;

	// Pending rows of the current batch, keyed by the extracted key.
	private final Map<K, V> batch;

	private transient PreparedStatement existStatement;
	private transient PreparedStatement insertStatement;
	private transient PreparedStatement updateStatement;

	/**
	 * Creates the executor; every argument must be non-null.
	 *
	 * @param existSQL query used to test whether a key is already present
	 * @param insertSQL statement used to insert a row
	 * @param updateSQL statement used to update a row
	 * @param existSetter binds a key to the existence query
	 * @param insertSetter binds a row to the insert statement
	 * @param updateSetter binds a row to the update statement
	 * @param keyExtractor derives the key from an incoming record
	 * @param valueExtractor derives the row to write from an incoming record
	 */
	public InsertOrUpdateJdbcExecutor(
			@Nonnull String existSQL,
			@Nonnull String insertSQL,
			@Nonnull String updateSQL,
			@Nonnull JdbcStatementBuilder<K> existSetter,
			@Nonnull JdbcStatementBuilder<V> insertSetter,
			@Nonnull JdbcStatementBuilder<V> updateSetter,
			@Nonnull Function<R, K> keyExtractor,
			@Nonnull Function<R, V> valueExtractor) {
		this.existSQL = checkNotNull(existSQL);
		this.insertSQL = checkNotNull(insertSQL);
		this.updateSQL = checkNotNull(updateSQL);
		this.existSetter = checkNotNull(existSetter);
		this.insertSetter = checkNotNull(insertSetter);
		this.updateSetter = checkNotNull(updateSetter);
		this.keyExtractor = checkNotNull(keyExtractor);
		this.valueMapper = checkNotNull(valueExtractor);
		this.batch = new HashMap<>();
	}

	/**
	 * Prepares the existence, insert and update statements on the given connection.
	 *
	 * @param connection the connection the statements are created on
	 * @throws SQLException if any statement cannot be prepared
	 */
	@Override
	public void prepareStatements(Connection connection) throws SQLException {
		existStatement = connection.prepareStatement(existSQL);
		insertStatement = connection.prepareStatement(insertSQL);
		updateStatement = connection.prepareStatement(updateSQL);
	}

	/**
	 * Buffers a record; a record with an already buffered key replaces the previous row.
	 *
	 * @param record the record to buffer
	 */
	@Override
	public void addToBatch(R record) {
		batch.put(keyExtractor.apply(record), valueMapper.apply(record));
	}

	/**
	 * Flushes all buffered rows and clears the buffer. Does nothing when the buffer is empty.
	 *
	 * @throws SQLException if binding or executing a statement fails; the buffer is left
	 *     untouched in that case
	 */
	@Override
	public void executeBatch() throws SQLException {
		if (batch.isEmpty()) {
			return;
		}
		for (Map.Entry<K, V> entry : batch.entrySet()) {
			processOneRowInBatch(entry.getKey(), entry.getValue());
		}
		// In insert-only mode nothing was added to the update statement, so it is not executed.
		if (!isInsertOnly()) {
			updateStatement.executeBatch();
		}
		insertStatement.executeBatch();
		batch.clear();
	}

	/** Adds one row to the insert batch, or to the update batch when its key already exists. */
	private void processOneRowInBatch(K pk, V row) throws SQLException {
		// Short-circuit keeps the existence query from running in insert-only mode.
		if (!isInsertOnly() && exist(pk)) {
			updateSetter.accept(updateStatement, row);
			updateStatement.addBatch();
		} else {
			insertSetter.accept(insertStatement, row);
			insertStatement.addBatch();
		}
	}

	/** Tells whether the statements come from the ClickHouse driver, i.e. only INSERT is used. */
	private boolean isInsertOnly() {
		return updateStatement instanceof ClickHousePreparedStatementImpl;
	}

	/** Runs the existence query for the key and reports whether it returned a row. */
	private boolean exist(K pk) throws SQLException {
		existSetter.accept(existStatement, pk);
		try (ResultSet resultSet = existStatement.executeQuery()) {
			return resultSet.next();
		}
	}

	/**
	 * Closes every prepared statement that was created.
	 *
	 * <p>All statements are attempted even if closing one of them fails; the first failure is
	 * rethrown afterwards with later failures attached as suppressed exceptions.
	 *
	 * @throws SQLException the first failure raised while closing
	 */
	@Override
	public void closeStatements() throws SQLException {
		SQLException failure = null;
		for (PreparedStatement s : Arrays.asList(existStatement, insertStatement, updateStatement)) {
			if (s == null) {
				continue;
			}
			try {
				s.close();
			} catch (SQLException e) {
				if (failure == null) {
					failure = e;
				} else {
					failure.addSuppressed(e);
				}
			}
		}
		if (failure != null) {
			throw failure;
		}
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653085.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存