Spark Streaming dual-stream join


Project requirement:
Compute, in real time, a wide table joining the order_info, order_detail and user_info tables, and write the result into Elasticsearch.

Requirement analysis

1. Join conditions between the three tables:
  • order_info.`id` = order_detail.`order_id`
  • order_info.`user_id` = user_info.`id`
2. Use Spark Streaming to stream-join order_info and order_detail. Since matching records can arrive in different batches, the join is a full outer join, and records without a match yet are cached in Redis until the other side arrives.
3. Use Spark SQL to pull the user_info table from MySQL and join it with the result of the stream join.
order_info table

order_detail table

user_info table
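
The bean classes used below (OrderInfo, OrderDetail, UserInfo, SaleDetail) are not listed in the post. A minimal sketch of what they might look like, keeping only the join keys the code relies on plus a few illustrative fields (everything beyond id, order_id and user_id is an assumption):

// Hypothetical sketches of the bean classes; the real ones carry all table columns.
case class OrderInfo(id: String, user_id: String, total_amount: Double = 0.0)

case class OrderDetail(id: String, order_id: String, sku_id: String = null, sku_name: String = null)

case class UserInfo(id: String, user_level: String = null, gender: String = null)

// SaleDetail accumulates fields from all three tables; each merge* method copies
// the relevant fields and returns this, so the calls can be chained.
case class SaleDetail(var order_id: String = null,
                      var user_id: String = null,
                      var sku_id: String = null,
                      var sku_name: String = null,
                      var user_gender: String = null) {

  def mergeOrderInfo(orderInfo: OrderInfo): SaleDetail = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.user_id = orderInfo.user_id
    }
    this
  }

  def mergeOrderDetail(orderDetail: OrderDetail): SaleDetail = {
    if (orderDetail != null) {
      this.sku_id = orderDetail.sku_id
      this.sku_name = orderDetail.sku_name
    }
    this
  }

  def mergeUserInfo(userInfo: UserInfo): SaleDetail = {
    if (userInfo != null) {
      this.user_gender = userInfo.gender
    }
    this
  }
}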

SaleDetailApp
package com.zyj.gmall.realtime.app

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.zyj.gmall.common.{Constant, ESUtil}
import com.zyj.gmall.realtime.bean.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
import com.zyj.gmall.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.jackson.Serialization
import redis.clients.jedis.Jedis

import scala.collection.JavaConversions._ // implicit conversions between Java and Scala collections

object SaleDetailApp {

  // Write a value to redis as JSON
  def saveToRedis(client: Jedis, key: String, value: AnyRef) = {
    import org.json4s.DefaultFormats
    val json = Serialization.write(value)(DefaultFormats)
    //    client.set(key, json)
    client.setex(key, 60 * 30, json) // set an expiration: the key is deleted automatically after 30 minutes
  }

  // Cache order_info in redis
  def cacheOrderInfo(client: Jedis, orderInfo: OrderInfo) = {
    val key = "order_info:" + orderInfo.id
    saveToRedis(client, key, orderInfo)
  }

  // Cache order_detail in redis
  def cacheOrderDetail(client: Jedis, orderDetail: OrderDetail) = {
    val key = s"order_detail:${orderDetail.order_id}:${orderDetail.id}"
    saveToRedis(client, key, orderDetail)
  }

  // Full outer join of the two input streams
  def fullJoin(orderInfoStream: DStream[(String, OrderInfo)],
               orderDetailStream: DStream[(String, OrderDetail)]): DStream[SaleDetail] = {

    orderInfoStream.fullOuterJoin(orderDetailStream).mapPartitions(it => {
      // 1. Get a redis client
      val client = RedisUtil.getClient()

      // 2. Handle the different late-arrival cases
      //    (emit a SaleDetail for every match that can be made now; otherwise emit nothing)
      val result: List[SaleDetail] = it.flatMap {
        // 2.1 orderInfo is present (orderDetail may or may not be)
        case (orderId, (Some(orderInfo), opt)) =>
          // cache the orderInfo so that order_details arriving in later batches can find it
          cacheOrderInfo(client, orderInfo)
          // whether opt is Some or None, always check the orderDetail cache for earlier arrivals
          val keys = client.keys(s"order_detail:${orderId}:*").toList
          keys.map(key => {
            val orderDetailString = client.get(key)
            client.del(key)
            val orderDetail = JSON.parseObject(orderDetailString, classOf[OrderDetail])
            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
          }) ::: (opt match {
            case Some(orderDetail) =>
              SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
            case None =>
              Nil
          })

        // 2.2 orderInfo is absent, orderDetail is present
        case (orderId, (None, Some(orderDetail))) =>
          // 2.2.1 Use the orderDetail's orderId to look up the matching orderInfo in the cache
          val orderInfoString = client.get("order_info:" + orderId)

          // 2.2.2 The lookup may or may not find the orderInfo; handle both cases
          // 2.2.3 Found: merge the data into a SaleDetail
          if (orderInfoString != null && orderInfoString.nonEmpty) {
            val orderInfo = JSON.parseObject(orderInfoString, classOf[OrderInfo])
            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
          } else {
            // 2.2.4 Not found: cache the orderDetail and wait for the orderInfo to arrive
            cacheOrderDetail(client, orderDetail)
            Nil
          }
      }.toList // materialize here: flatMap on an iterator is lazy and the client is closed next
      // 3. Close the redis client
      client.close()
      // 4. Return the processed results
      result.toIterator
    })
  }

  def joinUser(saleDetailStream: DStream[SaleDetail], ssc: StreamingContext) = {

    val url = "jdbc:mysql://192.168.1.100:3306/gmall2"
    val props = new Properties()

    props.setProperty("user", "root")
    props.setProperty("password", "root")

    val spark = SparkSession.builder()
      .config(ssc.sparkContext.getConf) // configure the SparkSession with ssc.sparkContext's conf
      .getOrCreate()

    import spark.implicits._

    // 1. Read the mysql data first; transform runs once per batch (every 3 seconds here)
    saleDetailStream.transform((saleDetailRDD: RDD[SaleDetail]) => {

      // 2.1 Read user_info directly in the driver
      val userInfoRDD = spark.read.jdbc(url, "user_info", props)
        .as[UserInfo]
        .rdd
        .map(user => (user.id, user))

      // 2.2 Join the two RDDs
      saleDetailRDD.map(saleDetail => (saleDetail.user_id, saleDetail))
        .join(userInfoRDD)
        .map {
          case (_, (saleDetail, userInfo)) =>
            saleDetail.mergeUserInfo(userInfo)
        }
    })
  }

  def main(args: Array[String]): Unit = {

    // Create a StreamingContext
    val conf = new SparkConf().setAppName("SaleDetailApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 1. Read the two kafka topics to get two streams
    // 2. Key each stream (a join needs (k, v) pairs; the key is the join condition)
    val orderInfoStream: DStream[(String, OrderInfo)] = MyKafkaUtil
      .getKafkaStream(ssc, Constant.TOPIC_ORDER_INFO)
      .map(s => {
        val orderInfo = JSON.parseObject(s, classOf[OrderInfo])
        (orderInfo.id, orderInfo)
      })

    val orderDetailStream: DStream[(String, OrderDetail)] = MyKafkaUtil
      .getKafkaStream(ssc, Constant.TOPIC_ORDER_DETAIL)
      .map(s => {
        val orderDetail = JSON.parseObject(s, classOf[OrderDetail])
        (orderDetail.order_id, orderDetail) // order_id is the field that matches order_info.id
      })

    // 3. Join the two streams
    var saleDetailStream: DStream[SaleDetail] = fullJoin(orderInfoStream, orderDetailStream)

    // 4. Use spark-sql to read user_info from mysql and add the needed fields to SaleDetail
    saleDetailStream = joinUser(saleDetailStream, ssc)

    // 5. Write the sale details to es
    saleDetailStream.foreachRDD(rdd => {
      // Option 1: collect the whole rdd to the driver and write it in a single bulk request
      //           (simple, but every batch must fit in driver memory)
      ESUtil.insertBulk("sale_detail_1015", rdd.collect().toIterator)

      // Option 2: let each partition write its own data from the executors (sketch below)
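      // A minimal sketch, assuming ESUtil can be initialized on each executor JVM
      // (it is an object, so the Jest factory is built lazily where it is first used):
      // rdd.foreachPartition(it => ESUtil.insertBulk("sale_detail_1015", it))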
      

    })

    ssc.start()
    ssc.awaitTermination()
  }
}


RedisUtil
package com.zyj.gmall.realtime.util

import redis.clients.jedis.Jedis

object RedisUtil {

  val host: String = ConfigUtil.getProperty("redis.host")
  val port: Int = ConfigUtil.getProperty("redis.port").toInt

  def getClient(): Jedis = {
    val client = new Jedis(host, port, 6 * 1000) // connection timeout: 6 seconds
    client.connect()
    client
  }
}
MyKafkaUtil
package com.zyj.gmall.realtime.util

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {

  val params: Map[String, String] = Map[String, String](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> ConfigUtil.getProperty("kafka.servers"),
    ConsumerConfig.GROUP_ID_CONFIG -> ConfigUtil.getProperty("kafka.group.id")
  )

  def getKafkaStream(ssc: StreamingContext, topic: String, otherTopics: String*): DStream[String] = {

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      params,
      (otherTopics :+ topic).toSet
    ).map(_._2)

  }

}
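
Both RedisUtil and MyKafkaUtil call ConfigUtil.getProperty, but the post does not include that helper. A minimal sketch, assuming the settings live in a config.properties file on the classpath (the file name is an assumption; the keys come from the calls above):

package com.zyj.gmall.realtime.util

import java.util.Properties

object ConfigUtil {

  // Load config.properties from the classpath once, when the object is first used
  private val props: Properties = {
    val p = new Properties()
    p.load(ConfigUtil.getClass.getClassLoader.getResourceAsStream("config.properties"))
    p
  }

  def getProperty(key: String): String = props.getProperty(key)
}

The matching config.properties would then define redis.host, redis.port, kafka.servers and kafka.group.id; the values depend on the cluster.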
ESUtil
package com.zyj.gmall.common

import io.searchbox.client.JestClientFactory
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.core.{Bulk, Index}

object ESUtil {

  // es address; if no port is configured on the server, the default is 9200
  val esUrl = "http://hadoop103:9200"

  // Create a client factory
  val factory = new JestClientFactory
  val conf = new HttpClientConfig.Builder(esUrl)
    .maxTotalConnection(100) // at most 100 concurrent connections to es; usually about 1.5x the number of partitions
    .connTimeout(10 * 1000) // connection timeout
    .readTimeout(10 * 1000) // max read timeout
    .multiThreaded(true) // allow multi-threaded access
    .build()
  factory.setHttpClientConfig(conf)

  
  def insertSingle(index: String, source: Object, id: String = null) = {
    // Get a client from the factory
    val client = factory.getObject
    // Build the write action (single document)
    val action = new Index.Builder(source)
      .index(index)
      .`type`("_doc")
      .id(id) // if id is null, it is as if no id was passed; es generates one
      .build()
    client.execute(action)
    // Close the client (actually returns it to the factory)
    client.shutdownClient()
  }

  
  def insertBulk(index: String, sources: Iterator[Any]) = {
    val client = factory.getObject

    val bulk = new Bulk.Builder()
      .defaultIndex(index)
      .defaultType("_doc")

    sources.foreach {
      case (id: String, data) =>
        val action = new Index.Builder(data).id(id).build()
        bulk.addAction(action)
      case data =>
        val action = new Index.Builder(data).build()
        bulk.addAction(action)
    }

    client.execute(bulk.build())
    client.shutdownClient()
  }

  def main(args: Array[String]): Unit = {

    val list = User(1, "aa") :: User(2, "bb") :: User(3, "cc") :: Nil
    insertBulk("user", list.toIterator)

    val list2 = ("100", User(1, "a")) :: ("200", User(1, "b")) :: ("300", User(3, "c")) :: Nil
    insertBulk("user", list2.toIterator)

  }
}

case class User(age: Int, name: String)
pom


    
    <!-- gmall-realtime module -->
    <parent>
        <artifactId>gmall1015</artifactId>
        <groupId>com.zyj.gmall</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-realtime</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.zyj.gmall</groupId>
            <artifactId>gmall-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>

        <!-- mysql jdbc driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>




    
    <!-- gmall-common module -->
    <parent>
        <artifactId>gmall1015</artifactId>
        <groupId>com.zyj.gmall</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-common</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
        </dependency>
        <!-- jest: http client for elasticsearch -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
    </dependencies>
