如何保证一个Spark Application只有一个SparkContext实例

如何保证一个Spark Application只有一个SparkContext实例,第1张

Spark有个关于是否允许一个application存在多个SparkContext实例的配置项, 如下: sparkdriverallowMultipleContexts: If true, log warnings instead of throwing exceptions when multiple SparkContexts are active。 该值默认为false, 即不

package Wordcount

import orgapachesparkSparkConf

import orgapachesparkSparkContext

import orgapachesparkSparkContext_

/

@author hadoop

统计字符出现个数

/

object Wordcount {

def main(args: Array[String]) {

if(argslength < 1) {

Systemerrprintln("Usage: <file>")

Systemexit(1)

}

val conf = new SparkConf()

val sc = new SparkContext(conf)

//SparkContext 是把代码提交到集群或者本地的通道,我们编写Spark代码,无论是要本地运行还是集群运行都必须有SparkContext的实例

val line = sctextFile(args(0))

//把读取的内容保存给line变量,其实line是一个MappedRDD,Spark的所有 *** 作都是基于RDD的

lineflatMap(_split(" "))map((_, 1))reduceByKey(_+_)collectforeach(println)

scstop

}

网易在Spark多租户方面的工作,这个项目叫做Kyuubi(该项目的开源地址: >

首先以一个简单的示例开始:用Spark Streaming对从TCP连接中接收的文本进行单词计数。

/

功能:用spark streaming实现的针对流式数据进行单词计数的程序

该程序只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。

环境:spark 161, scala 2104

/

// 引入相关类库

import orgapachespark_

import orgapachesparkstreaming_

object NetworkWordCount {

def main(args: Array[String]) {

// Spark Streaming程序以StreamingContext为起点,其内部维持了一个SparkContext的实例。

// 这里我们创建一个带有两个本地线程的StreamingContext,并设置批处理间隔为1秒。

val conf = new SparkConf()setMaster("local[2]")setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

// 在一个Spark应用中默认只允许有一个SparkContext,默认地spark-shell已经为我们创建好了

// SparkContext,名为sc。因此在spark-shell中应该以下述方式创建StreamingContext,以

// 避免创建再次创建SparkContext而引起错误:

// val ssc = new StreamingContext(sc, Seconds(1))

// 创建一个从TCP连接获取流数据的DStream,其每条记录是一行文本

val lines = sscsocketTextStream("localhost", 9999)

// 对DStream进行转换,最终得到计算结果

val res = linesflatMap(_split(" "))map((_, 1))reduceByKey(_ + _)

// 打印该DStream中每个RDD中的前十个元素

resprint()

// 执行完上面代码,Spark Streaming并没有真正开始处理数据,而只是记录需在数据上执行的 *** 作。

// 当我们设置好所有需要在数据上执行的 *** 作以后,我们就可以开始真正地处理数据了。如下:

sscstart() // 开始计算

sscawaitTermination() // 等待计算终止

}

}

为了测试程序,我们得有TCP数据源作为输入,这可以使用Netcat(一般linux系统中都有,如果是windows系统,则推荐你使用 Ncat ,Ncat是一个改进版的Netcat)。如下使用Netcat监听指定本地端口:

nc -lk 9999

如果是使用Ncat,则对应命令如下:

ncat -lk 9999

在IntelliJ IDEA或Eclipse中可以本地运行测试上述Spark Streaming程序,该程序会连接到Netcat(或Ncat)监听的端口,你可以在运行Netcat(或Ncat)的终端中输入东东并回车,然后就可以看到该Spark Streaming程序会马上输出处理结果,并且这个处理是不停的、流式的。

注意:上述示例只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。

word2vector 是google开源的一个生成词向量的工具,以语言模型为优化目标,迭代更新训练文本中的词向量,最终收敛获得词向量。词向量可以作为文本分析中重要的特征,在分类问题、标注问题等场景都有着重要的应用价值。本文总结下了spark word2vector使用过程中遇到的问题,以及给出word2vector使用所需的参数配置,希望能够减少新手在使用过程中遇到的坑,希望有所帮助。

from pysparkmlfeature import Word2Vec

from pysparksql import SQLContext

from pyspark import SparkConf, SparkContext

from pysparksql import Row

conf = (SparkConf()set("sparkdrivermaxResultSize","2g"))

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

text = sctextFile("yourfilepath")

documentDF = textmap(lambda x : Row(text=xsplit(" ")))toDF()

word2Vec = Word2Vec(vectorSize=200, minCount=5, numPartitions=100,inputCol="text", outputCol="result")

model = word2Vecfit(documentDF)

vector_model = modelgetVectors()

vector_modelsaveAsParquetFile("modelpath")

spark-submit

--master yarn-client

--executor-cores 2

--executor-memory 14g

--queue your-queue

--num-executors 100

--driver-memory 10g

--conf sparkuiport=$RANDOM

--conf sparkshufflemanager=SORT

--conf sparkshufflememoryFraction=02

--conf sparkyarnexecutormemoryOverhead=2048

--conf sparkcoreconnectionackwaittimeout=300

--conf sparkakkaframeSize=600 /word2vector_trainingpy

以上就是关于如何保证一个Spark Application只有一个SparkContext实例全部的内容,包括:如何保证一个Spark Application只有一个SparkContext实例、如何在给spark提交worldcount、网易Kyuubi等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9492377.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-28
下一篇 2023-04-28

发表评论

登录后才能评论

评论列表(0条)

保存