SparkStreaming: the module of the Spark framework for real-time stream processing. It slices the incoming stream into many batches by time interval, processes and analyzes each batch as an RDD, and finally writes out the result of each batch. Technology stack used here: Kafka -> SparkStreaming -> Redis/HBase/RDBMS
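For orientation, a minimal sketch of that Kafka -> SparkStreaming -> sink skeleton, assuming the spark-streaming-kafka 0.8 direct API used throughout these notes; the broker address "node01:9092" is a placeholder and "testtopic9" is the topic used by the producer later on:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSkeleton {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingSkeleton").setMaster("local[3]")
    // every 5 seconds the records received in that interval become one batch (one RDD)
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "node01:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic9"))

    // each batch is processed as an ordinary RDD and its result is written out
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.map(_._2).foreachPartition { iter =>
          // open one connection per partition here and write to Redis/HBase/an RDBMS
          iter.foreach(println)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}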
15:00- Walkthrough of the two APIs for fetching data from a Kafka topic
With this API the consumed offsets are stored in the checkpoint directory (provided a checkpoint directory has been set):

def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
    // the SparkStreaming streaming context instance
    ssc: StreamingContext,
    // connection info for the Kafka brokers: metadata.broker.list
    kafkaParams: Map[String, String],
    // which Kafka topics to read data from
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
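A usage sketch of this first API: because the offsets live in the checkpoint directory, the StreamingContext has to be built through getOrCreate so it can recover from that directory after a restart. The checkpoint path, broker address and group id below are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointDirectStream {
  val checkpointDir = "/spark/streaming/checkpoint" // placeholder HDFS/local path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointDirectStream").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // offsets (and the DStream lineage) are recovered from here on restart
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9092",
      "group.id" -> "bill-consumer")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic9"))
    stream.map(_._2).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // reuse the checkpointed context if it exists, otherwise build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}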
With this API you manage the offsets of each TopicAndPartition yourself (both saving and reading), storing them on the ZooKeeper cluster. Keeping offsets in ZooKeeper also lets Kafka monitoring tools track the consumption:

def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    // the starting offset of each partition of each topic to consume
    fromOffsets: Map[TopicAndPartition, Long],
    // how each record fetched from the topic is turned into a value
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
Simulating the JSON bill data:
package com.hpsk.bigdata.spark.project.producer

import java.text.{DecimalFormat, SimpleDateFormat}
import java.util.{Date, Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.{JSONArray, JSONObject}

object AliplayBillsProducerOpt {

  def main(args: Array[String]): Unit = {
    // how often to send data, in seconds
    val schedulerInterval = 1
    // how many records to send each time
    val sendCount: Int = 5000

    // 1. Create a producer
    // 1.1 load the configuration file
    val prop = new Properties()
    prop.load(getClass.getClassLoader.getResourceAsStream("producer.properties"))
    // 1.2 create the ProducerConfig
    val producerConfig = new ProducerConfig(prop)
    // 1.3 create the Producer instance used to send the data
    val producer = new Producer[String, String](producerConfig)

    // 2. Build the messages
    val topic = "testtopic9"
    // payment methods
    val paymentList = List("余额宝", "支付宝", "xyk", "花呗", "yhk")
    // trade status
    val tradeStatusList = List("成功", "失败")
    val random = new Random()
    val list = new scala.collection.mutable.ListBuffer[KeyedMessage[String, String]]()

    while (true) {
      val startTime = System.currentTimeMillis()
      // clear the buffer
      list.clear()
      for (index <- 1 to sendCount) {
        // prepare bill data
        val bill = new JSONObject()
        // a UUID stands in for the real RowKey; in practice data is looked up by RowKey
        val rowKey = UUID.randomUUID().toString
        // convert Scala collections to Java collections via implicit conversions
        import scala.collection.JavaConversions._
        // column names
        val columns: JSONArray = new JSONArray(
          List("orderId", "tradeAmount", "goodsDesc", "payment", "tradeStatus", "receiptAmount")
        )
        // the value of each column
        val values: JSONArray = new JSONArray(
          List(
            getOrderId(random),
            getTradeAmount(20, random.nextInt(500) + 1),
            getGoodsDesc(random, random.nextInt(30)),
            paymentList(random.nextInt(5)),
            tradeStatusList(random.nextInt(2)),
            getGoodsDesc(random, random.nextInt(10))
          )
        )
        bill
          .put("r", rowKey) // add RowKey
          .put("f", "info")
          .put("q", columns)
          .put("v", values)
        val message = new KeyedMessage[String, String](topic, rowKey, bill.toString)
        list += message
      }

      // 3. Send in batch: def send(messages: KeyedMessage[K, V]*)
      producer.send(list.toList: _*)
      val endTime = System.currentTimeMillis()
      println(s"send messages: ${list.length}, spent time : ${endTime - startTime}")
      // pause the thread
      Thread.sleep(1000 * schedulerInterval)
    }
  }

  def getCustomerId(random: Random): String = {
    val sb = new StringBuffer("1")
    for (index <- 1 to 10) {
      sb.append(random.nextInt(10))
    }
    sb.toString
  }

  def getOrderId(random: Random): String = {
    val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
    val sb = new StringBuffer(date)
    for (index <- 1 to 17) {
      sb.append(random.nextInt(10))
    }
    sb.toString
  }

  def getTradeAmount(start: Int, end: Int): String = {
    // generate a random Double within the given range
    val price: Double = start + Math.random() * end % (end - start + 1)
    // keep two decimal places
    new DecimalFormat("#.00").format(price)
  }

  def getGoodsDesc(random: Random, size: Int): String = {
    val str = "abcdefghijklmnopqrstuvwxyz"
    val len = str.length()
    val sb = new StringBuffer()
    for (i <- 0 to (len + size)) {
      sb.append(str.charAt(random.nextInt(len - 1)))
    }
    sb.toString
  }
}
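The producer.properties file loaded above is not shown in these notes. For the old kafka.producer (0.8.x Scala) API it could look roughly like the following; the broker address is a placeholder and only the keys relevant here are listed:

# possible producer.properties for the old Scala producer API (broker host is a placeholder)
metadata.broker.list=node01:9092
serializer.class=kafka.serializer.StringEncoder
key.serializer.class=kafka.serializer.StringEncoder
request.required.acks=1
producer.type=sync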
Consume the Kafka data and store it into HBase:
package com.hpsk.bigdata.spark.project

import java.util.Properties

import com.hpsk.bigdata.spark.project.hbase.HbaseDao
import com.hpsk.bigdata.spark.project.util.{KafkaCluster, ZKStringSerializer}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.slf4j.{Logger, LoggerFactory}

object KafkaDataStream {

  // logger
  private val logger: Logger = LoggerFactory.getLogger(KafkaDataStream.getClass)

  def main(args: Array[String]): Unit = {
    // read the arguments
    // val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
    val Array(kafka_topic, timeWindow, maxRatePerPartition) = Array("1", "3", "5000")

    // initialize the configuration
    val sparkConf = new SparkConf()
      .setAppName(KafkaDataStream.getClass.getSimpleName)
      .setMaster("local[3]")
      // .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
      // .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
      // .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
      // maximum number of records per second per partition
      .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // .set("spark.reducer.maxSizeInFlight", "1m")
      // data-locality wait time
      .set("spark.locality.wait", "100ms")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // how often (in seconds) a batch is processed
    val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt))

    // load the configuration
    val prop: Properties = new Properties()
    prop.load(getClass.getClassLoader.getResourceAsStream("kafka.properties"))
    val groupName = prop.getProperty("group.id")

    // read the topics from the configuration file
    val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
    if (kafkaTopics == null || kafkaTopics.length <= 0) {
      System.err.println("Usage: KafkaDataStream needs a topic number defined in kafka.properties")
      System.exit(1)
    }
    val topics: Set[String] = kafkaTopics.split(",").toSet

    val kafkaParams = scala.collection.immutable.Map[String, String](
      "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
      "group.id" -> groupName,
      "auto.offset.reset" -> "largest")

    // KafkaCluster: copy the class from the Spark source and adapt it,
    // because some of its methods are private; change them to public.
    val kc = new KafkaCluster(kafkaParams)

    // ZooKeeper client
    val zkClient = new ZkClient(
      prop.getProperty("zk.connect"),
      Integer.MAX_VALUE, // sessionTimeout
      100000,            // connectionTimeout
      ZKStringSerializer // ZKStringSerializer also has to be copied from the source and adapted
    )

    // offsets of all partitions
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    // multiple topics are supported: Set[String]
    topics.foreach(topicName => {
      // read the number of partitions from the brokers; note: restart after adding partitions
      val children: Int = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
      for (i <- 0 until children) {
        // check whether the consumer group already has an offset for this partition; if not, start at 0
        val tp = TopicAndPartition(topicName, i)
        // ZNode path where this group records the offset of this partition in ZooKeeper
        val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
        // has this partition of this topic been consumed before?
        if (zkClient.exists(path)) {
          // yes: read the stored offset
          fromOffsets += (tp -> zkClient.readData[String](path).toLong)
        } else {
          // no: start from offset 0
          fromOffsets += (tp -> 0)
        }
      }
    })
    logger.warn(s"+++++++++++++++++++ fromOffsets $fromOffsets +++++++++++++++++++++++++ ")

    // MessageHandler
    val messageHandler: MessageAndMetadata[String, String] => (String, String) =
      (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

    // create the direct Kafka stream, starting from the offsets recorded in ZooKeeper
    val messagesDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler
    )

    // process the data
    messagesDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // the offset ranges consumed from the topic in this batch
        val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // handle the data partition by partition
        rdd.foreachPartition(partitionRecords => {
          // use the TaskContext to find the offset range of the partition handled by this task
          val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.warn(s"${offsetRange.topic} - ${offsetRange.partition}: " +
            s"from [${offsetRange.fromOffset}], to [${offsetRange.untilOffset}]")
          if (offsetRange.topic != null) {
            // note: the topic name is also the HBase table name; records are Iterator[(topic, message)]
            HbaseDao.insert(offsetRange.topic, partitionRecords)
          }
          // TopicAndPartition: first constructor argument is the topic, second the Kafka partition id
          val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
          val either = kc.setConsumerOffsets(
            groupName, Map((topicAndPartition, offsetRange.untilOffset))
          )
          if (either.isLeft) {
            logger.warn(s"Error updating the offset to Kafka cluster: ${either.left.get}")
          }
        })
        logger.warn(s"------- insert ${rdd.count} into hbase success ---------")
      }
    })

    // 4. Start the application
    ssc.start()
    // wait for the execution to stop
    ssc.awaitTermination()
    // 5. Stop the streaming application
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
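HbaseDao is referenced above but its source is not included in these notes. Below is a hedged sketch of what its insert method could look like, assuming the HBase 1.x client API and the JSON layout ("r"/"f"/"q"/"v") produced by the bill generator above; it also applies the batch-put and skip-WAL advice from the optimization notes further down:

package com.hpsk.bigdata.spark.project.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.codehaus.jettison.json.JSONObject
import scala.collection.mutable.ListBuffer

object HbaseDao {

  // one shared connection per executor JVM (hypothetical helper, not the original class)
  private lazy val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())

  def insert(tableName: String, records: Iterator[(String, String)]): Unit = {
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    val puts = new ListBuffer[Put]()
    records.foreach { case (_, message) =>
      // parse the bill JSON: "r" = RowKey, "f" = column family, "q" = columns, "v" = values
      val json = new JSONObject(message)
      val put = new Put(Bytes.toBytes(json.getString("r")))
      // do not write the WAL, go straight into the MemStore
      put.setDurability(Durability.SKIP_WAL)
      val family = json.getString("f")
      val columns = json.getJSONArray("q")
      val values = json.getJSONArray("v")
      for (i <- 0 until columns.length()) {
        put.addColumn(Bytes.toBytes(family),
          Bytes.toBytes(columns.getString(i)),
          Bytes.toBytes(values.getString(i)))
      }
      puts += put
    }
    // batch write instead of one Put per record
    import scala.collection.JavaConversions._
    table.put(puts.toList)
    table.close()
  }
}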
09:13-14:07 Kafka's storage layout in ZooKeeper
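A quick way to see the ZooKeeper paths that the consumer above relies on (Kafka 0.8.x layout; group and topic names are the ones used in these notes):

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

object ZkPaths {
  def main(args: Array[String]): Unit = {
    val dirs = new ZKGroupTopicDirs("groupName", "testtopic9")
    // consumer offsets: /consumers/<group>/offsets/<topic>; the consumer appends /<partition>
    println(dirs.consumerOffsetDir)
    // topic partition metadata kept by the brokers: /brokers/topics/<topic>/partitions
    println(ZkUtils.getTopicPartitionsPath("testtopic9"))
  }
}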
Program optimization:
-1. Data source side: pull data from the Kafka topic with the direct approach.
    sum(topic partitions) = number of partitions of each batch RDD,
    i.e. one topic partition maps to one partition of the batch RDD.
    For example, if each batch RDD has 12 partitions, how can the parallelism be increased?
    -a. Repartition the RDD itself: rdd.coalesce(24)
    -b. Increase the number of partitions of the topic;
        after changing the topic partitions, restart the SparkStreaming application.
    Also keep the data balanced across the topic partitions to avoid data skew.
-2. SparkStreaming:
    -a. Application resources: run in the cluster with cluster deploy mode;
        size the Driver (AM) resources and the Executor resources (count, memory and CPU cores per executor).
    -b. Parameter tuning:
        - maximum number of records consumed per second from each topic partition:
          spark.streaming.kafka.maxRatePerPartition
        - data serialization: spark.serializer (Kryo)
        - lower the data-locality wait time instead of waiting: spark.locality.wait
    -c. Coding:
        - check whether a batch has data: rdd.isEmpty()
        - process the RDD data partition by partition: rdd.foreachPartition
        - final storage: write records into the HBase table in batches instead of one by one,
          and configure the writes to skip the WAL and go straight into the MemStore.
-3. HBase:
    - number of table regions: pre-split the table when it is created (a sketch follows this list)
    - data compression: snappy or lz4
    - region compaction and split: disable automatic compaction and splitting,
      and run them manually (e.g. with a scheduled script) when HBase is idle.
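As referenced in item -3, a minimal sketch of creating the pre-split, snappy-compressed table with the HBase 1.x Admin API. The table name follows the "topic name = table name" convention above, the "info" family follows the producer's JSON, and the split points are illustrative (they assume hex-like UUID row keys):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.util.Bytes

object CreateBillTable {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin

    val desc = new HTableDescriptor(TableName.valueOf("testtopic9"))
    val family = new HColumnDescriptor("info")
    // compress the stored data with snappy
    family.setCompressionType(Compression.Algorithm.SNAPPY)
    desc.addFamily(family)

    // pre-split the regions so writes spread across region servers from the start;
    // automatic major compaction is usually disabled separately via hbase.hregion.majorcompaction=0
    val splitKeys = Array("2", "4", "6", "8", "a", "c", "e").map(s => Bytes.toBytes(s))
    admin.createTable(desc, splitKeys)

    admin.close()
    conn.close()
  }
}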