相比于MapReduce,采用spark解决问题则简单得多:用户无需受限于(MapReduce中的)Mapper、Combiner和Reducer等组件要求的固定表达方式,而只需将解决方案翻译成Spark提供的丰富算子即可。总结起来,用Spark解决问题可以分为以下几个步骤:
1、读取自定目录下所有文本文件列表,并通过parallelize算子将文件划分成K份,每份交给一个任务处理。
2、每个任务按照以下流程依次处理分配到的文件:读取文件、分词、统计词在该文件中出现的次数。
3、按照单词进行规约(使用reduceByKey算子),将同一单词在各文件中出现的次数信息连接(join)起来,并写入最终输出目录中。
随着Druid上的DataSource的数量和数据量增加,使用原来的 Hadoop MR索引任务已经不能满足对大数据量写入Druid的需求,急需找到更快的写入方式。就是本文介绍的 druid-spark-batch
github地址
注:sbt 编译druid-spark-batch时,需要先把对应版本的druid(如: 0.11.0)安装到本地mavne仓库,否则或报错 找不到对应的druid包
Spark is included in the default hadoop coordinates similar to druid.indexer.task.defaultHadoopCoordinates=["org.apache.spark:spark-core_2.10:1.5.2-mmx1"]
1.5.2-mmx1 是依赖的spark版本号,本例使用的是spark-2.2.0, 所以middleManager节点的配置如下:
同时把依赖的spark2.2.0的包拷贝到 druid-install-path/hadoop-dependencies/spark-core_2.10/2.2.0 目录下
Druid 0.11.0 jackson版本为2.4.6, Druid-spark-batch jackson版本为2.6.5。
修改Druid 0.11.0 jackson版本为2.6.5,重新编译,解决该问题。
guice 和 guava包冲突,druid-spark-batch 是用的是guice-4.1.0.jar 和 guava-16.0.1.jar。
properties 设置 spark.executor.extraClassPath=true , 并配置 spark.executor.extraLibraryPath , 如下:
druid hdfs-storage extension 模块的HadoopFsWrapper.rename 调用了 Hadoop的FileSystem类中的Rename方法。
这方法在Hadoop中有2个一个是2个参数,一个3个参数;其中一个保护的一个是公开的。
解决方法: **修改HadoopFsWrapper种的rename方法,修改成fs.rename(from, to) **
src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala
原因:当hdfs路径包含目录时,无法获取文件大小,导致分区startingPartitions值异常,超出2G限制
修改如下:
添加索引或者使用工具,比如Apache Spark先安装 Apache Spark,查询数据库的速度可以提升10倍。
在已有的 MySQL 服务器之上使用 Apache Spark (无需将数据导出到 Spark 或者 Hadoop 平台上),这样至少可以提升 10 倍的查询性能。使用多个 MySQL 服务器(复制或者 Percona XtraDB Cluster)可以让我们在某些查询上得到额外的性能提升。你也可以使用 Spark 的缓存功能来缓存整个 MySQL 查询结果表。
思路很简单:Spark 可以通过 JDBC 读取 MySQL 上的数据,也可以执行 SQL 查询,因此我们可以直接连接到 MySQL 并执行查询。那么为什么速度会快呢?对一些需要运行很长时间的查询(如报表或者BI),由于 Spark 是一个大规模并行系统,因此查询会非常的快。MySQL 只能为每一个查询分配一个 CPU 核来处理,而 Spark 可以使用所有集群节点的所有核。在下面的例子中,我们会在 Spark 中执行 MySQL 查询,这个查询速度比直接在 MySQL 上执行速度要快 5 到 10 倍。
另外,Spark 可以增加“集群”级别的并行机制,在使用 MySQL 复制或者 Percona XtraDB Cluster 的情况下,Spark 可以把查询变成一组更小的查询(有点像使用了分区表时可以在每个分区都执行一个查询),然后在多个 Percona XtraDB Cluster 节点的多个从服务器上并行的执行这些小查询。最后它会使用map/reduce 方式将每个节点返回的结果聚合在一起形成完整的结果。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)