- 添加依赖
- 基于 Flink 服务提交任务并执行时需要的依赖包
- 构建ClickhouseSink参数实例
- 构建自定义ClickhouseStoreSink
基于 Flink 服务提交任务并执行时需要的依赖包ru.yandex.clickhouse clickhouse-jdbc0.2.4
基于 flink 服务器提交任务前,先上传依赖包到 flink 的 lib 目录下;然后重启 flink 服务,使 jar 进行加载;否则会出现 ClassNoFoundException 或者 clickhouseNoClassDefFound:ErrCould not initialize class ru.yandex.clickhouse.ClickHouseDriver 的异常。
- clickhouse-jdbc-0.2.4.jar
- httpclient-4.5.2.jar
- httpcore-4.4.4.jar
- commons-logging-1.0.4.jar
- guava-19.0.jar
public class ClickhouseSink implements Serializable { private static final long serialVersionUID = -4410041701538783205L; private String url; private String index; private String username; private String password; public String getUrl() { return url; } public String getIndex() { return index; } public String getUsername() { return username; } public String getPassword() { return password; } public ClickhouseSink(Object obj) { final JSONObject json = JSONObject.parseObject(obj.toString()); this.url = json.getString("url"); this.index = json.getString("index"); this.username = json.getString("username"); this.password = json.getString("password"); } }构建自定义ClickhouseStoreSink
基于继承 RichSinkFunction< T > 抽象类实现自定义Sink,实现方法有三个:
- open():构建sink节点时最先执行的方法,用于实现一些初始化动作。
- invoke():执行节点时执行,用于实现具体业务逻辑。
- close():关闭节点回收资源时执行,用于资源的回收。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import java.util.Date; import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.ygsoft.dataprocess.vo.sink.ClickhouseSink; public class ClickhouseStoreSink extends RichSinkFunction
评论列表(0条)