Spark 笔记

Spark 笔记,第1张

Spark 笔记 初识

是用于大数据处理的集群计算框架,Spark并没有以 MapReduce 作为执行引擎,而是使用了它自己的分布式运行环境在集群上执行工作。Spark最突出的表现在于它能够将作业与作业之间产生的大规模的工作数据集存储在内存中。即使你不需要在内存中进行缓存,Spark还会因为其出色的 DAG 引擎和用户体验而具有吸引力。与 MapReduce 不同,Spark 的 DAG 引擎可以处理任意 *** 作流水线,并为用户将其转换为单个作业。Spark 还是用于构建分析工具的出色平台。为此,Apache Spark项目包括用于处理机器学习(MLlib)、图算法(Graphx)、流逝计算(Spark Streaming)和 SQL查询(Spark SQL)等模块。

Spark 应用、作业、阶段和任务

Spark 作业是由任意的多阶段(stages)有向无环图(DAG)构成,其中每个阶段大致相当于 MapReduce 中的 map 阶段或 reduce 阶段。这些阶段又被运行环境分解为多个任务(task),任务并行运行在分布于集群中的 RDD 分区上,就像 MapReduce 中的任务一样。
Spark 是用 Scala 实现的,而 Scala 作为基于 JVM 的语言,与 Java 有着良好集成关系。
Spark 通过一个名为 PySpark 的 API 来为 Python 提供语言支持。

d性分布式数据集(RDD)

d性分布式数据集(RDD) 是所有 Spark 程序的核心。

创建

RDD 的创建有三种方法:来自一个内存中的对象集合(也称为并行化为一个集合);使用外部的存储器(如HDFS)中的数据集;对现有的RDD进行转换。
第一种方法适用于对少量的输入数据进行并行的 CPU 密集型计算。
例如,对数字1-10运行独立运算

val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputatuon)

第二种方法是创建一个对外部数据集的引用

val text: RDD[String] = sc.textFile(inputPath)

第三种方法是通过现有 RDD 的转换来创建 RDD
Spark 为 RDD 提供了两大类 *** 作:转换(transformation)和 动作(action)。转换是从现有的 RDD 生成新的 RDD,而动作则触发对 RDD 的计算并对计算结果执行某种 *** 作,要么返回给用户,要么保存到外部存储器中。
例如:用于小写字母化文本文件中的文本行

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))

要判断一个 *** 作是转换还是动作,我们可以观察其返回类型:如果返回类型是 RDD,那么它是一个转换,否则就是一个动作。

序列化

在使用 Spark 时,要从两个方面来考虑序列化:数据序列化和函数序列化(或称为闭包函数)

数据序列化
默认情况下,Spark 在通过网络将数据从一个 executor 发送到另一个 executor 时,或者以序列化的形式缓存(持久化)数据时,所使用的都是 Java 序列化机制。Java序列化使用的类实现了 java.io.Serializable 或 java.io.Externalizable 接口,但从性能或大小来看,这种做法效率并不高。
使用 Kryo 序列化机制对于大多数 Spark 程序是一个更好的选择。Kryo 是一个高效的通用的 Java 序列化库。要想使用 Kryo 序列化机制,需要在你的驱动器程序的 SparkConf 中设置 spark.sertallizer 属性,如下

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

函数序列化
通常函数的序列化会 “谨守本份”;Scala 中的函数都可以通过标准的 Java 序列化机制来序列化,这也是 Spark 用于向远程 executor 节点发送函数的手段。 Spark 作业运行机制

运行 Spark 作业会发生些什么呢?在最高层,它有两个独立的实体:driver 和 executor。driver 负责托管应用(SparkContext)并为作业调度任务。executor 专属于应用,它在应用运行期间运行,并执行该应用的任务。通常,driver 作为一个不由集群管理器(cluster manager)管理的客户端来运行,而 executor 则运行在集群的计算机上。

作业提交


当对 RDD 执行一个动作(比如 count())时,会自动提交一个 Spark 作业。从内部看,它导致对 SparkContext 调用 runJob(),然后将调用传递给作为 driver 的一部分运行的调度程序。调度程序由两部分组成:DAG 调度程序和任务调度程序。DAG 调度程序把作业分解为若干阶段,并由这些阶段构成一个 DAG。任务调度程序则负责把每个阶段中的任务提交到集群。

代码
把文本拆分为字段

scala> val records = lines.map(_.split("t"))
records:orgs.apache.spark.rdd.RDD[Array[String]] = MappedRDD at map at 
:14

应用过滤器来滤除所有的不良记录:

scala> val filtered = records.filter(rec => (rec(1) != 9999"&& rec(2).matches("[01459]")))
filtered: orgs.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3]
at filter at : 16

使用 Spark 找出最高温度的 Scala程序

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext.{SparkConf, SparkContext}

object MaxTemperture{
	def main(args: Array[String]){
		val conf = new SparkConf().setAppName("Max Temperture")
		val sc = new SparkContext(conf)
		
		sc.textFile(args(0))
		  .map(_.split("t"))
		  .filter(rec => (rec(1) != 9999" && rec(2).matches("[01459]")))
		  .map(rec => (rec(0).toInt, rec(1).toInt))
		  .reduceByKey((a,b) => Math.max(a, b))
		  .saveAsTextFile(args(1))
	}
}

使用 PySpark 找出最高温度的 Python 程序

from pyspark import SparkContext
import re,sys

sc = SparkContext("local","	Max Temperture")
sc.textFile(sys.args(1)) 
		  .map(lambda s: s.split("t"))
		  .filter(lambda rec: rec => (rec(1) != 9999" && re.matches("[01459]", rec(2))) 
		  .map(lambda rec: (int(rec(0), rec(1)))
		  .reduceByKey(max)
		  .saveAsTextFile(args(2))

对文本文件计算词频统计分布图(每行文本只有一个单词)

val hist: Map[Int, Long] = sc.textFile(inputPath)
	.map(word => (word.toLowerCase(), 1))
	.reduceByKey((a, b) => a + b)
	.map(_.swap)
	.countByKey()

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705087.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存