Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。
运行 spark-shell
本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME
$ MASTER=local $ bin/spark-shell
MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:
Created spark context
Spark context available as sc
这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的 *** 作。
spark-shell 后面设置 master 参数,可以支持更多的模式,
我们在sparkshell中运行一下最简单的例子,统计在READMEmd中含有Spark的行数有多少,在spark-shell中输入如下代码:
scala>sctextFile("READMEmd")filter(_contains("Spark"))count
如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4jproperties :
$ mv conf/log4jpropertiestemplate conf/log4jproperties
然后修改日志输出级别为WARN:
log4jrootCategory=WARN, console
如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at >
thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。
默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。
另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。
所以,在一个集群中,可以部署安装多个版本的spark。
我们使用源码编译的spark 240(其中hive的版本是121)
cdh集成的spark版本和Hive版本如下:
使用jdk18
修改spark提供的mvn,使用自行安装的maven 381
使用make-distributionsh可以帮助与我们编译之后打包成tgz文件
修改pomxml文件的配置如下。
最后,执行编译命令如下:
这样打出的包,就含有thrift server的jar包了。
最终打包文件,根目录下。
之后就是解压到其他目录下后即可。
将hive-sitexml的文件连接过来,这样spark就可以读取hive的表了。
为了确保spark提交到yarn上运行,需要配置
cp spark-defaultsconftemplate spar-defaultsconf
另外,可以在spark-envsh中设置环境变量。
HADOOP_CONF_DIR
环境变量,也可以在/etc/profile中设置
启动日志可以查看,注意下端口占用问题,如下。
启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。
使用编译后spark生成beeline工具
参考beeline使用教程。
>
我们在windows开发机上使用spark的local模式读取远程hadoop集群中的hdfs上的数据,这样的目的是方便快速调试,而不用每写一行代码或者一个方法,一个类文件都需要打包成jar上传到linux上,再扔到正式的集群上进行测试,像功能性验证直接使用local模式来快速调测是非常方便的,当然功能测试之后,我们还需要打包成jar仍到集群上进行其他的验证比如jar包的依赖问题,这个在local模式是没法测的,还有集群运行的调优参数,这些都可以在正式仍到集群时验证。
一个样例代码如下:
def main(args: Array[String]): Unit = { //指定local模式
val conf = new SparkConf()setMaster("local[2]")setAppName("read kp data to kafka") val sc= new SparkContext(conf) //支持通配符路径,支持压缩文件读取
val rrd=sctextFile("hdfs://192168104:8020/data/log/{20170227,20170228}/tomcat-log") //提到到集群模式时,去掉uri地址,如果有双namenode,可以自动容灾
//val rrd=sctextFile("/data/log/{20170227,20170228}/tomcat-log")
//统计数量
println(rrdcount()) //停止spark
scstop()
}
如何在spark中遍历数据时获取文件路径:
val path:String="hdfs://192168104:8020/data/userlog/{20170226}/kp"
val text= scnewAPIHadoopFile[LongWritable,Text,TextInputFormat](path)
val linesWithFileNames = textasInstanceOf[NewHadoopRDD[LongWritable, Text]]
mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplitasInstanceOf[FileSplit] iteratormap(tup => (filegetPath, tup_2)) // 返回的K=全路径 V=每一行的值
}
)
linesWithFileNamesforeach(println)
如果遍历压缩文件时想要获取文件名,就使用newAPIHadoopFile,此外在本地调试下通过之后,提交到集群运行的时候,一定要把uri去掉,本地加上是想让它远程读取方便调试使用,如果正式运行去掉uri在双namenode的时候可以自动兼容,不去反而成一个隐患了。
最后我们可以通过spark on yarn模式提交任务,一个例子如下:
jars=`echo /home/search/x_spark_job/libs/jar | sed 's/ /,/g'`
bin/spark-submit --class KSearch --master yarn --jars $jars /home/search/x_spark_job/kp-100jar
这里用spark提交有另外一个优势,就是假如我开发的不是YARN应用,就是代码里没有使用SparkContext,而是一个普通的应用,就是读取mysql一个表的数据,写入另外一个mysql,这里跟MR没有关系,但是我依然可以用spark-sumbit提交,这时候是不会提交到YARN上的,但是程序会按普通程序运行,程序依赖的jar包,直接使用--jars传入就行,这一点非常方便,尤其是应用有多个依赖时,比如依赖es,hadoop,hbase,redis,fastjson,我打完包后的程序是瘦身的只有主体jar非常小,依赖的jar我可以不打到主体jar里面,在外部用的时候传入,方便共用并灵活性大大提高。
打包路径下不能存在中文和空格
使用<scope>provided</scope>将依赖不进行打包,使用平台提供的依赖
同时通过参数:
--conf sparkexecutoruserClassPathFirst=true --conf sparkdriveruserClassPathFirst=true
指定driver + executor均使用用户jar这个错都是在MAVEN插件在编译的时候报的,所以问题一定是出在编译的环节上。
这个时候就要好好检查MAVEN的编译配置,
1、看看配置里的编译版本和本机环境上配置的java版本是否一致,有时候报错的类有可能是引用了另外另外一个MAVEN模块的代码,也要看看那个模块的版本配置编码是否一致。但这并不是一定的,有时候不一致也不会有问题,但这是一个可以注意的点。
以上就是关于Spark通信框架Spark Network Common全部的内容,包括:Spark通信框架Spark Network Common、scala程序怎么形成jar包 sbt、五体大字系统进阶课讲什么等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)