Spark阶段总结

Spark阶段总结,第1张

Spark阶段总结 Kafka

  • kafka消费数据

同一时刻,kafka当中数据只能被一个消费者组下面的一个消费者所消费。

kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据,

  • kafka当中数据的顺序性

如果将数据保存到多个分区当中,只能保证分区之内有序,全局无序;

要想全局有序,将所有数据发送到一个分区当中。

  • kafka生产者生产数据

指定topic+value:数据采用轮询模式保存

指定topic+key+value:如果key是固定死的,利用key的hash将数据发送某一个分区当中;如果key是动态的,也是利用key的hash,将数据发送到指定分区当中。

指定topic+partition+key+value:如果指定了分区数,那么就会将所有数据发送到指定分区当中。

Produce端向broker端发送数据并保存,为了防止发送的数据丢失,有ack机制,ack机制一共分为3种:

0:producer端发送数据,不管leader是否保存成功,follower是否同步成功,继续发送下一批数据;

1:producer端发送数据,保证leader保存成功,不管follower是否同步成功,继续发送下一批数据;

-1:producer端发送数据,既要保证leader保存成功,也要保证follower同步成功,再发送下一批数据。

## kafka的服务器
bootstrap.servers=bd-offcn-01:9092,bd-offcn-02:9092,bd-offcn-03:9092
##Key的序列化器
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer 
##value的序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer 
acks=[0|-1|1|all] ##消息确认机制
	0:	不做确认,直管发送消息即可
	-1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认
	1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
batch.size=1024 #每个分区内的用户缓存未发送record记录的空间大小
## 如果缓存区中的数据,没有占满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0,
linger.ms=10 ## 不管缓冲区是否被占满,延迟10ms发送request
buffer.memory=10240 #控制的是一个producer中的所有的缓存空间
retries=0 #发送消息失败之后的重试次数

kafka消费的并行度就是kaka topic分区的个数,或者分区的个数决定了同一时间同一消费者组内最多可以有多少个消费者消费数据

 offset:是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。

多节点partition存储分布

副本分配算法:

将所有N Broker和待分配的i个Partition排序。
将第i个Partition分配到第(i mod n)个Broker上。
将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上。
同时也要兼顾复杂均衡,尽量在所有的节点里面都保存相同多的分区及其副本。
 

segment文件段

在kafka中,一个主题可以有多个分区;一个分区可以有多个segment文件段,一个segment文件段有两个文件:.log、.index文件。

.log文件保存数据,.index文件保存数据当中的索引,是稀疏索引。

一个segment文件段默认保存1g数据,当中segment文件段达到1g的数据量,就要开始分裂出第二个segment文件段,以此类推。

第一个segment文件段:

-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000000000.index

-rw-r--r-- 1 root root   654696 Oct 13 17:14 00000000000000000000.log

第二个segment文件段:

-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000004356.index

-rw-r--r-- 1 root root   654696 Oct 13 17:14 00000000000000004356.log

第三个segment文件段:

-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000752386.index

-rw-r--r-- 1 root root   654696 Oct 13 17:14 00000000000000752386.log

Segment文件段命名规则:

以当前segment文件段当中.log文件中第一条数据的偏移量命名。

Kafka中的push和pull

​push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
​pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

kafka为什么那么快
  1. 顺序读写磁盘
  2. 采用pageCache页缓存技术
  3. 多目录
Kafka命令 启动集群
依次在 hadoop102、hadoop103、hadoop104 节点上启动 kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
查看当前服务器中的所有 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
创建 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 
--create --replication-factor 3 --partitions 1 --topic first

选项说明: --topic 定义 topic 名 --replication-factor 定义副本数 --partitions 定义分区数

删除 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 
--delete --topic first
发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh 
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
消费消息
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh 
--zookeeper hadoop102:2181 --from-beginning --topic first

--from-beginning:会把 first 主题中以往所有的数据都读取出来。根据业务场景选择是 否增加该配置。

查看某个 Topic 的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 
--describe --topic first
启动消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh 
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 
--topic first --consumer.config config/consumer.properties
Kafka API
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop103:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key 序列化
props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
// value 序列化
props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord("first", 
Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}
自定义分区生产者

定义一个类实现 Partitioner 接口,重写里面的方法(过时 API)

package com.atguigu.kafka;
import java.util.Map;
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public CustomPartitioner() {
super();
}
@Override
public int partition(Object key, int numPartitions) {
// 控制分区
return 0;
}
}
Kaka消费者api
#1、地址
bootstrap.servers=node01:9092
#2、序列化 
key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
#3、主题(topic) 需要制定具体的某个topic(order)即可。
#4、消费者组 group.id=test
public class OrderConsumer {
public static void main(String[] args) {
// 1连接集群
Properties props = new Properties(); 
props.put("bootstrap.servers", "node01:9092"); 
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms",  "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
 
kafkaConsumer.subscribe(Arrays.asList("test"));
 
while (true) {
    ConsumerRecords consumerRecords = kafkaConsumer.poll(1000);
    for (ConsumerRecord consumerRecord : consumerRecords) {
        String value = consumerRecord.value();
        int partition = consumerRecord.partition();
        long offset = consumerRecord.offset();
        String key = consumerRecord.key();
        System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset);
    }
}
}
}
指定分区数据进行消费
public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props);
 
        TopicPartition topicPartition = new TopicPartition("test", 0);
        TopicPartition topicPartition1 = new TopicPartition("test", 1);
 
        kafkaConsumer.assign(Arrays.asList(topicPartition, topicPartition1));
 
        while (true) {
            ConsumerRecords consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord consumerRecord : consumerRecords) {
                String value = consumerRecord.value();
                int partition = consumerRecord.partition();
                long offset = consumerRecord.offset();
                String key = consumerRecord.key();
                System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset);
            }
            kafkaConsumer.commitSync();
        }
    }   }
Kafka和Flume整合

flume主要是做日志数据(离线或实时)地采集。

 配置flume.conf文件

#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flumedata
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = ^(.)*\.tmp$
a1.sources.r1.inputCharset = UTF-8
#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

测试

[offcn@bd-offcn-02 kafka]$ bin/kafka-console-consumer.sh 
 
--topic test 
 
--bootstrap-server node01:9092,node02:9092,node03:9092 
 
--from-beginning
 
[root@node01 flume]$ bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
Spark

Sparkcore

Spark官网组件说明

 Spark通用运行简易流程

 

Spark 的驱动器是执行开发程序中的 main 方法的进程。它负责开发人员编写的用来创 建 SparkContext、创建 RDD,以及进行 RDD 的转化 *** 作和行动 *** 作代码的执行。

Spark Executor 是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周 期而存在。

通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式 存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加 速运算。

RDD(d性分布式数据集)(重点)
  • 什么是RDD?RDD有什么特点?能否携带数据?

RDD:叫做d性分布式数据集

特点:不可变,可分区,里面的元素可以并行计算的集合。

不能携带数据,类似于java当中的接口,携带的是元数据。

  • 依赖关系

窄依赖:父RDD的一个分区只能被子RDD的一个分区所依赖=》独生子女

宽依赖:父RDD的一个分区会被子RDD的多个分区所依赖=》超生子女

WordCount执行流程图

  • RDD算子分类

RDD算子一共分为两类,一类是transformation(转换算子),一类是action(执行算子)。

Transformation:转换算子,惰性计算,只做连接不运算,只有遇到action才会带动转换算子进行运算。

 map(一对一)、flatMap(一对多)、filter(一对N(0、1))、join、leftouterJoin、rightouterJoin、fullouterJoin、sortBy、sortByKey、gorupBy、groupByKey、reduceBy、reduceByKey、sample、union、mappatition、mappatitionwithindex、zip、zipWithIndex。

mappatition:

//创建RDD并指定分区数
val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4),2)
//通过-将分区之间的数据连接
val result: RDD[String] = rdd.mapPartitions(x=>Iterator(x.mkString("-")))
//打印输出
println(result.collect().toBuffer)

 mapPartitionsWithIndex:

val rdd: RDD[Int] = sc.parallelize(1 to 16,4)
//查看每个分区当中都保存了哪些数据
val result: RDD[String] = rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(",")))
//打印输出
result.foreach(println)

 sample、union、join算子:

sample算子:

a.说明

sample(withReplacement, fraction, seed):随机抽样算子,sample主要工作就是为了来研究数据本身,去代替全量研究会出现类似数据倾斜(dataSkew)等问题,无法进行全量研究,只能用样本去评估整体。
withReplacement:Boolean :有放回的抽样和无放回的抽样

fraction:Double:样本空间占整体数据量的比例,大小在[0, 1],比如0.2, 0.65

seed:Long:是一个随机数的种子,有默认值,通常不需要传参

def sampleOps(sc: SparkContext): Unit = {
    val list = sc.parallelize(1 to 100000)
    val sampled1 = list.sample(true, 0.01)
    println("sampled1 count: " + sampled1.count())
    val sampled2 = list.sample(false, 0.01)
    println("sampled2 count: " + sampled2.count())
}

union算子:

a.说明

rdd1.union(rdd2)

相当于sql中的union all,进行两个rdd数据间的联合,需要说明一点是,该union是一个窄依赖 *** 作,rdd1如果有N个分区,rdd2有M个分区,那么union之后的分区个数就为N+M。


join算子:



rdd1.join(rdd2) 相当于sql中的join连接 *** 作

  A(id) a, B(aid) b

  select * from A a join B b on a.id = b.aid


交叉连接: across join

  select * from A a across join B ====>这回产生笛卡尔积


内连接: inner join,提取左右两张表中的交集

  select * from A a inner join B on a.id = b.aid 或者

  select * from A a, B b where a.id = b.aid


外连接:outer join

  左外连接 left outer join 返回左表所有,右表匹配返回,匹配不上返回null

    select * from A a left outer join B on a.id = b.aid

    //leftOutJoin *** 作

  val result1: RDD[(Int, (String, Option[Int]))] = rdd1.leftOuterJoin(rdd2)

  右外连接 right outer join 刚好是左外连接的相反

    select * from A a left outer join B on a.id = b.aid

  //rightOuterJoin
 
    val result2: RDD[(Int, (Option[String], Int))] = rdd1.rightOuterJoin(rdd2)

全连接 full join

  全外连接 full outer join = left outer join + right outer join

//fullOuterJoin
    val result3: RDD[(Int, (Option[String], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
前提:要先进行join,rdd的类型必须是K-V


coalesce算子、reparation(numPartitions):

coalesce(numPartition, shuffle=false): 分区合并的意思

    numPartition:分区后的分区个数

    shuffle:此次重分区是否开启shuffle,决定当前的 *** 作是宽(true)依赖还是窄(false)依赖

原先有100个分区,合并成10分区,或者原先有2个分区,重分区之后变成了4个。

coalesce默认是一个窄依赖算子,如果压缩到1个分区的时候,就要开启shuffle=true,此时coalesce是一个宽依赖算子

如果增大分区,shuffle=false,不会改变分区的个数,可以通过将shuffle=true来进行增大分区

可以用repartition(numPartition)来进行代替= coalesce(numPartitions, shuffle = true)
 

Action:执行算子,带动转换算子运算并将结果进行输出。

 count、collect(将task的计算结果拉回到Driver端)、foreach(不会回收所有task计算结果,原理:将用户传入的参数推送到各个节点上去执行,只能去计算节点找结果)、saveAsTextFile(path)、reduce、foreachPatition、take、first、takeordered(n)。
 

  • map和mapParititons哪个效率高?举例说明

mapPartitions效率高。

例子:将数据保存到数据库,如果是map算子,每保存一个元素,就要连接数据库,保存完,要断开数据库,如果数据量过大,反复的连接断开数据库会给数据库造成很大的压力;反之,使用mapParititons,一次 *** 作的是一个分区当中的数据,没保存一个分区当中的数据,只需要连接断开一次数据库,给数据库造成的压力比较小。

  • reduceByKey和groupByKey哪个效率高?为什么?

reduceByKey效率高,因为它前期进行了预聚合,减少了网络传输。

  • reduceByKey和reduce哪个是执行算子?哪个是转换算子?

reduceByKey是转换算子

Reduce是执行算子。

  • 持久化方式

持久化方式一共分为两种,一种是cache,一种是persist

 Spark中最重要的功能之一是跨 *** 作在内存中持久化(或缓存)数据集。当您持久化RDD时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他 *** 作中重用这些分区。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

如何进行持久化

  可以使用persist()或cache()方法将RDD标记为持久化。第一次在动作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

持久化的方法就是rdd.persist()或者rdd.cache()

 

  • cache和persist之间的关系和区别

Cache底层调用的是persist无参构造,而persist无参构造默认是将数据缓存到内存当中。

Persist可以选择缓存机制

共享变量

跨任务支持通用的读写共享变量将是低效的(低效). 然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

 就是说,为了能够更加高效的在driver和算子之间共享数据,spark提供了两种有限的共享变量,一者广播变量,一者累加器

定义广播变量注意点

变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

accumulator累加器

accumulator累加器的概念和mr中出现的counter计数器的概念有异曲同工之妙,对某些具备某些特征的数据进行累加。累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的 *** 作性能更佳!

  • join是窄依赖还是宽依赖?

Join既有可能是窄依赖,也有可能是宽依赖。

RDD分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区 *** 作。
HashPartitioner

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //加载数据
    val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
    //通过Hash分区
    val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    //获取分区方式
    println(result.partitioner)
    //获取分区数
    println(result.getNumPartitions)
}
  • Job

一个action算子就形成了一个job

  • DAG有向无环图

描述的是RDD的执行流程。

怎么形成?

遇到action算子形成DAG有向无环图。

 血统Lineage
RDD只支持粗粒度转换,即在大量记录上执行的单个 *** 作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

 

Spark环境启动命令

启动%SPARK_HOME%binspark-shell.cmd脚本

 Spark分布式环境

sbin/start-all.sh
sbin/stop-all.sh

提交任务&执行程序

[root@node01 spark]# bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master spark://node01:7077 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 2 
--queue default 
./examples/jars/spark-examples_2.11-2.4.7.jar 
100

Spark分布式HA环境安装

 配置基于Zookeeper的ha,需要在spark-env.sh中添加一句话:

注释掉如下内容:
#SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077
添加上如下内容:配置的时候保证下面语句在一行,否则配置不成功,每个-D参数使用空格分开
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181
-Dspark.deploy.zookeeper.dir=/spark"

因为ha不确定master在node01上面启动,所以将

export SPARK_MASTER_HOST=node01注释掉,同步spark-env.sh到其它机器,重启spark集群,node1、node02启动master。

提交任务&执行程序:

[root@node01 spark]# bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master spark://node01:7077,node02:7077 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 2 
--queue default 
./examples/jars/spark-examples_2.11-2.4.7.jar 
100

动态上下线slave 

spark]# sbin/start-slave.sh node01:7077 -c 4 -m 1024M
spark]# sbin/stop-slave.sh node01:7077 -c 4 -m 1024M

Spark分布式Yarn环境

修改hadoop配置文件yarn-site.xml

[root@node01 hadoop]$ vi yarn-site.xml
        
        
                yarn.nodemanager.pmem-check-enabled
                false
        
        
        
                yarn.nodemanager.vmem-check-enabled
                false
        

修改spark-env.sh

[root@node01 conf]# vi spark-env.sh
YARN_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

client模式

[root@node01 spark]# bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master yarn 
--deploy-mode client 
./examples/jars/spark-examples_2.11-2.4.7.jar 
100

 cluster模式

[root@node01 spark]# bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master yarn 
--deploy-mode cluster 
./examples/jars/spark-examples_2.11-2.4.7.jar 
100
Spark代码编写

Sparkcore编写 WordCount 程序

package com.atguigu
import org.apache.spark.{SparkConf, SparkContext}
object WordCount{
 def main(args: Array[String]): Unit = {
//1.创建 SparkConf 并设置 App 名称
 val conf = new SparkConf().setAppName("WC")
//2.创建 SparkContext,该对象是提交 Spark App 的入口
 val sc = new SparkContext(conf)
 //3.使用 sc 创建 RDD 并执行相应的 transformation 和 action
 sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 
1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))
//4.关闭连接
 sc.stop()
 }
}

打包到集群测试

bin/spark-submit 
--class WordCount 
--master spark://hadoop102:7077 
WordCount.jar 
/word.txt 
/out

高效写入数据库

def saveInfoMySQLByForeachPartition(rdd: RDD[(String, Int)]): Unit = {
	rdd.foreachPartition(partition => {
		//这是在partition内部,属于该partition的本地
		Class.forName("com.mysql.jdbc.Driver")
		val url = "jdbc:mysql://localhost:3306/test"
		val connection = DriverManager.getConnection(url, "mark", "sorry")
		val sql =
			"""
			  |insert into wordcounts(word, `count`) Values(?, ?)
			  |""".stripMargin
		val ps = connection.prepareStatement(sql)
 
		partition.foreach{case (word, count) => {
			ps.setString(1, word)
			ps.setInt(2, count)
			ps.execute()
		}}
		ps.close()
		connection.close()
	})
}
Spark SQL

它提供了 2 个编程抽象:Dataframe 和 DataSet,并且作为分布式 SQL 查询引擎的作用。

Dataframe比RDD多了一个表头信息(Schema:约束信息)

Dataset

相对于RDD,Dataset提供了强类型支持(泛型),也是在RDD的每行数据加了类型约束,下图1-7是官网对于dataset的表述。

  • sparkSql查询风格

一共分为两种:一种是DSL风格,一种是SQL风格。

DSL:利用算子进行数据分析,对编程能力有一定的要求。

SQL:利用sql语句进行数据的分析。

  • schema约束信息

指的是结构化信息。

  • SparkCore和SparkSql

SparkCore:底层抽象:RDD 程序入口:SparkContext

SparkSql:底层抽象:Dataframe和DataSet 程序入口:SparkSession

  • RDD、Dataframe、DataSet

Dataframe=RDD-泛型+schema+sql+优化

DataSet=RDD+schma+sql+优化

文件保存选项

SparkSQL基本编程

SparkSession的构建

val spark = SparkSession.builder()
	         .appName("SparkSQLOps")
	         .master("local[*]")
           //.enableHiveSupport()//支持hive的相关 *** 作
	          .getOrCreate()

Dataframe的构建方式

package chapter1
 
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataframe, SparkSession}
 
object Create_Dataframe {
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()
        //调用sparkContext
        val sc: SparkContext = spark.sparkContext
        //设置控制台日志输出级别
        sc.setLogLevel("WARN")
        //从数据源创建Dataframe
        val personDF: Dataframe = spark.read.json("examples/src/main/resources/people.json")
        //展示数据
        personDF.show()
    }
}

从RDD进行转换:

        val personDF: Dataframe = personRDD.toDF("id","name","age")

通过反射创建Dataframe:

        val personDF: Dataframe = personRDD.toDF()

动态编程

val df = spark.createDataframe(row, schema)

 
        val list = List(
            new Student(1, "王盛芃", 1, 19),
            new Student(2, "李金宝", 1, 49),
            new Student(3, "张海波", 1, 39),
            new Student(4, "张文悦", 0, 29)
        )
        import spark.implicits._
        val ds = spark.createDataset[Student](list)

Row:代表的是二维表中的一行记录,或者就是一个Java对象

数据加载

spark.read.format(数据文件格式).load(path)

//导包
    import spark.implicits._
//第一种方式
//加载json文件
val personDF: Dataframe = spark.read.format("json").load("E:\data\people.json")
//加载parquet文件
val personDF1: Dataframe = spark.read.format("parquet").load("E:\data\people.parquet")
//加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法
val person2: Dataframe = spark.read.format("csv").option("header","true").load("E:\data\people.csv")
//加载数据库当中的表
val personDF3: Dataframe = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/bigdata")
    .option("user", "root")
    .option("password", "root")
    .option("dbtable", "person")
    .load()

spark.read.json(path)

//第二种方式
//加载json文件
val personDF4: Dataframe = spark.read.json("E:\data\people.json")
//加载parquet文件
val personDF5: Dataframe = spark.read.parquet("E:\data\people.parquet")
//加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法
val person6: Dataframe = spark.read.option("header","true").csv("E:\data\people.csv")
//加载数据库当中的表
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "root")
val personDF7: Dataframe = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata", "person", properties)
数据落地
    //第一种方式
    //保存为json文件
    personDF.write.format("json").save("E:\data\json")
    //保存为parquet文件
    personDF.write.format("parquet").save("E:\data\parquet")
    //保存为csv文件,想要带上表头,调用option方法
    personDF.write.format("csv").option("header","true").save("E:\data\csv")
    //保存为数据库当中的表
    personDF.write
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/bigdata")
        .option("user", "root")
        .option("password", "root")
        .option("dbtable", "person").save()

 //第二种方式
//保存为parque文件
personDF.write.parquet("E:\data\parquet")
//保存为csv文件
personDF.write.option("header", "true").csv("E:\data\csv")
//保存为json文件
personDF.write.format("json").save("E:\data\json")
//保存为数据库的表
val props = new Properties()
props.put("user","root")
props.put("password","root")
personDF.write.jdbc("jdbc:mysql://localhost:3306/bigdata","person",props)
SparkSQL与Hive整合

1、需要引入hive的hive-site.xml,添加classpath目录下面即可,或者放到$SPARK_HOME/conf

2、为了能够正常解析hive-site.xml中hdfs路径,需要将hdfs-site.xml和core-site.xml到classpath下面

package chapter5
 
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
 
object Hive_Support {
  def main(args: Array[String]): Unit = {
    //创建sparkSql程序入口
    val spark: SparkSession = SparkSession.builder()
      .appName("demo")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    //调用sparkContext
    val sc: SparkContext = spark.sparkContext
    //设置日志级别
    sc.setLogLevel("WARN")
    //导包
    import spark.implicits._
    //查询hive当中的表
    spark.sql("show tables").show()
    //创建表
    spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '")
    //导入数据
    spark.sql("load data local inpath'./person.txt' into table person")
    //查询表当中数据
    spark.sql("select * from person").show()
  }
}
用户自定义函数
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataframe, SparkSession}
 
object UDF_Demo {
  def main(args: Array[String]): Unit = {
    //创建sparkSql程序入口
    val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    //调用sparkContext
    val sc: SparkContext = spark.sparkContext
    //设置日志级别
    sc.setLogLevel("WARN")
    //导包
    import spark.implicits._
    //加载文件
    val personDF: Dataframe = spark.read.json("E:\data\people.json")
    //展示数据
    //personDF.show()
    //注册成为一张表
    personDF.createOrReplaceTempView("t_person")
    //赋予什么功能
    val fun = (x:String)=>{
      "Name:"+x
    }
    //没有addName这个函数,就注册它
    spark.udf.register("addName",fun)
    //查询
    spark.sql("select name,addName(name) from t_person").show()
//释放资源
spark.stop()
  }}

开窗函数

row_number() over (partitin by XXX order by XXX)

rank() 跳跃排序,有两个第二名是,后边跟着的是第四名

dense_rank()  连续排序,有两个第二名是,后边跟着的是第三名

row_number() 连续排序,两个值相同排序也是不同
 

package com.zg.d03
 
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
 
case class StudentScore(name:String,clazz:Int,score:Int)
object SparkSqlOverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparksqlover")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
 
    val arr01 = Array(("a",1,88),
      ("b",1,78),
      ("c",1,95),
      ("d",2,74),
      ("e",2,92),
      ("f",3,99),
      ("g",3,99),
      ("h",3,45),
      ("i",3,53),
      ("j",3,78))
 
    import spark.implicits._
    val scoreRDD = sc.makeRDD(arr01).map(x=>StudentScore(x._1,x._2,x._3)).toDS
    scoreRDD.createOrReplaceTempView("t_score")
    //查询t_score表数据
    spark.sql("select * from t_score").show()
    //使用开窗函数查找topN,rank() 跳跃排序,有两个第二名是,后边跟着的是第四名
    spark.sql("select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score ").show()
    //讲使用开窗函数后的查询结果作为一张临时表,这个临时表有每个班的成绩排名,再取前三名
    spark.sql("select * from (select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score) t1 where rownum <=3 ").show()
  }
}
SQL炸裂函数
	//SQL风格 *** 作
        
  
SparkStreaming

第二代的流式处理框架,短时间内生成mirco-batch,提交一次作业。准实时,延迟略高,秒级或者亚秒级延迟。

SparkStreaming是SparkCore的api的一种扩展,使用DStream(discretized stream or DStream)作为数据模型,基于内存处理连续的数据流,本质上还是RDD的基于内存的计算。

DStream,本质上是RDD的序列。

  • 实时计算和准实时计算

实时计算:以事件为驱动。来一条数据就驱动着立刻处理一条数据。

准实时计算:以时间为驱动。不管是否接收到数据,达到时间节点再处理。

SparkStreaming 架构

 

SparkStreaming算子

主要学习transform,updateByKey,window函数。

SparkStreaming基本编程 入口类StreamingContext
object SparkStreamingWordCountOps {
    def main(args: Array[String]): Unit = {
        
        val conf = new SparkConf()
                    .setAppName("SparkStreamingWordCount")
                    .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(conf, duration)
		//业务
        //为了执行的流式计算,必须要调用start来启动
        ssc.start()
        //为了不至于start启动程序结束,必须要调用awaitTermination方法等待程序业务完成之后调用stop方法结束程序,或者异常
        ssc.awaitTermination()
    }
}

 awaitTermination

要想持续不断的执行streaming计算,就必须要调用awaitTermination方法,以便driver能够在后台常驻

监听本地

无法读取手动拷贝,或者剪切到指定目录下的文件,只能读取通过流写入的文件。

SparkStreaming整合HDFS

正常情况下,我们可以读取到通过put上传的文件,还可以读取通过cp拷贝的文件,但是读取不了mv移动的文件。

读取文件的这种方式,没有额外的Receiver消耗线程资源,所以可以指定master为local

object SparkStreamingHDFS {
 
    def main(args: Array[String]): Unit = {
 
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
 
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
 
        Logger.getLogger("org.spark_project").setLevel(Level.WARN)
 
 
 
        val conf = new SparkConf()
 
            .setAppName("SparkStreamingHDFS")
 
            .setMaster("local")
 
        val duration = Seconds(2)
 
        val ssc = new StreamingContext(conf, duration)
 
        //读取local中数据 --->需要通过流的方式写入
 
//        val lines = ssc.textFileStream("file:///E:/data/monitored")
 
 
 
        //hdfs
 
        val lines = ssc.textFileStream("hdfs://node01:9000/data/spark")
 
 
 
        lines.print()
 
 
 
        ssc.start()
 
        ssc.awaitTermination()
 
    }
 
}
SparkStreaming整合Kafka

Receiver方式:

Spark去kafka当中抽取数据,并将数据保存在executor的内存当中,但是往往会因为底层的计算失败而造成数据的丢失,有解决办法,开启WAL预写日志,将kafka当中的数据不仅保存在executor的内存中,还要保存在WAL预写日志当中一份,这样,加入内存中的数据丢失,还能从WAL当中进行恢复,但是这样造成了数据的冗余。

Direct模式:

Spark每隔批次间隔就去kafka当中,读取每个topic下面每个partition当中最新的偏移量范围,数据依然保存在kafka当中,如果计算失败,只要kafka保存数据时间足够长,就可以进行无限次的恢复,不会造成数据的冗余。

直连模式特点:batch time 每隔一段时间,去kafka读取一批数据,然后消费

         简化并行度,rdd的分区数量=topic的分区数量

         数据存储于kafka中,没有数据冗余

         不存在单点问题

         效率高

         可以实现仅消费一次的语义 exactly-once语义


整合编码

earliest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
latest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

(1)从kafka读取数据

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object StreamingFromKafkaOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setMaster("local")
            .setAppName("StreamingFromKafkaOps")
//            .set("spark.serializer", classOf[KryoSerializer].getName)
//            .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
        //两次流式计算之间的时间间隔,batchInterval
        val batchDuration = Seconds(2) // 每隔2s提交一次sparkstreaming的作业
        val ssc = new StreamingContext(conf, batchDuration)
        val topics = Set("hadoop")
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark-kafka-grou-0817",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "false"
        )
        
        val message:InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
                                                                    ConsumerStrategies.Subscribe(topics, kafkaParams))
//        message.print()//直接打印有序列化问题
        message.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                rdd.foreach(record => {
                    println(record)
                })
            }
        })
        ssc.start()
        ssc.awaitTermination()
    }
}
 

transform是一个transformation算子,转换算子。


import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{
DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Advertising_ranking {
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val conf: SparkConf = new SparkConf().setAppName("advertising").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc,Seconds(5))
        //设置日志级别
        sc.setLogLevel("WARN")
        //接收数据
        val data: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
        //切分数据
        val splidata: DStream[String] = data.flatMap(_.split(" "))
        //每条点击流日志记为1次
        val pageAndOne: DStream[(String, Int)] = spliData.map((_,1))
        //聚合相同的点击流
        val pageAndCount: DStream[(String, Int)] = pageAndOne.reduceByKey(_+_)
        //遍历DStream中封装的RDD进行 *** 作
        val resultSorted: DStream[(String, Int)] = pageAndCount.transform(rdd => {
            //对RDD当中的数据进行倒叙排名
            val sorted: RDD[(String, Int)] = rdd.sortBy(_._2, false)
            //从排名数据中取top3
            val topThree: Array[(String, Int)] = sorted.take(3)
            //打印输出
            topThree.foreach(println)
            //因为transform需要返回值
            sorted
        })
        //打印整体排名情况
        resultSorted.print()
        //启动sparkStreaming
        ssc.start()
        //让其一直启动,等待程序关闭
        ssc.awaitTermination()
    }
}
updateStateByKey
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
object UpdateStateByKey_Demo {
  def updateFunc(currentValue:Seq[Int],historyValue:Option[Int]): Option[Int] = {
    val result: Int = currentValue.sum+historyValue.getOrElse(0)
    Some(result)
  }
  def main(args: Array[String]): Unit = {
    //创建sparkStreaming程序入口
    val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5))
    //设置日志级别
    sc.setLogLevel("WARN")
    //设置检查点,用来保存历史状态
    ssc.checkpoint("./999")
    //接收数据
    val file: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
    //切分
    val spliFile: DStream[String] = file.flatMap(_.split(" "))
    //每个单词记为1次
    val wordAndOne: DStream[(String, Int)] = spliFile.map((_,1))
    //进行有状态转化 *** 作
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    //打印输出
    wordAndCount.print()
    //开启sparkStreaming
    ssc.start()
    //让其一直开启,等待关闭
    ssc.awaitTermination()
  }
}
Window
object  WindowOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("WindowOps")
        //两次流式计算之间的时间间隔,batchInterval
        val batchDuration = Seconds(2) // 每隔2s提交一次sparkstreaming的作业
        val ssc = new StreamingContext(conf, batchDuration)
        val lines = ssc.socketTextStream("node01", 9999)
        val words = lines.flatMap(line => line.split("\s+"))
        val pairs = words.map(word => (word, 1))
        val ret = pairs.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))
        ret.print
        ssc.start()
        ssc.awaitTermination()
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575514.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存