【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】

【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】,第1张

【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】 环境准备

经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境。当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置。
由于是单节点安装,需要准备如下资源:
1、jdk1.8
2、zookeeper3.5.9
3、kafka_2.12-3.0.0
链接: 资源都整合在这里.
提取码:pbtw

JDK安装
  1. 找到jdk进行解压

  2. 配置环境变量

     vi /etc/profile
     export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
     export PATH=.:$JAVA_HOME/bin:$PATH
    

    配置完后执行:source /etc/profile

zookeeper安装
  1. 找到zookeeper进行解压:
  2. 配置环境变量(如上)
  3. 解压后进入zookeeper的conf目录
  4. 执行mv zoo_sample.cfg zoo.cfg
  5. 修改以下路径
安装kafka
  1. 找到kafka进行解压:
  2. 配置环境变量(如上)
  3. 解压后进入kafka的config目录:vi server.properties,修改如下几处:


如上,所需要的资源已经配置完毕,下面进行启动 常用命令
启动(kafka目录下执行):
zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties

停止(kafka目录下执行):
bin/kafka-server-stop.sh config/server.properties
zkServer.sh stop

创建topic(kafka bin目录下执行):
kafka-topics.sh --create --topic test --bootstrap-server 10.0.4.2:9092 --partitions 1 --replication-factor 1

查看topic(kafka bin目录下执行):
kafka-topics.sh --describe --topic test --bootstrap-server 10.0.4.2:9092

创建生产者(kafka bin目录下执行):
kafka-console-producer.sh --topic test --bootstrap-server 10.0.4.2:9092

创建消费者(kafka bin目录下执行):
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 10.0.4.2:9092

启动zookeeper和kafka后可以看到如下进程:

启动生产和消费者命令消费数据:

以上服务器上安装成功,接下来通过flink 读取kafka数据 防火墙端口号开通

这一步是非常重要的,因为云服务器默认是开启防火墙限制的,如果从本地访问特定ip端口号需要在防火墙规则进行配置。
1、进入云服务器首页管理页面

2、点击防火墙进行添加规则

3、分别添加9092和2181端口

4、添加完毕如下

添加依赖
我这里用的是flink1.12版本
```xml

        
            org.apache.flink
            flink-streaming-java_2.12
            1.12.5
        
        
            org.apache.flink
            flink-clients_2.12
            1.12.5
        
        
            org.apache.flink
            flink-java
            1.12.5
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            1.12.5
        
        
            org.apache.kafka
            kafka-clients
            3.0.0
        
代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class kafkaTest {
    public static void main(String[] args) throws Exception{
        //创建上下文
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        test1(env);
        env.execute("kafkaTest");
    }

    public static void test1(StreamExecutionEnvironment env) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","云服务器外网ip地址:9092");
        prop.setProperty("group.id","test");
        DataStream stream = env
                .addSource(new FlinkKafkaConsumer("topicname",new SimpleStringSchema(),prop));
        stream.print();
    }

}

服务器上开启一个生产者客户端并往里面打数据

运行结果如下:

以上完成了整个部署及调用过程。后续会出一个踩坑记录。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653140.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存