Joining the order_info, order_detail and user_info tables in real time and writing the combined wide table to Elasticsearch

Requirements analysis
1. Join conditions for the three tables:
   - order_info.`id` = order_detail.`order_id`
   - order_info.`user_id` = user_info.`id`
2. Use Spark Streaming to stream-join the order_info and order_detail tables.
3. Use Spark SQL to pull the user_info table, then merge it with the result of the join above.

The three source tables are order_info, order_detail and user_info.
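The bean classes that back these tables live in com.zyj.gmall.realtime.bean and are imported by the code below, but their definitions are not included in this post. A rough sketch, limited to the fields and merge methods the code actually references (any field beyond those, such as sku_id or user_level, is an assumption):

package com.zyj.gmall.realtime.bean

// Minimal sketch only: the real classes carry the full column lists of the three tables.
case class OrderInfo(id: String, user_id: String)                    // order_info row
case class OrderDetail(id: String, order_id: String, sku_id: String) // order_detail row
case class UserInfo(id: String, user_level: String)                  // user_info row

// Wide record written to Elasticsearch; the merge methods copy fields from the three sources.
case class SaleDetail(var order_id: String = null,
                      var user_id: String = null,
                      var sku_id: String = null,
                      var user_level: String = null) {
  def mergeOrderInfo(orderInfo: OrderInfo): SaleDetail = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.user_id = orderInfo.user_id
    }
    this
  }

  def mergeOrderDetail(orderDetail: OrderDetail): SaleDetail = {
    if (orderDetail != null) this.sku_id = orderDetail.sku_id
    this
  }

  def mergeUserInfo(userInfo: UserInfo): SaleDetail = {
    if (userInfo != null) this.user_level = userInfo.user_level
    this
  }
}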
SaleDetailApp
package com.zyj.gmall.realtime.app

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.zyj.gmall.common.{Constant, ESUtil}
import com.zyj.gmall.realtime.bean.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
import com.zyj.gmall.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.jackson.Serialization
import redis.clients.jedis.Jedis

import scala.collection.JavaConversions._ // implicitly convert Java collections to Scala collections

object SaleDetailApp {

  // write a value to redis as a JSON string
  def saveToRedis(client: Jedis, key: String, value: AnyRef) = {
    import org.json4s.DefaultFormats
    val json = Serialization.write(value)(DefaultFormats)
    // client.set(key, json)
    client.setex(key, 60 * 30, json) // set a TTL: the key is deleted automatically after 60 * 30 seconds
  }

  // cache an order_info record
  def cacheOrderInfo(client: Jedis, orderInfo: OrderInfo) = {
    val key = "order_info:" + orderInfo.id
    saveToRedis(client, key, orderInfo)
  }

  // cache an order_detail record
  def cacheOrderDetail(client: Jedis, orderDetail: OrderDetail) = {
    val key = s"order_detail:${orderDetail.order_id}:${orderDetail.id}"
    saveToRedis(client, key, orderDetail)
  }

  // full outer join of the two incoming streams
  def fullJoin(orderInfoStream: DStream[(String, OrderInfo)],
               orderDetailStream: DStream[(String, OrderDetail)]): DStream[SaleDetail] = {
    orderInfoStream.fullOuterJoin(orderDetailStream).mapPartitions(it => {
      // 1. get a redis client
      val client = RedisUtil.getClient()
      // 2. handle the possible late-arrival cases
      //    (emit a one-element collection when there is a match, an empty collection otherwise)
      val result: List[SaleDetail] = it.flatMap {
        // 2.1 orderInfo is present
        case (orderId, (Some(orderInfo), opt)) =>
          // cache the orderInfo
          cacheOrderInfo(client, orderInfo)
          // whether opt is Some or None, always check the orderDetail cache for this order
          val keys = client.keys(s"order_detail:${orderId}:*").toList
          keys.map(key => {
            val orderDetailString = client.get(key)
            client.del(key)
            val orderDetail = JSON.parseObject(orderDetailString, classOf[OrderDetail])
            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
          }) ::: (opt match {
            case Some(orderDetail) =>
              SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
            case None => Nil
          })
        // 2.2 orderInfo is missing, orderDetail is present
        case (orderId, (None, Some(orderDetail))) =>
          // 2.2.1 look up the matching orderInfo in the cache by the orderDetail's order id
          val orderInfoString = client.get("order_info:" + orderId)
          // 2.2.2 the orderInfo may or may not be cached; handle both cases
          if (orderInfoString != null && orderInfoString.nonEmpty) {
            // 2.2.3 found: wrap the data into a SaleDetail
            val orderInfo = JSON.parseObject(orderInfoString, classOf[OrderInfo])
            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
          } else {
            // 2.2.4 not found: cache the orderDetail and wait for the orderInfo
            cacheOrderDetail(client, orderDetail)
            Nil
          }
      }.toList // materialize before closing the client: Iterator.flatMap is lazy
      // 3. close the redis client
      client.close()
      // 4. return the processed result
      result.toIterator
    })
  }

  def joinUser(saleDetailStream: DStream[SaleDetail], ssc: StreamingContext): DStream[SaleDetail] = {
    val url = "jdbc:mysql://192.168.1.100:3306/gmall2"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    val spark = SparkSession.builder()
      .config(ssc.sparkContext.getConf) // reuse ssc.sparkContext's configuration for the SparkSession
      .getOrCreate()
    import spark.implicits._
    // 1. read the mysql data first; this runs once per batch (every 3 seconds)
    saleDetailStream.transform((saleDetailRDD: RDD[SaleDetail]) => {
      // 2.1 read the user_info table (the read is defined on the driver)
      val userInfoRDD = spark.read.jdbc(url, "user_info", props)
        .as[UserInfo]
        .rdd
        .map(user => (user.id, user))
      // 2.2 join the two RDDs
      saleDetailRDD.map(saleDetail => (saleDetail.user_id, saleDetail))
        .join(userInfoRDD)
        .map {
          case (_, (saleDetail, userInfo)) => saleDetail.mergeUserInfo(userInfo)
        }
    })
  }

  def main(args: Array[String]): Unit = {
    // create the StreamingContext
    val conf = new SparkConf().setAppName("SaleDetailApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 1. read the two kafka topics to get two streams
    // 2. wrap them as (key, value) pairs (a join needs K-V streams; the key is the join condition)
    val orderInfoStream: DStream[(String, OrderInfo)] = MyKafkaUtil
      .getKafkaStream(ssc, Constant.TOPIC_ORDER_INFO)
      .map(s => {
        val orderInfo = JSON.parseObject(s, classOf[OrderInfo])
        (orderInfo.id, orderInfo)
      })
    val orderDetailStream: DStream[(String, OrderDetail)] = MyKafkaUtil
      .getKafkaStream(ssc, Constant.TOPIC_ORDER_DETAIL)
      .map(s => {
        val orderDetail = JSON.parseObject(s, classOf[OrderDetail])
        (orderDetail.order_id, orderDetail) // order_id is the field that joins to order_info.id
      })

    // 3. join the two streams
    var saleDetailStream: DStream[SaleDetail] = fullJoin(orderInfoStream, orderDetailStream)

    // 4. use spark-sql to read the user data from mysql and add the required fields to SaleDetail
    saleDetailStream = joinUser(saleDetailStream, ssc)

    // 5. write the detail records to es
    saleDetailStream.foreachRDD(rdd => {
      // option 1: collect all of the rdd's data to the driver and write it in one batch
      ESUtil.insertBulk("sale_detail_1015", rdd.collect().toIterator)
      // option 2: write each partition separately (see the sketch after this code)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
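Option 2 mentioned in the final foreachRDD (writing each partition separately instead of collecting everything to the driver) is left unimplemented above. A minimal sketch that could replace option 1 inside main, reusing the existing ESUtil.insertBulk:

// Option 2: each executor writes its own partition, so nothing is collected to the driver.
saleDetailStream.foreachRDD(rdd => {
  rdd.foreachPartition((saleDetails: Iterator[SaleDetail]) =>
    ESUtil.insertBulk("sale_detail_1015", saleDetails)
  )
})

This avoids pulling the whole batch through the driver, at the cost of building one bulk request (and Jest client) per partition.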
RedisUtil
package com.zyj.gmall.realtime.util

import redis.clients.jedis.Jedis

object RedisUtil {
  val host: String = ConfigUtil.getProperty("redis.host")
  val port: Int = ConfigUtil.getProperty("redis.port").toInt

  def getClient(): Jedis = {
    val client = new Jedis(host, port, 6 * 1000) // connection timeout (ms)
    client.connect()
    client
  }
}
MyKafkaUtil
package com.zyj.gmall.realtime.util

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {
  val params: Map[String, String] = Map[String, String](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> ConfigUtil.getProperty("kafka.servers"),
    ConsumerConfig.GROUP_ID_CONFIG -> ConfigUtil.getProperty("kafka.group.id")
  )

  // read one or more topics and return the message values as a DStream[String]
  def getKafkaStream(ssc: StreamingContext, topic: String, otherTopics: String*): DStream[String] = {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      params,
      (otherTopics :+ topic).toSet
    ).map(_._2)
  }
}
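Both RedisUtil and MyKafkaUtil call a ConfigUtil helper that is not shown in this post. A minimal sketch, assuming the helper sits in the same util package and the settings live in a config.properties file on the classpath:

package com.zyj.gmall.realtime.util

import java.io.InputStream
import java.util.Properties

// Hypothetical helper: loads config.properties from the classpath once
// and exposes simple key lookups (the real project's version may differ).
object ConfigUtil {
  private val props: Properties = {
    val p = new Properties()
    val in: InputStream = ConfigUtil.getClass.getClassLoader
      .getResourceAsStream("config.properties")
    if (in != null) p.load(in)
    p
  }

  def getProperty(key: String): String = props.getProperty(key)
}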
ESUtil
package com.zyj.gmall.common

import io.searchbox.client.JestClientFactory
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.core.{Bulk, Index}

object ESUtil {
  // the port defaults to 9200 if the server has not been configured otherwise
  val esUrl = "http://hadoop103:9200"

  // create a client factory
  val factory = new JestClientFactory
  val conf = new HttpClientConfig.Builder(esUrl)
    .maxTotalConnection(100) // at most 100 concurrent connections to ES, usually about 1.5x the number of partitions
    .connTimeout(10 * 1000)  // connection timeout
    .readTimeout(10 * 1000)  // maximum read timeout
    .multiThreaded(true)     // allow multi-threaded access
    .build()
  factory.setHttpClientConfig(conf)

  def insertSingle(index: String, source: Object, id: String = null) = {
    // get a client from the factory
    val client = factory.getObject
    // write a single document
    val action = new Index.Builder(source)
      .index(index)
      .`type`("_doc")
      .id(id) // if id is null it is treated as not set
      .build()
    client.execute(action)
    // close the client (actually returns it to the factory)
    client.shutdownClient()
  }

  def insertBulk(index: String, sources: Iterator[Any]) = {
    val client = factory.getObject
    val bulk = new Bulk.Builder()
      .defaultIndex(index)
      .defaultType("_doc")
    sources.foreach {
      case (id: String, data) =>
        val action = new Index.Builder(data).id(id).build()
        bulk.addAction(action)
      case data =>
        val action = new Index.Builder(data).build()
        bulk.addAction(action)
    }
    client.execute(bulk.build())
    client.shutdownClient()
  }

  def main(args: Array[String]): Unit = {
    val list = User(1, "aa") :: User(2, "bb") :: User(3, "cc") :: Nil
    insertBulk("user", list.toIterator)
    val list2 = ("100", User(1, "a")) :: ("200", User(1, "b")) :: ("300", User(3, "c")) :: Nil
    insertBulk("user", list2.toIterator)
  }
}

case class User(age: Int, name: String)
pom
<!-- gmall-realtime/pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall1015</artifactId>
        <groupId>com.zyj.gmall</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-realtime</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.zyj.gmall</groupId>
            <artifactId>gmall-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

<!-- gmall-common/pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall1015</artifactId>
        <groupId>com.zyj.gmall</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-common</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
    </dependencies>
</project>