- Version constraints:

| flink | flink-clickhouse-sink |
|-------|-----------------------|
| 1.3.* | 1.0.0 |
| 1.9.* | 1.3.1 |
- Maven dependency

```xml
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.3.1</version>
</dependency>
```
- Add the ClickHouse environment configuration in the Job class

```java
// ... ...
Map<String, String> sinkPro = new HashMap<>(); // sink properties
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000");
// ClickHouse local-write account
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "your-user");
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "your-password");
// common sink settings
sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10");
sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10");
sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3");
sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000");
sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
// when run locally this creates a folder literally named "d:" inside the project,
// which holds the detail records of failed sends
sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");

// register sinkPro as global job parameters
ParameterTool parameters = ParameterTool.fromMap(sinkPro);
env.getConfig().setGlobalJobParameters(parameters);

// per-sink properties for the ClickHouseSink itself
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
ClickHouseSink sink = new ClickHouseSink(props);

// ClickHouse does not handle highly concurrent writes well; for local testing
// it is advisable to set the global parallelism to 1
env.setParallelism(1);
// ... ...
env.addSource(new FlinkKafkaConsumer<>(PropertyUtil.get("your-topic"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(1)
        // this FlatMap operator performs the ETL on the data
        .flatMap(new SourceFlatMapRep()).uid("SourceFlatMapRep").name("SourceFlatMapRep").setParallelism(2)
        // this operator emits the column values to insert as a fixed-format string
        // (parentheses around the whole row, single quotes around each element),
        // e.g. -> ('zjk','22','csdn_note')
        .flatMap(new OutFlatMap()).uid("OutFlatMap").name("OutFlatMap").setParallelism(1)
        // hand the stream to the third-party flink-clickhouse-sink, which sends to ClickHouse automatically
        .addSink(sink);
env.execute("EventJob");
```
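The article never shows OutFlatMap itself. For illustration only, here is a minimal sketch of what such an operator could look like, assuming the upstream ETL step emits each row's column values as a List<String> (the class name, input type, and quote-escaping strategy are assumptions, not from the original):

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: renders one row's column values into the
// "('v1','v2','v3')" string format that flink-clickhouse-sink expects.
public class OutFlatMap implements FlatMapFunction<List<String>, String> {
    @Override
    public void flatMap(List<String> columns, Collector<String> out) {
        String row = columns.stream()
                .map(v -> "'" + v.replace("'", "\\'") + "'") // quote and escape each value
                .collect(Collectors.joining(",", "(", ")")); // wrap the row in parentheses
        out.collect(row);
    }
}
```

The library buffers these strings and concatenates them into a single INSERT statement, which is why each emitted element must already be a complete VALUES tuple.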
flink-connector-jdbc requires Flink 1.11.0 or later.
- Maven dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
- Job class

```java
// None of the ClickHouse global environment settings are needed here;
// everything is configured inside the custom SinkFunction
// ... ...
env.addSource(new FlinkKafkaConsumer<>(PropertyUtil.get("topic1111"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(5)
        .flatMap(new SourceFlatMap()).uid("SourceFlatMap").name("SourceFlatMap").setParallelism(10)
        .flatMap(new EventFlatMap()).uid("EventFlatMap").name("EventFlatMap").setParallelism(10)
        .addSink(MySink.sink()).setParallelism(4);
env.execute("EventJob");
```
- SinkFunction operator

```java
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseDriver;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MySink {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private static final String WRITE_USER = "your-user";
    private static final String PASSWD = "your-password";
    private static final String url = "jdbc:clickhouse://sc.chproxy.bigdata.services.org:10000/database_1564";
    private static final String insertSql = "insert into database_1564.ch_zjk_test_local (seq,real_time,app_id,app_version,session_id,event_id,device_uuid,old_id,mos,m,o,br,os,bd,ise,lct,mid,chl,lpro,mosv,manu,osvi,is_ca,_pt,id,du,pg,dus,ext,gdu,ids,tab,tag,key,code,type,from,plat,exts,docid,style,state,types,newev,hotev,value,vtype,styles,fromID,status,action,msg_id,source,offset,column,spsuuid,buildev,referer,offsets,columnd,fold_id,searchid,rec_type,commentid,refererid,content_id,referer_id,schsessionid,exposurepercent,exposurepercents) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    // Binds the 70 column values of one record to the prepared statement
    public static JdbcStatementBuilder<List<String>> marketStatement =
            (ps, t) -> {
                LOG.info("##### {}", t); // per-record debug logging
                for (int i = 1; i < 71; i++) {
                    ps.setString(i, t.get(i - 1));
                }
            };

    public static SinkFunction<List<String>> sink() {
        return JdbcSink.sink(
                insertSql,
                marketStatement,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(1000 * 3)
                        .withBatchSize(100)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withUsername(WRITE_USER)
                        .withPassword(PASSWD)
                        .withDriverName(ClickHouseDriver.class.getName())
                        .build());
    }
}
```
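Besides batch size and flush interval, JdbcExecutionOptions also exposes a retry count for failed batches; a sketch of the same options with retries enabled (the value 3 here is illustrative, not from the original):

```java
JdbcExecutionOptions options = JdbcExecutionOptions.builder()
        .withBatchSize(100)         // flush once 100 rows are buffered
        .withBatchIntervalMs(3000)  // ...or every 3 seconds, whichever comes first
        .withMaxRetries(3)          // retry a failed batch up to 3 times
        .build();
```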
This approach works with any version of Flink and is the recommended one.
- Maven dependency

```xml
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.3</version>
</dependency>
```

- Job class

```java
// This approach also needs none of the ClickHouse global environment settings;
// everything is configured in the FlatMap operator's open() method
// ... ...
env.addSource(new FlinkKafkaConsumer<>(PropertyUtil.get("topic1111"), new SimpleStringSchema(), sourcPro).setStartFromLatest())
        .uid("JobSource").name("JobSource").setParallelism(30)
        .flatMap(new SourceFlatMap()).uid("SourceFlatMap").name("SourceFlatMap").setParallelism(20)
        .flatMap(new EventFlatMap()).uid("EventFlatMap").name("EventFlatMap").setParallelism(20)
        // the operator that writes to ClickHouse
        .flatMap(new CHSinkFlatMap()).uid("CHSinkFlatMap").name("CHSinkFlatMap").setParallelism(15);
env.execute("EventJob");
```
- FlatMap operator
```java
import java.util.List;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

public class CHSinkFlatMap extends RichFlatMapFunction<List<String>, String> {

    private static final Logger LOG = LoggerFactory.getLogger(CHSinkFlatMap.class);

    // The same 70-column INSERT statement as in the MySink example above
    private static final String insertSql = "insert into database_1564.ch_zjk_test_local (seq,real_time,app_id,app_version,session_id,event_id,device_uuid,old_id,mos,m,o,br,os,bd,ise,lct,mid,chl,lpro,mosv,manu,osvi,is_ca,_pt,id,du,pg,dus,ext,gdu,ids,tab,tag,key,code,type,from,plat,exts,docid,style,state,types,newev,hotev,value,vtype,styles,fromID,status,action,msg_id,source,offset,column,spsuuid,buildev,referer,offsets,columnd,fold_id,searchid,rec_type,commentid,refererid,content_id,referer_id,schsessionid,exposurepercent,exposurepercents) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    private static int count = 1;
    private static ClickHouseConnection connection = null;
    private static PreparedStatement preparedStatement = null;

    // Create the connection and the prepared statement
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            connection = getConn();
            connection.setAutoCommit(false);
            preparedStatement = connection.prepareStatement(insertSql);
        } catch (Exception e) {
            LOG.error("Error initializing the ClickHouse connection:", e);
        }
    }

    // Batch writes with auto-commit disabled; a batch is flushed every 50000 records
    @Override
    public void flatMap(List<String> list, Collector<String> collector) throws Exception {
        try {
            for (int i = 1; i < 71; i++) {
                // blank columns are written as the placeholder "uk"
                preparedStatement.setString(i, StringUtils.isNotBlank(list.get(i - 1)) ? list.get(i - 1) : "uk");
            }
            preparedStatement.addBatch();
            count = count + 1;
            try {
                if (count >= 50000) {
                    preparedStatement.executeBatch();
                    connection.commit();
                    preparedStatement.clearBatch();
                    count = 1;
                }
            } catch (Exception ee) {
                LOG.error("Error inserting data into ClickHouse:", ee);
            }
        } catch (Exception ex) {
            LOG.error("ClickhouseSink insert error:", ex);
        }
    }

    public static ClickHouseConnection getConn() {
        String username = "";
        String password = "";
        String address = "";
        String db = "";
        int socketTimeout = 600000;
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(username);
        properties.setPassword(password);
        properties.setDatabase(db);
        properties.setSocketTimeout(socketTimeout);
        ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
        try {
            return clickHouseDataSource.getConnection();
        } catch (SQLException e) {
            LOG.error("Error getting the ClickHouse connection:", e);
        }
        return null;
    }
}
```
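One gap in the operator above: rows still buffered below the 50000 threshold when the job stops are never flushed, and the connection is never released. A minimal close() override that could be added to CHSinkFlatMap (a sketch, not part of the original):

```java
@Override
public void close() throws Exception {
    try {
        if (preparedStatement != null) {
            preparedStatement.executeBatch(); // flush whatever is still buffered
            connection.commit();
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    } catch (Exception e) {
        LOG.error("Error closing the ClickHouse connection:", e);
    } finally {
        super.close();
    }
}
```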