RabbitMQ代码是使用Erlang编写的,需要安装Erlang环境
下载安装检查系统环境变量erlang官网地址:https://www.erlang.org/downloads
为了稳定,安装完后要检查一下系统的环境变量,如果没有自动写入需要自行手动配置;
*** 作过程:此电脑(鼠标右键)__属性__高级系统设置__环境变量__新建__系统环境变量
变量名:ERLANG_HOME
变量值:Erlang的根路径
然后添加到 path
*** 作过程:选中Path__编辑__新建__%ERLANG_HOME%bin__确定
保存退出到桌面后打开命令提示符(CMD),输入erl显示版本号即安装成功;
验证官网地址:http://www.rabbitmq.com/download.html
RabbitMq会在开始界面中创建自己功能的快捷方式所以我们直接点击其中的RabbitMq Service - start运行;
桌面会显示Service运行之后的窗口;
在浏览器链接行访问:http://localhost:15672,然后登录后即可看到它的管理页面;
注意默认账号密码皆为:guest
管理页面登录默认端口为15672,消息推送使用的端口默认为5672
Flink Maven依赖4369 (epmd), 25672 (Erlang distribution)5672, 5671 (AMQP 0-9-1 without and with TLS)15672 (if management plugin is enabled)61613, 61614 (if STOMP is enabled)1883, 8883 (if MQTT is enabled)
Flink Version:1.14.3
RabbitMq Version:5.14.1
Mysql Version:8.x.x
代码(java)org.apache.flink flink-javaorg.apache.flink flink-streaming-java_2.121.14.3 org.apache.flink flink-clients_2.121.14.3 org.apache.flink flink-connector-rabbitmq_2.121.14.3 com.rabbitmq amqp-client5.14.1 org.apache.flink flink-connector-jdbc_2.121.14.3 mysql mysql-connector-java8.0.28 com.fasterxml.jackson.core jackson-core2.13.1 com.fasterxml.jackson.core jackson-databind2.13.1 com.fasterxml.jackson.core jackson-annotations2.13.1
因为java和scala中间的Json转换过于繁琐,故使用Java代码开发。
import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.Map; public class RMQTransformation { public static void main(String[] args) throws Exception { // 创建Flink流运行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 链接RabbitMq的配置 final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() // RabbitMq部署服务器 .setHost("localhost") // RabbitMq端口 .setPort(5672) // 用户名 .setUserName("guest") // 端口 .setPassword("guest") .setVirtualHost("/") .build(); // 从数据源读取流数据的配置 final DataStream抛砖引玉stream = env // 配置源 .addSource(new RMQSource<>( // 传入配置信息 connectionConfig, // 指定监听的Queue "python-test", // 是否持久化 true, // 流数据类型转换的Schema new SimpleStringSchema())) // 设置并行度 .setParallelism(1); // 处理收到的数据 // 因为发送的数据是JSON,所以需要使用jackson解析JSON提取想要的值; // 因为java 无法自动推断 Flink的数据类型所以保险起见,不使用Lambda表达式; DataStream stringStream = stream.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector collector) throws Exception { ObjectMapper mapper = new ObjectMapper(); JavaType type = mapper.getTypeFactory().constructParametricType(Map.class, String.class, String.class); // jackson解析JSON Map map = mapper.readValue(s, type); for (Map.Entry entry : map.entrySet()) { //遍历收集 值 collector.collect(entry.getValue()); } } }); //控制台打印 stringStream.print(); //设置输出目标 stringStream.addSink(new RichSinkFunction () { private PreparedStatement ps = null; private Connection connection = null; String driver = "com.mysql.cj.jdbc.Driver"; String url = "jdbc:mysql://localhost:3306/scala?serverTimezone=UTC"; String username = "root"; String password = "root"; @Override public void open(Configuration parameters) throws Exception { // 用于建立连接 super.open(parameters); //在此做准备工作 //加载JDBC驱动 Class.forName(driver); // JDBC 链接 connection = DriverManager.getConnection(url, username, password); } @Override public void invoke(String value, SinkFunction.Context context) throws Exception { // 真正执行的 *** 作 String sql = "insert into sync_source (fn_id,fn_key,fn_content) values (?,?,?)"; ps = connection.prepareStatement(sql); ps.setString(1, value); ps.setString(2, "key" + value); ps.setString(3, "value" + value); ps.executeUpdate(); } @Override public void close() throws Exception { //在这里编写步骤做完的代码 super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } }); env.execute(); } }
本文记录从Flink以流的方式监听RabbitMq中Json格式的数据并写入MySQL数据库;
笔者目前仅为初步接触水平,如果有大神能用Scala写出相同效果,请帮忙留一下文章链接,感谢。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)