经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境。当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置。
由于是单节点安装,需要准备如下资源:
1、jdk1.8
2、zookeeper3.5.9
3、kafka_2.12-3.0.0
链接: 资源都整合在这里.
提取码:pbtw
-
找到jdk进行解压:
-
配置环境变量
vi /etc/profile export JAVA_HOME=/usr/local/soft/jdk1.8.0_171 export PATH=.:$JAVA_HOME/bin:$PATH
配置完后执行:source /etc/profile
- 找到zookeeper进行解压:
- 配置环境变量(如上)
- 解压后进入zookeeper的conf目录
- 执行mv zoo_sample.cfg zoo.cfg
- 修改以下路径
- 找到kafka进行解压:
- 配置环境变量(如上)
- 解压后进入kafka的config目录:vi server.properties,修改如下几处:
启动(kafka目录下执行): zkServer.sh start bin/kafka-server-start.sh -daemon config/server.properties 停止(kafka目录下执行): bin/kafka-server-stop.sh config/server.properties zkServer.sh stop 创建topic(kafka bin目录下执行): kafka-topics.sh --create --topic test --bootstrap-server 10.0.4.2:9092 --partitions 1 --replication-factor 1 查看topic(kafka bin目录下执行): kafka-topics.sh --describe --topic test --bootstrap-server 10.0.4.2:9092 创建生产者(kafka bin目录下执行): kafka-console-producer.sh --topic test --bootstrap-server 10.0.4.2:9092 创建消费者(kafka bin目录下执行): kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 10.0.4.2:9092
启动zookeeper和kafka后可以看到如下进程:
启动生产和消费者命令消费数据:
这一步是非常重要的,因为云服务器默认是开启防火墙限制的,如果从本地访问特定ip端口号需要在防火墙规则进行配置。
1、进入云服务器首页管理页面
2、点击防火墙进行添加规则
3、分别添加9092和2181端口
4、添加完毕如下
我这里用的是flink1.12版本 ```xml代码示例org.apache.flink flink-streaming-java_2.121.12.5 org.apache.flink flink-clients_2.121.12.5 org.apache.flink flink-java1.12.5 org.apache.flink flink-connector-kafka_2.121.12.5 org.apache.kafka kafka-clients3.0.0
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class kafkaTest { public static void main(String[] args) throws Exception{ //创建上下文 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); test1(env); env.execute("kafkaTest"); } public static void test1(StreamExecutionEnvironment env) { Properties prop = new Properties(); prop.setProperty("bootstrap.servers","云服务器外网ip地址:9092"); prop.setProperty("group.id","test"); DataStreamstream = env .addSource(new FlinkKafkaConsumer ("topicname",new SimpleStringSchema(),prop)); stream.print(); } }
服务器上开启一个生产者客户端并往里面打数据
运行结果如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)