Flink 自定义sink 写入 RabbitMQ
Flink 自定义sink 写入 RabbitMQ
- 添加依赖
- 基于 Flink 服务提交任务并执行时需要的依赖包
- 构建RabbitMQSink参数实例
- 构建自定义RabbitMQSink
添加
依赖
org.apache.flink
flink-connector-rabbitmq_2.12
1.13.2
基于 Flink 服务提交任务并
执行时需要的依赖包
基于 flink 服务器提交任务前,先上传依赖包到 flink 的 lib 目录下;然后重启 flink 服务,使 jar 进行加载;否则会出现 ClassNoFoundException 的异常。
- flink-connector-rabbitmq_2.12-1.13.2.jar
- amqp-client-5.9.0.jar
构建RabbitMQSink参数实例
public class RabbitSink implements Serializable {
private static final long serialVersionUID = -6108637207745123361L;
private String host;
private int port;
private String username;
private String password;
private String virtualHost;
private String data;
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getVirtualHost() {
return virtualHost;
}
public String getData() {
return data;
}
public RabbitSink(Object obj) {
final JSONObject json = JSONObject.parseObject(obj.toString());
this.host = json.getString("host");
this.port = json.getIntValue("port");
this.username = json.getString("username");
this.password = json.getString("password");
this.virtualHost = json.getString("virtualHost");
if(json.containsKey("data")) {
this.data = json.getString("data");
}
}
构建自定义RabbitMQSink
基于继承 RichSinkFunction< T > 抽象类实现自定义Sink,实现方法有三个:
- open():构建sink节点时最先执行的方法,用于实现一些初始化动作。
- invoke():执行节点时执行,用于实现具体业务逻辑。
- close():关闭节点回收资源时执行,用于资源的回收。
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.ygsoft.dataprocess.vo.sink.RabbitSink;
public class RabbitPropertySink extends RichSinkFunction
public class RabbitEventSink extends RichSinkFunction
评论列表(0条)