spark+hive开窗函数练习:求用户每次会话的行为轨迹

spark+hive开窗函数练习:求用户每次会话的行为轨迹,第1张

spark+hive开窗函数练习:求用户每次会话的行为轨迹 数据

1001    2020-09-10 10:21:21    home.html
1001    2020-09-10 10:28:10    good_list.html
1001    2020-09-10 10:35:05    good_detail.html
1001    2020-09-10 10:42:55    cart.html
1001    2020-09-10 11:35:21    home.html
1001    2020-09-10 11:36:10    cart.html
1001    2020-09-10 11:38:12    trade.html
1001    2020-09-10 11:38:55    payment.html
1002    2020-09-10 09:40:00    home.html
1002    2020-09-10 09:41:00    mine.html
1002    2020-09-10 09:42:00    favor.html
1003    2020-09-10 13:10:00    home.html
1003    2020-09-10 13:15:00    search.html

需求:

求得用户每次会话的行为轨迹(同一用户,上一条与下一条在半小时内为一次会话)


要求结果如下:


A    1001    2020-09-10 10:21:21    home.html    1
A    1001    2020-09-10 10:28:10    good_list.html    2
A    1001    2020-09-10 10:35:05    good_detail.html    3
A    1001    2020-09-10 10:42:55    cart.html    4

B    1001    2020-09-10 11:35:21    home.html    1
B    1001    2020-09-10 11:36:10    cart.html    2
B    1001    2020-09-10 11:38:12    trade.html    3
B    1001    2020-09-10 11:38:55    payment.html    4

C    1002    2020-09-10 09:40:00    home.html    1
C    1002    2020-09-10 09:41:00    mine.html    2
C    1002    2020-09-10 09:42:00    favor.html    3

D    1003    2020-09-10 13:10:00    home.html    1
D    1003    2020-09-10 13:15:00    search.html    2

一、sql语句:
1、建表:
create table page_session(
user_id string,
page_time string,
page string)
row format delimited
fields terminated by 't';
2、按时间排序-升序,按user_id分组,使用开窗函数lag()获得上一条的时间
select 
    user_id,
    page_time,
    page,
    lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session;
结果如下:
user_id    page_time                page                before_time
1001    2020-09-10 10:21:21        home.html           NULL
1001    2020-09-10 10:28:10        good_list.html      2020-09-10 10:21:21
1001    2020-09-10 10:35:05        good_detail.html    2020-09-10 10:28:10
1001    2020-09-10 10:42:55        cart.html           2020-09-10 10:35:05
1001    2020-09-10 11:35:21        home.html           2020-09-10 10:42:55
1001    2020-09-10 11:36:10        cart.html           2020-09-10 11:35:21
1001    2020-09-10 11:38:12        trade.html          2020-09-10 11:36:10
1001    2020-09-10 11:38:55        payment.html        2020-09-10 11:38:12
1002    2020-09-10 09:40:00        home.html           NULL
1002    2020-09-10 09:41:00        mine.html           2020-09-10 09:40:00
1002    2020-09-10 09:42:00        favor.html          2020-09-10 09:41:00
1003    2020-09-10 13:10:00        home.html           NULL
1003    2020-09-10 13:15:00        search.html         2020-09-10 13:10:00
3、给每个会话添加会话id


    两个条件判断是否为新会话:
    1、before_time是否为null  
    2、判断上一次时间与下一次时间是否大于半小时(时间需要转成时间戳),如果大于30分说明是新的会话
    如果是新会话,就使用concat()拼接--将user_id和时间戳拼接起来得到唯一的session_id

select
    user_id,
    page_time,
    page,
    before_time,
    if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
    concat(user_id,'-',unix_timestamp(page_time)),null ) session_id
from(
    select 
        user_id,
        page_time,
        page,
        lag(page_time) over(partition by user_id order by page_time asc) before_time
    from page_session) tmp1;
结果如下:
user_id    page_time                page                before_time                session_id
1001    2020-09-10 10:21:21        home.html           NULL                    1001-1599733281
1001    2020-09-10 10:28:10        good_list.html      2020-09-10 10:21:21        NULL
1001    2020-09-10 10:35:05        good_detail.html    2020-09-10 10:28:10        NULL
1001    2020-09-10 10:42:55        cart.html           2020-09-10 10:35:05        NULL
1001    2020-09-10 11:35:21        home.html           2020-09-10 10:42:55        1001-1599737721
1001    2020-09-10 11:36:10        cart.html           2020-09-10 11:35:21        NULL
1001    2020-09-10 11:38:12        trade.html          2020-09-10 11:36:10        NULL
1001    2020-09-10 11:38:55        payment.html        2020-09-10 11:38:12        NULL
1002    2020-09-10 09:40:00        home.html           NULL                    1002-1599730800
1002    2020-09-10 09:41:00        mine.html           2020-09-10 09:40:00        NULL
1002    2020-09-10 09:42:00        favor.html          2020-09-10 09:41:00        NULL
1003    2020-09-10 13:10:00        home.html           NULL                    1003-1599743400
1003    2020-09-10 13:15:00        search.html         2020-09-10 13:10:00        NULL
4、给每个会话的其他数据都添加上对应的session_id


        last_value(xx,true) --返回一组值中的最后一个值(该组通常是有序集合)。
        将第二个参数设置为true是忽略null值,如果这组值中的最后一个值是null值,返回该集合中的最后一个非空值。
        默认为false,如果最后一个值为null,函数将返回 null。
        如果所有值均为空值,则返回 null。

select
    user_id,
    page_time,
    page,
    last_value(session_id,true) over(partition by user_id order by page_time asc) session_id
from(
    select
        user_id,
        page_time,
        page,
        before_time,
        if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
        concat(user_id,'-',unix_timestamp(page_time)),null ) session_id
    from(
            select 
                user_id,
                page_time,
                page,
                lag(page_time) over(partition by user_id order by page_time asc) before_time
            from page_session) tmp1
        ) tmp2;
结果如下:
user_id    page_time                page                session_id
1001    2020-09-10 10:21:21     home.html           1001-1599733281
1001    2020-09-10 10:28:10     good_list.html      1001-1599733281
1001    2020-09-10 10:35:05     good_detail.html    1001-1599733281
1001    2020-09-10 10:42:55     cart.html           1001-1599733281
1001    2020-09-10 11:35:21     home.html           1001-1599737721
1001    2020-09-10 11:36:10     cart.html           1001-1599737721
1001    2020-09-10 11:38:12     trade.html          1001-1599737721
1001    2020-09-10 11:38:55     payment.html        1001-1599737721
1002    2020-09-10 09:40:00     home.html           1002-1599730800
1002    2020-09-10 09:41:00     mine.html           1002-1599730800
1002    2020-09-10 09:42:00     favor.html          1002-1599730800
1003    2020-09-10 13:10:00     home.html           1003-1599743400
1003    2020-09-10 13:15:00     search.html         1003-1599743400
5、求行为轨迹--


    ROW_NUMBER()排序:会根据顺序计算
    

select
    user_id,
    page_time,
    page,
       session_id,
       row_number() over(partition by session_id order by page_time asc) step
from (
         select user_id,
                page_time,
                page,
                last_value(session_id, true) over (partition by user_id order by page_time asc) session_id
         from (
                  select user_id,
                         page_time,
                         page,
                         if(before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30 * 60,
                            concat(user_id, '-', unix_timestamp(page_time)), null) session_id
                  from (
                           select user_id,
                                  page_time,
                                  page,
                                  lag(page_time) over (partition by user_id order by page_time asc) before_time
                           from page_session
                       ) tmp1
              ) tmp2
     ) tmp3;
结果如下:
user_id    page_time                page                session_id            step
1001    2020-09-10 10:21:21        home.html           1001-1599733281        1
1001    2020-09-10 10:28:10        good_list.html      1001-1599733281        2
1001    2020-09-10 10:35:05        good_detail.html    1001-1599733281        3
1001    2020-09-10 10:42:55        cart.html           1001-1599733281        4

1001    2020-09-10 11:35:21        home.html           1001-1599737721        1
1001    2020-09-10 11:36:10        cart.html           1001-1599737721        2
1001    2020-09-10 11:38:12        trade.html          1001-1599737721        3
1001    2020-09-10 11:38:55        payment.html        1001-1599737721        4

1002    2020-09-10 09:40:00        home.html           1002-1599730800        1
1002    2020-09-10 09:41:00        mine.html           1002-1599730800        2
1002    2020-09-10 09:42:00        favor.html          1002-1599730800        3

1003    2020-09-10 13:10:00        home.html           1003-1599743400        1
1003    2020-09-10 13:15:00        search.html         1003-1599743400        2
二、spark代码实现:

import java.text.SimpleDateFormat
import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

object SessionTest1 {
  def main(args: Array[String]): Unit = {

    //需求:求得每个用户每次会话的行为轨迹
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    //1、读取数据(行类型转为元组)
    val ds: Dataset[(String, String, String)] = spark.read.option("sep","t").csv("datas/session.txt")
      .toDF("user_id","page_time","page").as[(String,String,String)]

    //转为rdd *** 作
    val rdd: RDD[(String, String, String)] = ds.rdd
    //此时的结果
    //RDD(
    //   (1001,2020-09-10 10:21:21,home.html),
    //   (1001,2020-09-10 10:28:10,good_list.html),
    //   (1001,2020-09-10 10:35:05,good_detail.html),
    //   ...
    //   (1003,2020-09-10 13:15:00,search.html)
    // )

    //2、转换数据类型--转为样例类
    val rdd2: RDD[UserAnalysis] = rdd.map{
      case(userid,timestr,page)=>
        //格式化时间
        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        //获得时间戳-毫秒
        val time = formatter.parse(timestr).getTime
        UserAnalysis(userid,time,timestr,page)
    }
    //此时的结果:
    //RDD(
    //  UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //  UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //  UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //  ...
    //  UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))

    //3、按照用户id分组
    val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)
    //此时的结果:
    //RDD(
    //  1001->Iterable(
    //          UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //          UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //          UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //          ...)
    //  ...
    //  1003->Iterable(
    //          ...
    //          UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))

    val rdd4: RDD[UserAnalysis] = rdd3.flatMap(x=>{
      //4、对每个用户所有数据排序
      val sortedList: List[UserAnalysis] = x._2.toList.sortBy(y=>y.time)
      //结果为:
      //List(
      //    UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
      //    UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
      //    UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
      //    ...)

      //5、针对每个用户所有数据滑窗--每2个为一个窗口
      val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)
      //结果为:
      //Iterator(
      //    List( UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),
      //          UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))
      //    List( UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),
      //          UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1) )
      //    ....
      // )

      //6、判断每个窗口中两个数据是否属于同一个session,如果属于同一个会话,修改session,step
      slidingList.foreach(list=>{
        //取出窗口中的第一个数据
        val first = list.head
        //取出窗口中的第二个数据
        val next = list.last
        //判断第一个数据和下一个数据的时间是否在30分钟内
        if (next.time-first.time <= 30*60*1000){
          //属于同一个会话,那么下一条数据的session和上一条一样
          next.session=first.session
          //属于同一个会话,那么下一条数据的轨迹step+1
          next.step=first.step+1
        }
      })
      //返回样例类对象(样例类对象对应的session和step已经修改了)
      x._2
    })

    //7、结果展示
    rdd4.foreach(println)
    //结果:
    //UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)
    //UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)
    //UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)
    //UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)
    //UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)
    //UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)
    //UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)
    //UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)
    //UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)
    //UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)
    //UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)
    //UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)
    //UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)
  }
}

//定义样例类
//UUID(Universally Unique IDentifier)全局唯一标识符,是指在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的
//UUID.randomUUID().toString()是javaJDK(1.5以上的版本)提供的一个自动生成主键的方法,它生成的是以为32位的数字和字母组合的字符,中间还参杂着4个 - 符号。
case class UserAnalysis(userid:String,time:Long,timestr:String,page:String,var session:String = UUID.randomUUID().toString,var step:Int=1)


 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5652437.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存