准备工作
Spark是安装在 hadoop集群之上的一个计算框架,为了兼容Hadoop2.7.1, Spark版本为2.1.0版本,安装文件为 spark2.1.0-bin-without-hadoop tgz下面为 Spark的安装步骤
spark安装
官网下载地址:Downloads | Apache Spark
我下载的是2.1.0版本的
记得一定要看版本对应!!!!不兼容的版本会出很多错误 下载后,执行如下命令进行安装:$ sudo tar -zxf ~/下载/spark-2.1.0-bin-without-hadoop.tgz -C /usr/local/ //将压缩包进行解压到/usr/local/文件夹下 $ cd /usr/local //cd到local目录下 $ sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark //将解压好的spark文件夹进行改名 //进行授权 $ sudo su //切换到用户root $ sudo chmod -R a+w /usr/local/spark //设置用户权限 $ su hadoop //切换回原来的用户 hadoop是我的用户名
安装后,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,执行如下命令拷贝一个配置文件:
$ cd /usr/local/spark //切换路径 $ cp ./conf/spark-env.sh.template ./conf/spark-env.sh //复制文件 $ vim ./conf/spark-env.sh //编辑配置文件,,,,,书上的教程是用 gedit (sudo gedit conf/spark-env.sh)打开配置文件的,但我打开是一片空白。gedit命令我用不了,所以出现错误时,可以尝试用 vim 或者 vi
编辑 ./conf/spark-env.sh ,在最后面加上如下一行:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
保存后,Spark 就可以启动、运行了
如果安装没有成功,想删除spark时
用 sudo rm -r spark
注意要进入local的目录下
运行 Spark 示例
注意,必须安装 Hadoop 才能使用 Spark,但如果使用 Spark 过程中没用到 HDFS,不启动 Hadoop 也是可以的。此外,接下来教程中出现的命令、目录,若无说明,则一般以 Spark 的安装目录(/usr/local/spark)为当前路径,请注意区分。
在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:
$ cd /usr/local/spark $ ./bin/run-example SparkPi
执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):
$ ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
过滤后的运行结果如下图所示,可以得到 π 的 5 位小数近似值
Python 版本的 SparkPi 则需要通过 spark-submit 运行:
$ ./bin/spark-submit examples/src/main/python/pi.py
通过 Spark Shell 进行交互分析
Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python
Scala
Scala 是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。
Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。
执行如下命令启动 Spark Shell
./bin/spark-shell
启动成功后如图所示,会有 “scala >” 的命令提示符。
退出spark命令是":quit" scala> :quit基础 *** 作
Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,d性分布式数据集),它可被分发到集群各个节点上,进行并行 *** 作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。
我们从 ./README 文件新建一个 RDD,代码如下(本文出现的 Spark 交互式命令代码中,与位于同一行的注释内容为该命令的说明,命令之后的注释内容表示交互式输出结果):
1: val textFile = sc.textFile("file:///usr/local/spark/README.md") 2: // textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at:24
代码中通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的错误。
上述命令的输出结果如下图所示:
RDDs 支持两种类型的 *** 作
- actions: 在数据集上运行计算后返回值
- transformations: 转换, 从现有数据集创建一个新的数据集
下面我们就来演示 count() 和 first() *** 作:
1: textFile.count() // RDD 中的 item 数量,对于文本文件,就是总行数
2:textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容
接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,
3: val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行
4: linesWithSpark.count() // 统计行数
可以看到一共有 20 行内容包含 Spark,这与通过 Linux 命令 cat ./README.md | grep "Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式 *** 作的方式结合使用,使代码更为简洁:
5: textFile.filter(line => line.contains("Spark")).count() // 统计包含 Spark 的行数
RDD的更多 *** 作
RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词:
代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce *** 作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:
Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount):
6: val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 实现单词统计
7:wordCounts.collect() // 输出单词统计结果
缓存
Spark 支持在集群范围内将数据集缓存至每一个节点的内存中,可避免数据传输,当数据需要重复访问时这个特征非常有用,例如查询体积小的“热”数据集,或是运行如 PageRank 的迭代算法。调用 cache(),就可以将数据集进行缓存:
linesWithSpark.cache()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)