女友也能学会的Flink —— Mac上搭建Flink 1.14.0环境并编写Demo

女友也能学会的Flink —— Mac上搭建Flink 1.14.0环境并编写Demo,第1张

女友也能学会的Flink —— Mac上搭建Flink 1.14.0环境并编写Demo 女友也能学会的Flink —— Mac上搭建Flink 1.14.0环境并编写Demo 1. 准备环境

这里主要介绍Mac系统下的环境搭建,其他 *** 作系统可直接查看官网或者我个人的翻译博客

1.1 安装并查看Java版本号

Flink要求Java版本为Java8或Java11及以上。

java -version

1.2 安装Flink

查看Flink信息

brew info apache-flink

安装Flink

brew install apache-flink

1.3 检查安装
flink --version

1.4 启动Flink
# 切换到Flink安装目录下的bin目录
cd /opt/homebrew/Cellar/apache-flink/1.14.0/libexec/bin
# 启动Flink集群
./start-cluster.sh

1.5 查看Web页面

FlinkWeb页面

1.6 删除
brew remove apache-flink
2. Demo项目 2.1 新建一个maven项目

2.2 编写代码 2.2.1 引入依赖


    org.apache.flink
    flink-core
    ${Flink_VERSION}
    provided





    org.apache.flink
    flink-streaming-java_${SCALA_VERSION}
    ${Flink_VERSION}
    provided

2.2.2 创建SocketStreamWordCount
package com.suci.knowledge.firststep;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketStreamWordCount {
  public static void main(String[] args) throws Exception {

      final int ARGS_LENGTH = 2;

      // Socket参数校验
      if (args.length < ARGS_LENGTH){
          System.err.println("ERROR:参数校验失败,请输入正确的参数  ");
          return;
      }

      String hostname = args[0];
      String port = args[1];

      // 创建Flink运行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 添加数据源
      DataStream socketTextStream = env.socketTextStream(hostname, Integer.parseInt(port));

      // 对数据分组统计
      DataStream> sum = socketTextStream.flatMap(new SocketStreamFlatMapFunction())
              .keyBy(0)
              .sum(1);

      sum.print();
      // 运行程序
      env.execute("SocketStream");

  }

    
    private static class SocketStreamFlatMapFunction implements FlatMapFunction> {

        @Override
        public void flatMap(String s, Collector> collector) throws Exception {
            String[] words = s.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2(word, 1));
            }
        }
    }
}

2.2 打包运行
  1. 将项目打成jar包
mvn clean package -Dmaven.test.skip=true

  1. 监听7777端口
nc -l 7777

  1. 运行程序
    进入flink的bin目录下,执行以下命令
flink run -c com.suci.knowledge.firststep.SocketStreamWordCount /Users/apple/Desktop/flink-learning/flink-core/target/flink-core-1.0-SNAPSHOT.jar 127.0.0.1 7777

Jar包路径根据个人代码位置进行替换

  1. 进入Web UI查看运行的程序

Web UI:http://localhost:8081/

  1. 查看统计结果

使用tail命令监控程序的实时输出,注意将文件路径切换为你的路径。

cd /opt/homebrew/Cellar/apache-flink/1.14.0/libexec/log
tail -f flink-apple-taskexecutor-0-apples-MacBook-Pro.local.out

Web UI上也可以看到总共接收了12条记录。

总结

本文主要介绍了在Mac环境下如何使用HomeBrew安装Flink,并运行了一个Demo程序,希望能对你有所帮助。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5692549.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存