day49

SparkStreaming:
    Spark's module for processing streaming (real-time) data.
    It splits the stream into batches by a fixed time interval, processes and analyzes the data of
    each batch as an RDD, and writes out the result of each batch.
Technology stack used here:
    Kafka -> SparkStreaming -> Redis/HBase/RDBMS
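A minimal sketch of this pipeline with the old direct Kafka API (spark-streaming-kafka 0.8); the broker address node01:9092, the topic name and the per-partition output logic are placeholders, not part of the original notes:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectPipelineSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectPipelineSketch").setMaster("local[3]")
    // every 5 seconds the received data forms one batch, processed as one RDD
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "node01:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic9"))

    // process each batch RDD and write the result to an external store (Redis/HBase/RDBMS)
    stream.map(_._2).foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // open one connection per partition here and write the records out
        iter.take(5).foreach(println)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}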

15:00 -  Two APIs for reading topic data from Kafka

With this approach, the consumed offsets are stored in the checkpoint directory (provided a checkpoint directory has been set):
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag
] (
    // the StreamingContext instance
    ssc: StreamingContext,
    // Kafka broker connection info: metadata.broker.list
    kafkaParams: Map[String, String],
    // the topics to read data from
    topics: Set[String]
): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
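A minimal usage sketch for this overload, assuming the offsets are recovered by rebuilding the context with StreamingContext.getOrCreate; the checkpoint path, broker address and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedDirectStream {
  val checkpointDir = "hdfs://node01:8020/spark/streaming-ckpt"   // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedDirectStream").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)   // consumed offsets are saved here with each batch

    val kafkaParams = Map("metadata.broker.list" -> "node01:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic9"))
    stream.map(_._2).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // on restart, the context (including the offsets to resume from) is restored from the checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}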

With this approach you manage the offsets of each TopicAndPartition yourself (both storing and reading):
here they are stored in the ZooKeeper cluster. Keeping the offsets in ZooKeeper also lets Kafka monitoring tools track the consumer's progress.
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    // the starting offset for each partition of each topic
    fromOffsets: Map[TopicAndPartition, Long],
    // how each record fetched from the topic is transformed
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
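A brief spark-shell style call-site sketch for this overload; the topic, starting offsets and broker address are placeholders, and the full program later in these notes shows how the offsets are actually loaded from ZooKeeper:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("demo").setMaster("local[3]"), Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "node01:9092", "group.id" -> "g1")

// start partitions 0 and 1 of the topic from offset 0
val fromOffsets = Map(
  TopicAndPartition("testtopic9", 0) -> 0L,
  TopicAndPartition("testtopic9", 1) -> 0L)

// decide what to keep from each Kafka record: here the (topic, message) pair
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)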

Simulating JSON data:

package com.hpsk.bigdata.spark.project.producer

import java.text.{DecimalFormat, SimpleDateFormat}
import java.util.{Date, Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.{JSONArray, JSONObject}



object AliplayBillsProducerOpt {

  def main(args: Array[String]): Unit = {
    // how often to send a batch of data
    val schedulerInterval = 1  // seconds
    // how many records to send each time
    val sendCount: Int = 5000 // records

    // 1. Create a producer
    // 1.1 Load the configuration file
    val prop = new Properties()
    prop.load( getClass.getClassLoader.getResourceAsStream
    ("producer.properties"))

    // 1.2 Create the ProducerConfig
    val producerConfig = new ProducerConfig( prop )
    // 1.3 Create the Producer instance used to send the data
    val producer = new Producer[String, String](producerConfig)

    // 2. Build the messages
    val topic = "testtopic9"

    // payment methods
    val paymentList = List("余额宝","支付宝","xyk","花呗","yhk")
    // trade statuses
    val tradeStatusList = List("成功", "失败")

    val random = new Random()
    
    val list = new scala.collection.mutable.ListBuffer[KeyedMessage[String, String]]()
    while (true){
      val startTime = System.currentTimeMillis()
      // clear the buffer
      list.clear()

      for(index <- 1 to sendCount){
        // prepare bill data
        val bill = new JSONObject()

        // a UUID is used as a stand-in here; in practice records are queried by their RowKey
        val rowKey = UUID.randomUUID().toString
        // implicit conversion from Scala collections to Java collections
        import scala.collection.JavaConversions._
        // column names
        val columns: JSONArray = new JSONArray(
          List("orderId", "tradeAmount", "goodsDesc", "payment", "tradeStatus", "receiptAmount")
        )
        // the value of each column
        val values: JSONArray = new JSONArray(
          List(
            getOrderId(random), getTradeAmount(20, random.nextInt(500) + 1),
            getGoodsDesc(random, random.nextInt(30)), paymentList(random.nextInt(5)),
            tradeStatusList(random.nextInt(2)), getGoodsDesc(random, random.nextInt(10))
          )
        )

        bill
          .put("r", rowKey) // add RowKey
          .put("f", "info")
          .put("q", columns)
          .put("v", values)

        val message = new KeyedMessage[String, String](topic, rowKey, bill.toString)
        list += message
      }

      // 3. Send in one batch:  def send(messages: KeyedMessage[K,V]*)
      producer.send(list.toList: _*)

      val endTime = System.currentTimeMillis()
      println(s"send messages: ${list.length}, spent time : ${endTime - startTime}")

      // pause before the next round
      Thread.sleep(1000 * schedulerInterval)
    }
  }


  
  // generate a random 11-digit customer id: a leading "1" followed by 10 random digits
  def getCustomerId(random: Random): String = {
    val sb = new StringBuffer("1")
    for(index <- 1 to 10){
      sb.append(random.nextInt(10))
    }
    // return the id
    sb.toString
  }

  
  // generate an order id: the current date (yyyyMMdd) followed by 17 random digits
  def getOrderId(random: Random): String = {
    val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
    val sb = new StringBuffer(date)
    for(index <- 1 to 17){
      sb.append(random.nextInt(10))
    }
    // return the id
    sb.toString
  }

  
  // generate a random trade amount and format it with two decimal places
  def getTradeAmount(start: Int, end: Int): String = {
    // random Double within the given range
    val price: Double = start + Math.random() * end % (end - start + 1)
    // keep two decimal places
    new DecimalFormat("#.00").format(price)
  }

  
  // generate a random lowercase string used as the goods description
  def getGoodsDesc(random: Random, size: Int): String = {
    val str = "abcdefghijklmnopqrstuvwxyz"
    val len = str.length()

    val sb = new StringBuffer()
    for (i <- 0 to (len + size)) {
      sb.append(str.charAt(random.nextInt(len - 1)))
    }

    sb.toString
  }
}
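The producer.properties file loaded above is not included in the notes; a plausible minimal version for the old Scala producer API might look like the following (the broker addresses are placeholders):

# broker list used by the old (Scala) producer API
metadata.broker.list=node01:9092,node02:9092,node03:9092
# serialize both keys and values as strings
serializer.class=kafka.serializer.StringEncoder
key.serializer.class=kafka.serializer.StringEncoder
# async batching and a single ack are enough for this simulation
producer.type=async
request.required.acks=1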

Consuming the Kafka data and writing it into HBase

package com.hpsk.bigdata.spark.project

import java.util.Properties

import com.hpsk.bigdata.spark.project.hbase.HbaseDao
import com.hpsk.bigdata.spark.project.util.{KafkaCluster, ZKStringSerializer}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.slf4j.{Logger, LoggerFactory}


object KafkaDataStream {
  // logger for this object
  private val logger: Logger = LoggerFactory.getLogger(KafkaDataStream.getClass)

  def main(args: Array[String]): Unit = {
    // command-line arguments
    // val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
    val Array(kafka_topic, timeWindow, maxRatePerPartition) = Array("1", "3", "5000")

    // initialize the configuration
    val sparkConf = new SparkConf()
      .setAppName(KafkaDataStream.getClass.getSimpleName)
      .setMaster("local[3]")
      // .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
      // .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
      // .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
      // maximum number of records pulled per second from each Kafka partition
      .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // .set("spark.reducer.maxSizeInFlight", "1m")
      // how long to wait for data locality before scheduling a task elsewhere
      .set("spark.locality.wait", "100ms")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // batch interval: how many seconds of data form one batch
    val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt))

    

    // load the Kafka configuration
    val prop: Properties = new Properties()
    prop.load(getClass.getClassLoader.getResourceAsStream("kafka.properties"))

    val groupName = prop.getProperty("group.id")

    // get the topics from the config file
    val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
    if (kafkaTopics == null || kafkaTopics.length <= 0) {
      System.err.println("Usage: KafkaDataStream <n>, where kafka.topic.<n> is defined in kafka.properties")
      System.exit(1)
    }
    val topics: Set[String] = kafkaTopics.split(",").toSet
    val kafkaParams = scala.collection.immutable.Map[String, String](
      "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
      "group.id" -> groupName,
      "auto.offset.reset" -> "largest")
    // KafkaCluster: copy this class from the Spark source into the project and make its private methods public, since several of them are private.
    val kc = new KafkaCluster(kafkaParams)

    
    // ZooKeeper client
    val zkClient = new ZkClient(
      prop.getProperty("zk.connect"),
      Integer.MAX_VALUE, // sessionTimeout
      100000, // connectionTimeout
      ZKStringSerializer // ZKStringSerializer also has to be copied from the Kafka source and adjusted
    )

    

    // the starting offset of every partition
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    // multiple topics are supported: Set[String]
    topics.foreach(topicName => {
      // read the topic's partition count from ZooKeeper; note: restart the job after adding partitions
      val children: Int = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))

      for (i <- 0 until children) {
        // if the consumer group has no committed offset for this partition, start from 0
        val tp = TopicAndPartition(topicName, i)
        // the ZNode path where this consumer group records the offset of this partition
        val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
        // has this partition been consumed by the group before?
        if (zkClient.exists(path)) {
          // yes: resume from the stored offset
          fromOffsets += (tp -> zkClient.readData[String](path).toLong)
        } else {
          // no: start from offset 0
          fromOffsets += (tp -> 0)
        }
      }
    })
    logger.warn(s"+++++++++++++++++++ fromOffsets $fromOffsets +++++++++++++++++++++++++ ")

    
    // MessageHandler: keep the (topic, message) pair from each Kafka record
    val messageHandler: MessageAndMetadata[String, String] => (String, String) =
      (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

    
    // create the direct Kafka input stream, starting from the offsets recorded in ZooKeeper
    val messagesDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc,
      kafkaParams,
      fromOffsets,
      messageHandler
    )

    
    // process the data of each batch
    messagesDStream.foreachRDD(rdd => {
      if(!rdd.isEmpty()){
        // the offset ranges consumed by this batch, one per topic partition
        val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // process the data partition by partition

        rdd.foreachPartition(partitionRecords => {
          // use the TaskContext to look up the offset range of the partition this task is processing
          val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.warn(s"${offsetRange.topic} - ${offsetRange.partition}: from [${offsetRange.fromOffset}], to [${offsetRange.untilOffset}]")


          
          if (offsetRange.topic != null) {
            // note: the topic name is also the HBase table name; the records are an Iterator[(topic, message)]
            HbaseDao.insert(offsetRange.topic, partitionRecords)
          }


          
          // TopicAndPartition's primary constructor takes the topic name first, then the Kafka partition id
          val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
          val either = kc.setConsumerOffsets(
            groupName,
            Map((topicAndPartition, offsetRange.untilOffset))
          )
          // log a warning if the offset update failed
          if (either.isLeft) {
            logger.warn(s"Error updating the offset to Kafka cluster: ${either.left.get}")
          }

        })
        logger.warn(s"------- insert ${rdd.count} into hbase  success ---------")
      }
    })

    // 4. Start the application: start the StreamingContext
    ssc.start()
    // Wait for the execution to stop
    ssc.awaitTermination()

    // 5. Stop the streaming application (graceful shutdown)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }

}
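The project's HbaseDao class is not shown in these notes. A hypothetical sketch of a batch insert that matches the JSON layout produced by the generator above (fields r/f/q/v), using the HBase 1.x client API, might look like this; the connection handling and table layout are assumptions:

import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.codehaus.jettison.json.JSONObject

object HbaseDao {

  // one HBase connection per executor JVM; Connection is thread-safe and expensive to create
  private lazy val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())

  def insert(tableName: String, records: Iterator[(String, String)]): Unit = {
    val table = connection.getTable(TableName.valueOf(tableName))
    try {
      val puts = new JArrayList[Put]()
      records.foreach { case (_, message) =>
        val json = new JSONObject(message)
        val put = new Put(Bytes.toBytes(json.getString("r")))   // "r": the RowKey
        put.setDurability(Durability.SKIP_WAL)                  // skip the WAL, write straight to the MemStore
        val family = Bytes.toBytes(json.getString("f"))         // "f": the column family
        val columns = json.getJSONArray("q")                    // "q": column qualifiers
        val values = json.getJSONArray("v")                     // "v": column values
        for (i <- 0 until columns.length()) {
          put.addColumn(family, Bytes.toBytes(columns.getString(i)), Bytes.toBytes(values.getString(i)))
        }
        puts.add(put)
      }
      table.put(puts)   // one batched call per partition instead of one RPC per record
    } finally {
      table.close()
    }
  }
}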

09:13 - 14:07  Kafka's storage layout in ZooKeeper

Program optimization:
    -1. Source side (Kafka):
        Pull data from the Kafka topics with the Direct approach
            sum(partitions of all topics) = number of partitions of each batch RDD
        i.e. each topic partition maps to one partition of the batch RDD
       For example:
            each batch RDD has 12 partitions; how can the parallelism be increased?
        -a. Adjust the number of partitions on the RDD
            rdd.repartition(24)  (or rdd.coalesce(24, shuffle = true); coalesce without a shuffle can only reduce the partition count)
        -b. Adjust the number of partitions of the topic
            after changing the topic's partition count, restart the SparkStreaming application
        Also consider:
            keep the data evenly distributed across the topic's partitions to avoid data skew
    -2. SparkStreaming:
        -a. Resources for running the application:
            run on the cluster in cluster deploy mode
            resources for the Driver (AM)
            Executor resources (number of executors, memory and CPU cores per executor)
        -b. Parameter tuning:
            - maximum number of records consumed per second from each topic partition
                spark.streaming.kafka.maxRatePerPartition
            - data serialization
                spark.serializer (Kryo)
            - data locality wait time (lower it so tasks do not sit waiting for locality)
                spark.locality.wait
        -c. Coding:
            - check whether the batch contains data
                rdd.isEmpty()
            - process the data per RDD partition
                rdd.foreachPartition
            - final data storage
                write the data into the HBase table in batches instead of one record at a time
                configure the HBase writes to skip the WAL and go straight to the MemStore (see the HbaseDao sketch above)
    -3. HBase:
        - number of table regions
            pre-split the table (when it is first created)
        - data compression
            use snappy or lz4
        - region compactions and splits
            disable automatic major compaction and splitting; trigger them manually (e.g. with a scheduled script) when HBase is idle
            (a possible table setup is sketched below)
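A hedged sketch of the HBase table setup mentioned above (pre-split table with snappy compression), assuming the bill table is named after the topic and its row keys are hex UUID strings as in the producer; the table and family names and split points are illustrative only:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.util.Bytes

object CreateBillTable {
  def main(args: Array[String]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = connection.getAdmin

    val desc = new HTableDescriptor(TableName.valueOf("testtopic9"))   // table named after the topic
    val family = new HColumnDescriptor("info")
    family.setCompressionType(Compression.Algorithm.SNAPPY)            // compress stored data with snappy
    desc.addFamily(family)

    // pre-split on the leading hex character of the UUID row keys so writes spread across regions
    val splits: Array[Array[Byte]] = "2468ace".toCharArray.map(c => Bytes.toBytes(c.toString))
    admin.createTable(desc, splits)

    // automatic major compactions are typically disabled cluster-wide (hbase.hregion.majorcompaction = 0)
    // and triggered by a scheduled script when the cluster is idle

    admin.close()
    connection.close()
  }
}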
