- Spark机器学习实战-专栏介绍
- Spark机器学习实战-问题汇总[持续更新]
- Spark机器学习实战-Spark的安装及使用
- Spark机器学习实战-使用Spark进行数据处理和数据转换
文章目录
- 系列文章目录
- 前言
- 一、Apache Spark的基础知识
- RDD
- DataFrame
- 二、安装及使用Spark
- 三、Spark编程模型及Spark python编程入门
- SparkContext类与SparkConf类
- 编写第一个Spark python应用程序:计算pi
- 总结
前言
本文主要介绍了Apache的基础知识及Spark环境的搭建和运行。
一、Apache Spark的基础知识
几年前,Spark被其创造者定义成:A fast and general engine for large-scale data processing(用于大规模数据处理的快速通用引擎)。
其中"Fast"意味着它比以前的大数据处理方法更快(例如Hadoop的Mapreduce)。更快的秘诀在于Spark在内存(RAM)上运行,这使得处理速度比在磁盘上快的多。
"General"部分意味着它可以用于多种用途,例如运行分布式SQL、创建数据管道、将数据存储到数据库、运行机器学习算法、处理图形、数据流等等。现在随着Apache Spark项目的发展,Spark几乎可以做数据科学或机器学习工作流程中的所有事情,我们也可以将Spark框架单独应用到深度学习这样的端到端项目中。
“Large-scale"意味着这是一个可以完美处理大量数据的框架,我们过去称之为"大数据”。
RDDApache Spark的核心抽象和起源是d性分布式数据集(RDD)。
RDD是可以并行 *** 作且具有一定容错性的元素集合。你可以在驱动程序中并行创建现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。
关于Spark其中有个非常重要的点是所有的Transformation *** 作都是不立即生效的,换句话说,Spark不会立即计算它的结果。相反,Spark只是记录下来对某些基础数据(例如文件)的Transformation *** 作。这些Transformation *** 作只会在Action需要将结果返回给驱动程序的时候才进行计算 *** 作。
默认情况下,每个Transformation后的 RDD 可能会在模每次对其运行 *** 作时重新计算。但是,你也可以使用Persist(或Cache)方法将 RDD 持久化在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问它。Spark还支持在磁盘上持久化 RDD 或跨多个节点复制的 *** 作。
DataFrame从 Spark 2.0.0 开始,DataFrame 是一个被组织成带有字段名的数据集【表格数据】。它在概念上等同于关系数据库中的表或 R/Python 中的 DataFrame,但在底层进行了更丰富的优化。
如下图所示,DataFrames 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有 RDD。
简而言之,Dataframes API 是 Spark creators 中的一种方法,可让你在框架中轻松使用数据。它们与 Pandas Dataframes 或 R Dataframes 非常相似,但有几个优点。第一个当然是它们可以被缓存在一个集群的内存里,因此它们可以处理大量数据,第二个是这种数据结构是经过特殊优化的,可以适配分布式环境。
在Spark发展起初,将 Spark 与 Scala 或 Java 一起使用要快得多。随着python语言越来越普及以及Spark整个生态的发展,使用 DF API,这不再是问题,现在我们可以在 R、Python、Scala 或 Java 中使用Spark获得相同的性能。
负责此优化的是 Catalyst。你可以把它想象成一个“巫师”,它会接受你的查询( 类似 SQL 的查询,它们也会被并行化)和 *** 作并针对分布式计算进行优化。
如上图所示,这个过程并不是那么简单,但作为程序员的你甚至不会注意到它,只是它一直在那里帮助你。 在 Spark 3.0 中,新增了一个“自适应查询执行”(AQE)的东西,它将根据在查询执行过程中收集到的统计信息重新优化和调整查询计划。这将对性能产生巨大影响,例如,假设我们正在运行查询
SELECT max(i) FROM table GROUP BY column
如果没有AQE,Spark将启动五个任务来进行最终数据的聚合:
但是使用 AQE,Spark 会将上图中间的三个小分区合并为一个,因此,最终聚合现在只需要执行三个任务而不是五个:
注意::
- "$"符号表示在shell中运行(但是不要复制该符号)
- “>>>”符号表示 Python shell(不要复制该符号)
Spark能通过内置的单机集群调度器在本地运行。此时,所有的Spark进程运行在同一个java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试和测试。同样它也适用于在单机上进行多核并行计算的实际场景。
Spark的本地模式和集群模式完全兼容,本地编写和测试过的代码仅需要增加少许设置便能在集群上运行。
- 下载预编译包
首先第一步访问Spark项目的下载页面:https://spark.apache.org/downloads.html。一版选择最新的Spark版本包
如上图所示,各个版本的版本包及源代码的github地址可以从Spark项目的下载页面找到。为了访问HDFS(Hadoop分布式文件系统)以及标准或定制的Hadoop输入源,Spark的编译版本要与Hadoop的版本对应。如上图所示,上面下载页面提供了针对Hadoop2.7的预编译版本。除非你想构建针对特定版本hadoop的Spark,否则还是建议你通过下载页面的推荐链接下载预编译的二进制包。在安装Spark之前,还要确保电脑上已经安装好了Java 8+以及anaconda。例如作者选了一台linux服务器,下载了spark-3.2.1预编译包及对应的hadoop3.3的预编译包,Java版本java1.8.0_251,python3.7。
- 解压并创建软链
下载完上述版本的包后,解压缩并将其移动到你的 /opt 文件夹下:
$ tar -xzf spark-3.2.1-bin-hadoop3.3.tgz
$ mv spark-3.2.1-bin-hadoop3.3 /opt/spark-3.2.1-bin-hadoop3.3
创建软链
$ ln -s /opt/spark-3.2.1-bin-hadoop3.3 /opt/spark̀
- 添加环境变量
最后,告诉你的 bash(或 zsh 等)在哪里可以找到 spark。为此,通过在 ~/.bashrc(或 ~/.zshrc)文件中添加以下行来配置 $PATH 变量:
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH
- 安装pysaprk
这边使用清华源,下载快点
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
- 在IDE中使用PySpark
有时你需要一个完整的 IDE 来创建更复杂的代码,而 PySpark 默认不在 sys.path 上,但这并不意味着它不能用作常规库。你可以通过在运行时将 PySpark 添加到 sys.path 来解决此问题。findspark可以做到这点,可以输入如下命令:
$ pip install findspark
然后在你的 IDE(我使用的PyCharm)上初始化 PySpark,只需在代码中输入:
import findspark
findspark.init()
三、Spark编程模型及Spark python编程入门
SparkContext类与SparkConf类
任何Spark程序的编写都是从SparkContext开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如说主节点的URL)。
初始化后,我们便可以用SparkContext对象所包含的各种方法来创建或者 *** 作分布式数据集和共享变量。Spark shell可以自动完成上述初始化:
若是用python代码来实现的话。可以参考下面的代码:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf() \
.setAppName('First Application') \
.setMaster("local[4]")
sc = SparkContext(conf=conf)
上述代码会创建一个四线程的SparkContext对象,并将其相应的任务命名为“First Application”。
编写第一个Spark python应用程序:计算pi如下所示,我们编写了一个计算Pi的应用程序:
import findspark
findspark.init()
import random
from pyspark import SparkContext
sc = SparkContext(appName="EstimatePi")
def inside(p):
x, y = random.random(), random.random()
return x*x + y*y < 1
NUM_SAMPLES = 1000000
count = sc.parallelize(range(0, NUM_SAMPLES)) \
.filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
sc.stop()
本文首先介绍了Spark的基础知识以及RDD和DataFrame这些核心概念,然后演示了如何下载Spark二进制版本并搭建一个本地单机模式下的开发环境,最后通过Python语言来编写第一个Spark程序。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)