虚拟机spark中怎样导入数据,的代码

虚拟机spark中怎样导入数据,的代码,第1张

具体 *** 作步骤:

1、准备Spark程序目录结构。

2、编辑buildsbt配置文件添加依赖。

3、创建WriteToCkscala数据写入程序文件。

4、编译打包。

5、运行。

参数说明:your-user-name:目标ClickHouse集群中创建的数据库账号名。

your-pasword:数据库账号名对应的密码。

your-url:目标ClickHouse集群地址。

/your/path/to/test/data/atxt:要导入的数据文件的路径,包含文件地址和文件名。说明文件中的数据及schema,需要与ClickHouse中目标表的结构保持一致。

your-table-name:ClickHouse集群中的目标表名称。

问题描述

在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、磁盘IO大量损耗等问题。

解决办法

1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。

2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizerbatches: Seq[Batch]中,如下。

规则内容实现

1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];

2、通过splitPredicates方法,分离分区谓词,得到分区谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。

3、判断是否是分区表,且是否添加分区字段。

4、实现Rule的apply方法

大数据和云计算的关系

大数据JUC面试题

大数据之Kafka集群部署

大数据logstsh架构

大数据技术kafka的零拷贝

《深入理解spark核心思想及源码分析》百度网盘pdf最新全集下载:

链接:>pwd=df15 提取码:df15

简介:本书对Spark源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐,阿里巴巴资深Java开发和大数据专家撰写,Spark以其先进的设计理念,迅速成为社区的热门项目  

首先我们先点击一个工程的Project Structure菜单,这时候会d出一个对话框,仔细的用户肯定会发现里面列出来的模块(Module)居然没有yarn!就是这个原因导致yarn模块相关的代码老是报错!只需要将yarn模块加入到这里即可。

步骤依次选择 Add->Import Module->选择pomxml,然后一步一步点击确定,这时候会在对话框里面多了spark-yarn_210模块,

然后点击Maven Projects里面的Reimport All Maven Projects,等yarn模块里面的所有依赖全部下载完的时候,我们就可以看到这个模块里面的代码终于不再报错了!!

应该说这个和是不是Spark项目没什么关系。

建议你使用intellij idea,在spark目录下执行"sbt/sbt gen-idea",会自动生成idea项目,导入即可。

idea我不熟,还需要做一些其他的插件配置(python, sbt等)和环境设置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark项目当maven工程导入。但是子项目之间的依赖会有点问题,会报错。

推荐使用前者,向Databricks的开发者看齐;我使用的是后者,我直接依赖了编译好的包就不会报错了,纯读源码的话也勉强可以跟踪和调试。

另外,我也看有的Committer用vim看spark

上一篇解读了shuffle写 *** 作的流程,相比较shuffle读 *** 作而言是比较简单的;

shuffle读取过程比较耗内存,由于在最后会把所有的数据拉入到缓存中进行聚合;

shulle读取过程中比较复杂,涉及的内容过多;下面从入口开始解读:

首先是调用ShuffleManager哈希shuffleManager 和排序shuffleManager的getReader方法获取ShuffleReader,BlockStoreShuffleReader继承了ShuffleReader,使用该类的read方法进行读取数据

该对象是一个迭代器,用于抓取多个block而形成的迭代器,初始化该对象时,会先取获取block所在的位置,以及对应的block信息以及block大小,由val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = mapOutputTrackergetMapSizesByExecutorId(handleshuffleId, startPartition, endPartition)完成;

每次抓取数据的上限由参数sparkreducermaxSizeInFlight设定,默认48M

1当数据和所在的BlockManager在一个节点时,把该信息加入到localBlocks列表中

2当数据和所在的BlockManager不在一个节点时,把该信息加入到remoteRequests 列表中;生成的FetchRequest生成条件:一个FetchRequest请求的block块大小总和大于等于maxBytesInFlight/5,会把block的信息放入进去,包括block所在的位置,blockId,block 大小(block中对应partition的数据大小)。

注意:此处生成的FetchRequest的可能会发生内存泄漏,因为如果单个block过大,拉取过来占用堆外内存过大,造成OOM

发送远程请求,进行拉取数据,需要满足条件才会发送fetch请求:首先必须有fetch请求即FetchRequest,然后队列中第一个请求的block大小+之前发送的请求的block快大小和要小于等于48M,或者为第一次发送fetch请求如果是第一次发送fetch请求,所属的block块过大,那么就会可能发生OOM的风险,因为fetch的数据会放入到ManagerBuffer当中,堆外内存中

这里我们选择在map端聚合的进行分析,进入到combineCombinersByKey方法中,该方法中,首先会进行拉取数据,对其聚合排序,如果内存不够,spill到本地磁盘,可能会产生多个文件,最后会对所有的spill文件和内存中的数据进行聚合,并作为一个集合返回

此处的iter就是上面ShuffleBlockFetchIterator,下面的next就是调用了ShuffleBlockFetchIterator中的next方法。

如果fetch请求成功获取了block,那么累计fetch请求的block大小会不断的释放,然后再发送rpc取抓取剩下的数据,next返回时一个block的id 和一个inputstream流对象

使用的也是外部排序类,会在spill过程中排序;跟shuffle write过程使用的一样,请翻看shuffle write *** 作;

注意:见解纰漏,如有误解,请提示

以上就是关于虚拟机spark中怎样导入数据,的代码全部的内容,包括:虚拟机spark中怎样导入数据,的代码、源码级解读如何解决Spark-sql读取hive分区表执行效率低问题、《深入理解spark核心思想及源码分析》pdf下载在线阅读全文,求百度网盘云资源等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/9866275.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存