Linux里面spark作用是什么?

Linux里面spark作用是什么?,第1张

Spark是通用数据处理引擎,适用于多种情况。 应用程序开发人员和数据科学家将Spark集成到他们的应用程序中,以快速地大规模查询,分析和转换数据。 与Spark最频繁相关的任务包括跨大型数据集的交互式查询,来自传感器或金融系统的流数据处理以及机器学习任务。

Spark于2009年开始运作,最初是加州大学伯克利分校AMPLab内部的一个项目。 更具体地说,它是出于证明Mesos概念的需要而诞生的,Mesos概念也是在AMPLab中创建的。 在Mesos白皮书《 Mesos:数据中心中的细粒度资源共享平台》中首次讨论了Spark,其中最著名的作者是Benjamin Hindman和Matei Zaharia。

2013年,Spark成为Apache Software Foundation的孵化项目,并于2014年初被提升为该基金会的顶级项目之一。 Spark是基金会管理的最活跃的项目之一,围绕该项目成长的社区包括多产的个人贡献者和资金雄厚的企业支持者,例如Databricks,IBM和中国的华为。

从一开始,Spark就被优化为在内存中运行。 它比Hadoop的MapReduce等替代方法更快地处理数据,后者倾向于在处理的每个阶段之间向计算机硬盘写入数据或从计算机硬盘写入数据。 Spark的支持者声称,Spark在内存中的运行速度可以比Hadoop MapReduce快100倍,并且在以类似于Hadoop MapReduce本身的方式处理基于磁盘的数据时也可以快10倍。 这种比较并不完全公平,这不仅是因为原始速度对Spark的典型用例而言比对批处理更为重要,在这种情况下,类似于MapReduce的解决方案仍然很出色。

我们在windows开发机上使用spark的local模式读取远程hadoop集群中的hdfs上的数据,这样的目的是方便快速调试,而不用每写一行代码或者一个方法,一个类文件都需要打包成jar上传到linux上,再扔到正式的集群上进行测试,像功能性验证直接使用local模式来快速调测是非常方便的,当然功能测试之后,我们还需要打包成jar仍到集群上进行其他的验证比如jar包的依赖问题,这个在local模式是没法测的,还有集群运行的调优参数,这些都可以在正式仍到集群时验证。

一个样例代码如下:

def main(args: Array[String]): Unit = {//指定local模式

val conf = new SparkConf().setMaster("local[2]").setAppName("read kp data to kafka")val sc= new SparkContext(conf)//支持通配符路径,支持压缩文件读取

val rrd=sc.textFile("hdfs://192.168.10.4:8020/data/log/{20170227,20170228}/tomcat-log*")//提到到集群模式时,去掉uri地址,如果有双namenode,可以自动容灾

//val rrd=sc.textFile("/data/log/{20170227,20170228}/tomcat-log*")

//统计数量

println(rrd.count())//停止spark

sc.stop()

}

如何在spark中遍历数据时获取文件路径:

val path:String="hdfs://192.168.10.4:8020/data/userlog/{20170226}/kp*"

val text= sc.newAPIHadoopFile[LongWritable,Text,TextInputFormat](path)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]

.mapPartitionsWithInputSplit((inputSplit, iterator) =>{

val file = inputSplit.asInstanceOf[FileSplit]iterator.map(tup =>(file.getPath, tup._2)) // 返回的K=全路径 V=每一行的值

}

)

linesWithFileNames.foreach(println)

如果遍历压缩文件时想要获取文件名,就使用newAPIHadoopFile,此外在本地调试下通过之后,提交到集群运行的时候,一定要把uri去掉,本地加上是想让它远程读取方便调试使用,如果正式运行去掉uri在双namenode的时候可以自动兼容,不去反而成一个隐患了。

最后我们可以通过spark on yarn模式提交任务,一个例子如下:

jars=`echo /home/search/x_spark_job/libs/*jar | sed 's/ /,/g'`

bin/spark-submit --class KSearch --master yarn --jars $jars/home/search/x_spark_job/kp-1.0.0.jar

这里用spark提交有另外一个优势,就是假如我开发的不是YARN应用,就是代码里没有使用SparkContext,而是一个普通的应用,就是读取mysql一个表的数据,写入另外一个mysql,这里跟MR没有关系,但是我依然可以用spark-sumbit提交,这时候是不会提交到YARN上的,但是程序会按普通程序运行,程序依赖的jar包,直接使用--jars传入就行,这一点非常方便,尤其是应用有多个依赖时,比如依赖es,hadoop,hbase,redis,fastjson,我打完包后的程序是瘦身的只有主体jar非常小,依赖的jar我可以不打到主体jar里面,在外部用的时候传入,方便共用并灵活性大大提高。

我在本地使用 Intellij Idea 打包了一个 spark 的程序 jar 包,放到linux集群上运行,报错信息是: Unsupported major.minor version 52.0

本机系统 ->windows10开发工具 ->Intellij Idea构建工具 ->maven

集群系统 ->Linuxjre ->Java(TM) SE Runtime Environment (build 1.7.0_80-b15)`

根据报错 log 可以断定的是由于我本地编译打包所使用的 jdk 版本和 linux 集群的 jre 版本不一致导致的。stanford parser 和 jdk 版本对应关系为:

可以推断出是由于我打包编译时所使用的 jdk 版本是 jdk8,而集群的 jre 是7,才导致的问题。

maven 项目会用 maven-compiler-plugin 默认的 jdk 版本来进行编译,如果不指明版本就容易出现版本不匹配的问题,可能导致编译不通过的问题。解决办法:在 pom 文件中配置 maven-compiler-plugin 插件。

方式一:

方式二:

如果使用 scala 编写 spark 的程序,在编译打包时候要注意 scala 的版本号和 jdk 版本的对应关系,同时也要考虑集群上 jre 的版本。比如我的集群上所使用的 jre 的版本号为 7,那么本机打包编译的 jdk 版本必须为 7 ,那么 scala 版本必须为 2.12 版本以下。

Intellij Idea 设置「开发」运行时所用的 jdk 版本的几个地方:

如果上图中 Intellij Idea 的开发运行 jdk 版本配置错误,在开发运行编译的时候会报: Error:java: 无效的源发行版: xx


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/7354608.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-04
下一篇 2023-04-04

发表评论

登录后才能评论

评论列表(0条)

保存