Flink集群搭建和使用

Flink集群搭建和使用,第1张

Flink集群搭建和使用

文章目录
    • Flink集群搭建和使用
      • local 本地测试
      • flink集群搭建
        • 1、standallone cluster
          • 提交任务 -- 将代码打包
        • 2.flink on yarn 只需要部署一个节点
          • flink启动方式
            • 1、yarn-session
            • 2、直接提交任务到yarn

Flink集群搭建和使用 local 本地测试

idea运行
idea上运行

flink集群搭建 1、standallone cluster
1、准备工作
	有jdk,节点间免密
2、上传解压
	tar -zxvf flink-1.11.0-bin-scala_2.11.tgz
   配置环境变量,过于基础不写了
   然后生效
   source /etc/profile

3、修改配置文件
#修改conf下的flink-conf.yaml
	vim conf/flink-conf.yaml
	#需要改的内容如下:
	jobmanager.rpc.address: master   主节点ip地址
#修改workers
	vim conf/workers
	修改如下:
	增加从节点  node1  node2
	(把localhost改为node1,node2)
#修改masters
	vim conf/masters  
	改成主节点ip

#同步到所有节点`pwd`是当前路径看清楚了
	scp -r flink-1.11.0/ node1:`pwd`

4、启动集群
	start-cluster.sh

http://master:8081   访问web界面
提交任务 – 将代码打包

1、在web页面提交任务

打开web界面后左边会有个Submit New job的点开然后上传jar包


传完了可以点击包名,输入类名(Entry Class),后面那个Paralleism是设置并行度的,其它不用管,然后点击submit即可

2、web提交和flink命令提交任务一样,在shell里输入下面命令

flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 

3、rpc方式提交任务(远程命令提交,直接在idea里打包idea里运行)

package com.liu.core

import org.apache.flink.streaming.api.scala._


object WordCountRPC {
  def main(args: Array[String]): Unit = {
    //创建flink的环境
    //注意下面参数设置
    val env = StreamExecutionEnvironment.createRemoteEnvironment("master", 45189, "F:\ideaProject\liubigdata12\Flink\target\Flink-1.0-SNAPSHOT.jar")
    //设置并行度
    //    env.setParallelism(2)
    //读取socket数据
    //nc -lk 8888
    env.socketTextStream("master",8888)
      //把单词拆分
      .flatMap(_.split(","))
      //转换成kv格式
      .map((_,1))
      //按单词分组
      .keyBy(_._1)
      //统计单词数量
      .sum(1)
      //打印结果
      .print()

    //启动flink
    env.execute()
  }
}

2.flink on yarn 只需要部署一个节点

1、配置HADOOP_CONF_DIR

vim /etc/profile
#添加如下
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/

2、将hadoop依赖jar包上传到flink lib目录

#jar包
flink-shaded-hadoop-2-uber-2.6.5-10.0

flink和spark一样都是粗粒度资源申请

flink启动方式 1、yarn-session

在yarn里面启动一个flink集群 jobManager
先启动hadoop

yarn-session.sh -jm 1024m -tm 1096m



和standalone提交过程差不多,不多赘述

提交任务  任务提交的是偶根据并行度动态申请taskmanager
1、在web页面提交任务

2、同flink命令提交任务
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 

3、rpc方式提交任务


模拟消息队列输入单词,web界面查看


RPC模式结果

2、直接提交任务到yarn

直接提交到yarn不会生成端口号,通过master:8088界面查看任务,点击后面的ApplactionMaster跳转到Flink界面

每一个任务都会有一个jobManager

flink run -m yarn-cluster  -yjm 1024m -ytm 1096m -c 
com.shujia.flink.core.Demo1WordCount flink-1.0.jar
#杀掉yarn上的任务,如果之前有任务每释放就执行,id不同别直接复制,没有就忽略
yarn application -kill application_1599820991153_0005

yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager(提交任务更快,任务之间共享jobmanager , 相互有影响) 直接提交任务模型,为每一个任务启动一个joibmanager(每一个任务独立jobmanager , 任务运行稳定)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5609494.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存