在大数据早期的课程中我们已经学习了 MapReduce 框架的原理及基本使用,并了解了其底层数据处理的实现方式。接下来,就让咱们走进 Spark 的世界,了解一下它是如何带领我们完成数据处理的。
1 创建 Maven 项目 1.1 增加 Scala 插件Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala,咱们当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时。我们依然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件。
1.2 增加依赖关系修改 Maven 项目中的 POM 文件,增加 Spark 框架的依赖关系。本课件基于 Spark3.0 版本,使用时请注意对应版本。
1.3 WordCountorg.apache.spark spark-core_2.123.0.0 net.alchim31.maven scala-maven-plugin3.2.2 testCompile org.apache.maven.plugins maven-assembly-plugin3.1.0 jar-with-dependencies make-assembly package single
为了能直观地感受 Spark 框架的效果,接下来我们实现一个大数据学科中最常见的教学案例 WordCount
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 读取文件数据 val fileRDD: RDD[String] = sc.textFile("input/word.txt") // 将文件中的数据进行分词 val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") ) // 转换数据结构 word => (word, 1) val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1)) // 将转换结构后的数据按照相同的单词进行分组聚合 val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_) // 将数据聚合结果采集到内存中 val word2Count: Array[(String, Int)] = word2CountRDD.collect() // 打印结果 word2Count.foreach(println) //关闭 Spark 连接 sc.stop()
执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:
log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to ERROR. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR1.4 异常处理
如果本机 *** 作系统是 Windows,在程序中使用了 Hadoop 相关的东西,比如写入文件到HDFS,则会遇到如下异常:
出现这个问题的原因,并不是程序的错误,而是 windows 系统用到了 hadoop 相关的服务,解决办法是通过配置关联到 windows 的系统依赖就可以了。在 IDEA 中配置 Run Configuration,添加 HADOOP_HOME 变量。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)