Spark Streaming: pulling data from flume-1.6.0-cdh5.10.1 (pull mode)


Note: a network-drive download link for apache-flume-1.6.0-cdh5.10.1-bin is provided at the end of this article.
1. Flume configuration file: flume-conf-spark-netcat-pull.properties

In the pull-based approach, the agent pushes events into a custom SparkSink, which buffers them until Spark Streaming pulls them via FlumeUtils.createPollingStream:

# Agent a1: netcat source -> memory channel -> SparkSink (pull mode)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Netcat source: each newline-terminated line received on port 5140 becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140

# In-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# SparkSink buffers events until the Spark Streaming receiver pulls them from port 5141
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 5141

# Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
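
Note: SparkSink is not part of the stock Flume distribution. Per the Spark Streaming + Flume integration guide, the following jars (versions here assume the pom in step 3) must be added to the Flume agent's classpath, e.g. by copying them into its lib/ directory, or the agent will fail to load the sink:

org.apache.spark:spark-streaming-flume-sink_2.11:2.2.0
org.scala-lang:scala-library:2.11.8
org.apache.commons:commons-lang3:3.5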

2. Start Flume

# Run from the Flume home directory, not conf/ (bin/flume-ng is resolved relative to it)
cd /usr/local/src/app/apache-flume-1.6.0-cdh5.10.1-bin
bin/flume-ng agent -c conf/ -f conf/flume-conf-spark-netcat-pull.properties -n a1 -Dflume.root.logger=INFO,console
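
To feed the netcat source with test input you can use nc or telnet; sticking with Java, here is a minimal sketch of a sender (the class NetcatTestClient is our own helper, not part of the project, and assumes Flume runs on localhost):

import java.io.PrintWriter;
import java.net.Socket;

// Hypothetical helper: writes newline-terminated lines to the Flume
// netcat source on localhost:5140; each line becomes one Flume event.
public class NetcatTestClient {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 5140);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            out.println("hello");
            out.println("hello");
            out.println("world");
        }
    }
}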

3. Java project pom file (relevant excerpt)

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Java 8 is required for the lambda expressions in the code below -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Flume integration dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>


    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.4</version>
    </dependency>
  </dependencies>

4. Java code

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

public class SparkFlumeUpdateStateTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("app");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
        // mapWithState requires a checkpoint directory for its state store
        javaStreamingContext.checkpoint(".");

        // Initial state: seed the state store with ("start", 1)
        List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("start", 1));
        JavaPairRDD<String, Integer> initialRDD = javaStreamingContext.sparkContext().parallelizePairs(tuples);

        // Pull events from the SparkSink; replace "<flume-host>" with the address of the
        // machine running the Flume agent (port 5141 = a1.sinks.k1.port above)
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(javaStreamingContext, "<flume-host>", 5141);

        // Event body -> (word, 1) pairs
        JavaPairDStream<String, Integer> pairDStream = flumeStream
                .map(item -> new String(item.event().getBody().array()).trim())
                .mapToPair(s -> new Tuple2<>(s, 1));

        // Add the current batch's count to the running total kept in state
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (word, one, state) -> {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                };

        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                pairDStream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

        stateDstream.print();                  // only keys updated in this batch
        stateDstream.stateSnapshots().print(); // full running totals
        javaStreamingContext.start();
        try {
            javaStreamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
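
A note on the two print() calls: mapWithState emits records only for keys that were updated in the current batch, so sending "hello" in two successive batches prints (hello,1) and then (hello,2); stateSnapshots() instead emits the full key-to-count table every batch, including the seeded (start,1).

To pull from several SparkSink instances in parallel, FlumeUtils also offers a createPollingStream overload that takes an array of addresses plus a storage level. A minimal sketch that would replace the createPollingStream call above (the host names are placeholders; requires java.net.InetSocketAddress and org.apache.spark.storage.StorageLevel imports):

InetSocketAddress[] addresses = new InetSocketAddress[]{
        new InetSocketAddress("<flume-host-1>", 5141),
        new InetSocketAddress("<flume-host-2>", 5141)
};
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
        FlumeUtils.createPollingStream(javaStreamingContext, addresses,
                StorageLevel.MEMORY_AND_DISK_SER_2());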

5. IDEA needs a Scala environment (Scala SDK and plugin) configured, since the Spark artifacts above are Scala 2.11 builds.


6. flume-1.6.0-cdh5.10.1 network-drive download
Link: https://pan.baidu.com/s/1td4z5dIWfkaDnT28loj8HA  Extraction code: 1ou2

