Spark通信框架Spark Network Common

Spark通信框架Spark Network Common,第1张

Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。

运行 spark-shell

本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

$ MASTER=local $ bin/spark-shell

MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:

Created spark context

Spark context available as sc

这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的 *** 作。

spark-shell 后面设置 master 参数,可以支持更多的模式,

我们在sparkshell中运行一下最简单的例子,统计在READMEmd中含有Spark的行数有多少,在spark-shell中输入如下代码:

scala>sctextFile("READMEmd")filter(_contains("Spark"))count

如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4jproperties :

$ mv conf/log4jpropertiestemplate conf/log4jproperties

然后修改日志输出级别为WARN:

log4jrootCategory=WARN, console

如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at >

thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。

默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。

另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。

所以,在一个集群中,可以部署安装多个版本的spark。

我们使用源码编译的spark 240(其中hive的版本是121)

cdh集成的spark版本和Hive版本如下:

使用jdk18

修改spark提供的mvn,使用自行安装的maven 381

使用make-distributionsh可以帮助与我们编译之后打包成tgz文件

修改pomxml文件的配置如下。

最后,执行编译命令如下:

这样打出的包,就含有thrift server的jar包了。

最终打包文件,根目录下。

之后就是解压到其他目录下后即可。

将hive-sitexml的文件连接过来,这样spark就可以读取hive的表了。

为了确保spark提交到yarn上运行,需要配置

cp spark-defaultsconftemplate spar-defaultsconf

另外,可以在spark-envsh中设置环境变量。

HADOOP_CONF_DIR

环境变量,也可以在/etc/profile中设置

启动日志可以查看,注意下端口占用问题,如下。

启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。

使用编译后spark生成beeline工具

参考beeline使用教程。

>

我们在windows开发机上使用spark的local模式读取远程hadoop集群中的hdfs上的数据,这样的目的是方便快速调试,而不用每写一行代码或者一个方法,一个类文件都需要打包成jar上传到linux上,再扔到正式的集群上进行测试,像功能性验证直接使用local模式来快速调测是非常方便的,当然功能测试之后,我们还需要打包成jar仍到集群上进行其他的验证比如jar包的依赖问题,这个在local模式是没法测的,还有集群运行的调优参数,这些都可以在正式仍到集群时验证。

一个样例代码如下:

def main(args: Array[String]): Unit = { //指定local模式

val conf = new SparkConf()setMaster("local[2]")setAppName("read kp data to kafka") val sc= new SparkContext(conf) //支持通配符路径,支持压缩文件读取

val rrd=sctextFile("hdfs://192168104:8020/data/log/{20170227,20170228}/tomcat-log") //提到到集群模式时,去掉uri地址,如果有双namenode,可以自动容灾

//val rrd=sctextFile("/data/log/{20170227,20170228}/tomcat-log")

//统计数量

println(rrdcount()) //停止spark

scstop()

}

如何在spark中遍历数据时获取文件路径:

val path:String="hdfs://192168104:8020/data/userlog/{20170226}/kp"

val text= scnewAPIHadoopFile[LongWritable,Text,TextInputFormat](path)

val linesWithFileNames = textasInstanceOf[NewHadoopRDD[LongWritable, Text]]

mapPartitionsWithInputSplit((inputSplit, iterator) => {

val file = inputSplitasInstanceOf[FileSplit] iteratormap(tup => (filegetPath, tup_2)) // 返回的K=全路径 V=每一行的值

}

)

linesWithFileNamesforeach(println)

如果遍历压缩文件时想要获取文件名,就使用newAPIHadoopFile,此外在本地调试下通过之后,提交到集群运行的时候,一定要把uri去掉,本地加上是想让它远程读取方便调试使用,如果正式运行去掉uri在双namenode的时候可以自动兼容,不去反而成一个隐患了。

最后我们可以通过spark on yarn模式提交任务,一个例子如下:

jars=`echo /home/search/x_spark_job/libs/jar | sed 's/ /,/g'`

bin/spark-submit --class KSearch --master yarn --jars $jars /home/search/x_spark_job/kp-100jar

这里用spark提交有另外一个优势,就是假如我开发的不是YARN应用,就是代码里没有使用SparkContext,而是一个普通的应用,就是读取mysql一个表的数据,写入另外一个mysql,这里跟MR没有关系,但是我依然可以用spark-sumbit提交,这时候是不会提交到YARN上的,但是程序会按普通程序运行,程序依赖的jar包,直接使用--jars传入就行,这一点非常方便,尤其是应用有多个依赖时,比如依赖es,hadoop,hbase,redis,fastjson,我打完包后的程序是瘦身的只有主体jar非常小,依赖的jar我可以不打到主体jar里面,在外部用的时候传入,方便共用并灵活性大大提高。

打包路径下不能存在中文和空格

使用<scope>provided</scope>将依赖不进行打包,使用平台提供的依赖

同时通过参数:

--conf sparkexecutoruserClassPathFirst=true --conf sparkdriveruserClassPathFirst=true

指定driver + executor均使用用户jar这个错都是在MAVEN插件在编译的时候报的,所以问题一定是出在编译的环节上。

这个时候就要好好检查MAVEN的编译配置,

1、看看配置里的编译版本和本机环境上配置的java版本是否一致,有时候报错的类有可能是引用了另外另外一个MAVEN模块的代码,也要看看那个模块的版本配置编码是否一致。但这并不是一定的,有时候不一致也不会有问题,但这是一个可以注意的点。

以上就是关于Spark通信框架Spark Network Common全部的内容,包括:Spark通信框架Spark Network Common、scala程序怎么形成jar包 sbt、五体大字系统进阶课讲什么等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zz/9827562.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存