前几篇博客分析了关于Spark Streaming的运行流程部分,由于运行流程内容过多,在此暂时分析到这里,接下来分析一下Spark Streaming的性能调优机制。
Spark Streaming性能调优机制Spark Streaming用于对大量数据的接收和处理,提高Spark集群的性能以应对更大的业务处理需要十分重要。最好的状态是数据接收的速度也能匹配,且Spark集群的硬件资源也能被充分应用,这就涉及性能能调优
并行度解析在Spark集群资源允许的前提下,可以提高数据接收、数据处理的并行度。
数据接收的并行度数据接收的并行度调优有多个方面
1.InputDStream的并行度Spark Streaming应用程序中涉及数据接收的第一个DStream是InputDStream。
下面对Receiver方式进行讨论。每个InputDStream都会在某个Worker节点上创建一个Receiver。其实在写应用程序时,可以创建多个InputDStream来接收同一数据源的数据。还可以通过配置,让这些DStream分别接收数据源的不同分区的数据,最大DStream个数可以达到数据源提供的分区数。例如,一个接收两个Kafka Topic数据的输入DStream可以被拆分成两个接收不同Topic数据的DStream。
最后,可以在程序中把多个InputDStream再合并为一个DStream,进行后续处理。下面给出基于Kafka的Java代码:
// 多个InputDStream合并为一个DStream的Java代码 int numStreams = 5; List2.Task的并行度> kafkaStreams = new ArrayList >(numStreams); for (int i = 0; i < numStreams; i++) { kafkaStream.add(KafkaUtils.createStream(...)); } JavaPairDStream unifiedStrem = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
数据接收使用的BlockGenerator里面有个RecurringTimer类型的对象blockIntervalTimer,会周期性的发送BlockGenerator消息,进而周期性的生成和存储一个Block。这个周期有一个配置参数spark.streaming.blockInterval。这个时间周期的默认值是200ms。
读写Block会用到BlockManager。在小组讨论过程中,我从与小组成员李子旭讨论中了解到BlockManager是定义于Spark Core中的,而且是Storage模块与其他模块交互最主要的类(可见此篇文章了解Spark Core大致内容:山东大学软件工程应用与实践——Spark项目(二)),提供了读和写Block的接口。这里的Block,实际上就对应了RDD中提到的Partition,每一个Partition都会对应一个Block。而Spark Streaming按Batch Interval来组织一次数据接收和处理,所以Batch Interval内的Block个数就是RDD的Partition数,也就是RDD的并行Task数。
因此,Task的并行度大致等于Batch Interval / Block Interval。比如,Batch Interval 是2s,Block Interval是200ms,则Task并行度为10.
通过调小Block Interval,可以提高Task并行度。但一般最好不要让Block Interval低于50ms。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)