1001 2020-09-10 10:21:21 home.html
1001 2020-09-10 10:28:10 good_list.html
1001 2020-09-10 10:35:05 good_detail.html
1001 2020-09-10 10:42:55 cart.html
1001 2020-09-10 11:35:21 home.html
1001 2020-09-10 11:36:10 cart.html
1001 2020-09-10 11:38:12 trade.html
1001 2020-09-10 11:38:55 payment.html
1002 2020-09-10 09:40:00 home.html
1002 2020-09-10 09:41:00 mine.html
1002 2020-09-10 09:42:00 favor.html
1003 2020-09-10 13:10:00 home.html
1003 2020-09-10 13:15:00 search.html
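Goal: for each user, find the behavior trail (the sequence of pages visited) of every session. For the same user, a record belongs to the same session as the previous record if the two are no more than half an hour apart.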
Expected result (columns: session label, user_id, page_time, page, step within the session):
A 1001 2020-09-10 10:21:21 home.html 1
A 1001 2020-09-10 10:28:10 good_list.html 2
A 1001 2020-09-10 10:35:05 good_detail.html 3
A 1001 2020-09-10 10:42:55 cart.html 4
B 1001 2020-09-10 11:35:21 home.html 1
B 1001 2020-09-10 11:36:10 cart.html 2
B 1001 2020-09-10 11:38:12 trade.html 3
B 1001 2020-09-10 11:38:55 payment.html 4
C 1002 2020-09-10 09:40:00 home.html 1
C 1002 2020-09-10 09:41:00 mine.html 2
C 1002 2020-09-10 09:42:00 favor.html 3
D 1003 2020-09-10 13:10:00 home.html 1
D 1003 2020-09-10 13:15:00 search.html 2
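Here is a minimal sketch of the session rule in plain Scala, with no Spark or Hive. It assumes the page_time format shown above. `SessionRule`, `gapSeconds` and `sameSession` are hypothetical names chosen for illustration. A gap of exactly 30 minutes is treated as the same session. This matches the `> 30*60` test in the Hive queries and the `<= 30*60*1000` test in the Spark code later in this article.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper illustrating the 30-minute session rule
object SessionRule {
  private val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Seconds from prev to cur; both strings use the page_time format of the sample data
  def gapSeconds(prev: String, cur: String): Long =
    ChronoUnit.SECONDS.between(LocalDateTime.parse(prev, fmt), LocalDateTime.parse(cur, fmt))

  // Same session when the gap is at most 30 minutes (exactly 30 minutes still counts)
  def sameSession(prev: String, cur: String): Boolean =
    gapSeconds(prev, cur) <= 30 * 60

  def main(args: Array[String]): Unit = {
    // Inside session A of user 1001
    println(sameSession("2020-09-10 10:35:05", "2020-09-10 10:42:55"))
    // Boundary between sessions A and B of user 1001
    println(sameSession("2020-09-10 10:42:55", "2020-09-10 11:35:21"))
  }
}
```

Running `main` prints `true` for the 470-second gap inside session A. It then prints `false` for the 3146-second gap (52 min 26 s) that separates sessions A and B.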
1. Create the table:
create table page_session(
  user_id string,
  page_time string,
  page string
)
row format delimited fields terminated by '\t';
2. Partition by user_id, sort by time in ascending order, and use the window function lag() to get the previous record's time.
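The sketch below makes the semantics of lag() concrete. It mimics `lag(page_time) over(partition by user_id order by page_time asc)` on an in-memory list. The names `LagSketch`, `View` and `withLag` are hypothetical, and `None` stands in for NULL. The sketch illustrates what the window returns; it says nothing about how Hive actually executes it.

```scala
// Hypothetical in-memory imitation of
//   lag(page_time) over(partition by user_id order by page_time asc)
object LagSketch {
  final case class View(userId: String, pageTime: String, page: String)

  // Pairs each row with the previous row's page_time in the same user partition;
  // None plays the role of NULL for the first row of each partition.
  def withLag(rows: Seq[View]): Seq[(View, Option[String])] =
    rows.groupBy(_.userId).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      // "yyyy-MM-dd HH:mm:ss" strings sort chronologically, like page_time in the Hive query
      val sorted = part.sortBy(_.pageTime)
      val previous: Seq[Option[String]] = None +: sorted.init.map(v => Some(v.pageTime))
      sorted.zip(previous)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      View("1002", "2020-09-10 09:41:00", "mine.html"),
      View("1002", "2020-09-10 09:40:00", "home.html"),
      View("1002", "2020-09-10 09:42:00", "favor.html"),
      View("1003", "2020-09-10 13:10:00", "home.html")
    )
    withLag(rows).foreach { case (v, before) =>
      println(s"${v.userId} ${v.pageTime} ${v.page} ${before.getOrElse("NULL")}")
    }
  }
}
```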
select user_id, page_time, page,
       lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session;
Result:
user_id page_time page before_time
1001 2020-09-10 10:21:21 home.html NULL
1001 2020-09-10 10:28:10 good_list.html 2020-09-10 10:21:21
1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10
1001 2020-09-10 10:42:55 cart.html 2020-09-10 10:35:05
1001 2020-09-10 11:35:21 home.html 2020-09-10 10:42:55
1001 2020-09-10 11:36:10 cart.html 2020-09-10 11:35:21
1001 2020-09-10 11:38:12 trade.html 2020-09-10 11:36:10
1001 2020-09-10 11:38:55 payment.html 2020-09-10 11:38:12
1002 2020-09-10 09:40:00 home.html NULL
1002 2020-09-10 09:41:00 mine.html 2020-09-10 09:40:00
1002 2020-09-10 09:42:00 favor.html 2020-09-10 09:41:00
1003 2020-09-10 13:10:00 home.html NULL
1003 2020-09-10 13:15:00 search.html 2020-09-10 13:10:00
3. Assign a session id to each session
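Two conditions decide whether a record starts a new session:
1. before_time is null (the user's first record);
2. the gap between the previous time and the current time is greater than half an hour (the times must first be converted to Unix timestamps). A gap of more than 30 minutes means a new session.
If the record starts a new session, concat() joins user_id and the timestamp into a unique session_id; otherwise session_id is left as NULL.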
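The two conditions and the concat() step can be sketched as a single function. `SessionIdSketch` and its methods are hypothetical names. `unixTs` stands in for Hive's `unix_timestamp()`, and, as an assumption, it reads the time as UTC. That is the interpretation that reproduces ids such as `1001-1599733281` in the result of this step.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical sketch of the session_id expression from step 3
object SessionIdSketch {
  private val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Stand-in for unix_timestamp(page_time); assumption: the time is read as UTC
  def unixTs(s: String): Long =
    LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC)

  // if(before_time is null or gap > 30*60, concat(user_id, '-', ts), null)
  def sessionId(userId: String, pageTime: String, beforeTime: Option[String]): Option[String] =
    beforeTime match {
      // Previous record within 30 minutes: not a new session, so no id (NULL)
      case Some(prev) if unixTs(pageTime) - unixTs(prev) <= 30 * 60 => None
      // First record of the user, or a gap over 30 minutes: new session id
      case _ => Some(s"$userId-${unixTs(pageTime)}")
    }

  def main(args: Array[String]): Unit = {
    println(sessionId("1001", "2020-09-10 10:21:21", None))
    println(sessionId("1001", "2020-09-10 10:28:10", Some("2020-09-10 10:21:21")))
    println(sessionId("1001", "2020-09-10 11:35:21", Some("2020-09-10 10:42:55")))
  }
}
```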
select user_id, page_time, page, before_time,
       if(before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
          concat(user_id, '-', unix_timestamp(page_time)),
          null) session_id
from (
  select user_id, page_time, page,
         lag(page_time) over(partition by user_id order by page_time asc) before_time
  from page_session
) tmp1;
Result:
user_id page_time page before_time session_id
1001 2020-09-10 10:21:21 home.html NULL 1001-1599733281
1001 2020-09-10 10:28:10 good_list.html 2020-09-10 10:21:21 NULL
1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10 NULL
1001 2020-09-10 10:42:55 cart.html 2020-09-10 10:35:05 NULL
1001 2020-09-10 11:35:21 home.html 2020-09-10 10:42:55 1001-1599737721
1001 2020-09-10 11:36:10 cart.html 2020-09-10 11:35:21 NULL
1001 2020-09-10 11:38:12 trade.html 2020-09-10 11:36:10 NULL
1001 2020-09-10 11:38:55 payment.html 2020-09-10 11:38:12 NULL
1002 2020-09-10 09:40:00 home.html NULL 1002-1599730800
1002 2020-09-10 09:41:00 mine.html 2020-09-10 09:40:00 NULL
1002 2020-09-10 09:42:00 favor.html 2020-09-10 09:41:00 NULL
1003 2020-09-10 13:10:00 home.html NULL 1003-1599743400
1003 2020-09-10 13:15:00 search.html 2020-09-10 13:10:00 NULL
4. Fill in the matching session_id for the other rows of each session
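last_value(xx, true) returns the last value in a group of values (the group is usually an ordered set).
Setting the second argument to true skips null values. If the last value in the group is null, the function returns the last non-null value in the set instead.
The default is false, in which case the function returns null when the last value is null.
If all values are null, it returns null.
Because the window below has an ORDER BY, its default frame runs from the start of the partition to the current row. So last_value(session_id, true) returns the most recent non-null session_id seen so far, which carries each session's id forward to its later rows.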
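The sketch below shows what `last_value(session_id, true)` computes over one partition that is already sorted by page_time. The hypothetical helper forward-fills the latest non-null value, with `None` standing in for NULL.

```scala
// Hypothetical sketch of last_value(x, true) over one partition sorted by page_time,
// with the frame running from the partition start to the current row
object LastValueSketch {
  // Each position gets the latest non-None value seen so far (None if there is none yet)
  def lastNonNullSoFar[A](values: Seq[Option[A]]): Seq[Option[A]] =
    values.scanLeft(Option.empty[A])((carried, v) => v.orElse(carried)).tail

  def main(args: Array[String]): Unit = {
    // session_id column of user 1001 after step 3
    val raw = Seq(Some("1001-1599733281"), None, None, None, Some("1001-1599737721"), None, None, None)
    lastNonNullSoFar(raw).foreach(s => println(s.getOrElse("NULL")))
  }
}
```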
select user_id, page_time, page,
       last_value(session_id, true) over(partition by user_id order by page_time asc) session_id
from (
  select user_id, page_time, page, before_time,
         if(before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
            concat(user_id, '-', unix_timestamp(page_time)),
            null) session_id
  from (
    select user_id, page_time, page,
           lag(page_time) over(partition by user_id order by page_time asc) before_time
    from page_session
  ) tmp1
) tmp2;
Result:
user_id page_time page session_id
1001 2020-09-10 10:21:21 home.html 1001-1599733281
1001 2020-09-10 10:28:10 good_list.html 1001-1599733281
1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281
1001 2020-09-10 10:42:55 cart.html 1001-1599733281
1001 2020-09-10 11:35:21 home.html 1001-1599737721
1001 2020-09-10 11:36:10 cart.html 1001-1599737721
1001 2020-09-10 11:38:12 trade.html 1001-1599737721
1001 2020-09-10 11:38:55 payment.html 1001-1599737721
1002 2020-09-10 09:40:00 home.html 1002-1599730800
1002 2020-09-10 09:41:00 mine.html 1002-1599730800
1002 2020-09-10 09:42:00 favor.html 1002-1599730800
1003 2020-09-10 13:10:00 home.html 1003-1599743400
1003 2020-09-10 13:15:00 search.html 1003-1599743400
5. Build the behavior trail
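ROW_NUMBER() numbers the rows of each partition consecutively (1, 2, 3, ...) in ORDER BY order. Partitioning by session_id and ordering by page_time therefore gives each record's step within its session.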
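Here is a sketch of `row_number() over(partition by session_id order by page_time asc)` on an in-memory list. `RowNumberSketch`, `Row` and `withStep` are hypothetical names. Each session's rows are sorted by time and numbered from 1, and that number is the step column.

```scala
// Hypothetical sketch of row_number() over(partition by session_id order by page_time asc)
object RowNumberSketch {
  final case class Row(sessionId: String, pageTime: String, page: String)

  // Numbers each session's rows 1, 2, 3, ... in page_time order; the number is the step
  def withStep(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.groupBy(_.sessionId).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      part.sortBy(_.pageTime).zipWithIndex.map { case (r, i) => (r, i + 1) }
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Row("1003-1599743400", "2020-09-10 13:15:00", "search.html"),
      Row("1003-1599743400", "2020-09-10 13:10:00", "home.html"),
      Row("1002-1599730800", "2020-09-10 09:40:00", "home.html")
    )
    withStep(rows).foreach { case (r, step) => println(s"${r.sessionId} ${r.page} $step") }
  }
}
```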
select user_id, page_time, page, session_id,
       row_number() over(partition by session_id order by page_time asc) step
from (
  select user_id, page_time, page,
         last_value(session_id, true) over(partition by user_id order by page_time asc) session_id
  from (
    select user_id, page_time, page,
           if(before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30 * 60,
              concat(user_id, '-', unix_timestamp(page_time)),
              null) session_id
    from (
      select user_id, page_time, page,
             lag(page_time) over(partition by user_id order by page_time asc) before_time
      from page_session
    ) tmp1
  ) tmp2
) tmp3;
Result:
user_id page_time page session_id step
1001 2020-09-10 10:21:21 home.html 1001-1599733281 1
1001 2020-09-10 10:28:10 good_list.html 1001-1599733281 2
1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281 3
1001 2020-09-10 10:42:55 cart.html 1001-1599733281 4
1001 2020-09-10 11:35:21 home.html 1001-1599737721 1
1001 2020-09-10 11:36:10 cart.html 1001-1599737721 2
1001 2020-09-10 11:38:12 trade.html 1001-1599737721 3
1001 2020-09-10 11:38:55 payment.html 1001-1599737721 4
1002 2020-09-10 09:40:00 home.html 1002-1599730800 1
1002 2020-09-10 09:41:00 mine.html 1002-1599730800 2
1002 2020-09-10 09:42:00 favor.html 1002-1599730800 3
1003 2020-09-10 13:10:00 home.html 1003-1599743400 1
1003 2020-09-10 13:15:00 search.html 1003-1599743400 2
II. Spark implementation:
import java.text.SimpleDateFormat
import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object SessionTest1 {
  def main(args: Array[String]): Unit = {
    // Requirement: for each user, find the behavior trail of every session
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    // 1. Read the data (each row becomes a tuple); the file is tab-separated
    val ds: Dataset[(String, String, String)] = spark.read.option("sep", "\t").csv("datas/session.txt")
      .toDF("user_id", "page_time", "page").as[(String, String, String)]
    // Switch to the RDD API for the remaining steps
    val rdd: RDD[(String, String, String)] = ds.rdd
    // Result at this point:
    // RDD(
    //   (1001,2020-09-10 10:21:21,home.html),
    //   (1001,2020-09-10 10:28:10,good_list.html),
    //   (1001,2020-09-10 10:35:05,good_detail.html),
    //   ...
    //   (1003,2020-09-10 13:15:00,search.html)
    // )

    // 2. Convert each tuple into the UserAnalysis case class
    val rdd2: RDD[UserAnalysis] = rdd.map { case (userid, timestr, page) =>
      // Date parser (SimpleDateFormat is not thread-safe, so one is created per record)
      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // Epoch time in milliseconds, interpreted in the JVM's default time zone
      // (the sample values below correspond to UTC+8)
      val time = formatter.parse(timestr).getTime
      UserAnalysis(userid, time, timestr, page)
    }
    // Result at this point (every record starts with its own random session id and step = 1):
    // RDD(
    //   UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //   UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //   UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //   ...
    //   UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))

    // 3. Group by user id
    val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)
    // Result at this point:
    // RDD(
    //   1001->Iterable(
    //     UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
    //     UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
    //     UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
    //     ...)
    //   ...
    //   1003->Iterable(
    //     ...
    //     UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))

    val rdd4: RDD[UserAnalysis] = rdd3.flatMap { case (_, records) =>
      // 4. Sort all of this user's records by time
      val sortedList: List[UserAnalysis] = records.toList.sortBy(y => y.time)
      // Result:
      // List(
      //   UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
      //   UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
      //   UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
      //   ...)

      // 5. Slide over the user's sorted records with a window of 2 consecutive records
      val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)
      // Result:
      // Iterator(
      //   List(UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),
      //        UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))
      //   List(UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),
      //        UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1))
      //   ....
      // )

      // 6. For each window, check whether the two records belong to the same session;
      //    if they do, update the second record's session and step
      slidingList.foreach { list =>
        // A user with a single record yields one window of size 1; skip it,
        // otherwise the record would be compared with itself and end up with step 2
        if (list.size == 2) {
          val first = list.head // first record in the window
          val next = list.last  // second record in the window
          // Same session if the two records are at most 30 minutes apart (times are in ms)
          if (next.time - first.time <= 30 * 60 * 1000) {
            // Same session: the next record takes the previous record's session id
            next.session = first.session
            // Same session: the next record's step is the previous step + 1
            next.step = first.step + 1
          }
        }
      }
      // Return the sorted records (their session and step fields were updated in place)
      sortedList
    }

    // 7. Show the result
    rdd4.foreach(println)
    // Result:
    // UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)
    // UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)
    // UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)
    // UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)
    // UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)
    // UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)
    // UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)
    // UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)
    // UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)
    // UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)
    // UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)
    // UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)
    // UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)

    spark.stop()
  }
}

// Case class for one page-view record; session and step are vars so that step 6 can update them in place.
// UUID (Universally Unique IDentifier): UUID.randomUUID().toString (available since JDK 1.5) returns a random
// 36-character string of 32 hexadecimal digits separated by 4 hyphens. Collisions are so unlikely that it
// serves as a practically unique default session id for every new record.
case class UserAnalysis(userid: String, time: Long, timestr: String, page: String,
                        var session: String = UUID.randomUUID().toString, var step: Int = 1)
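Steps 4 to 6 can also be tried without Spark. The sketch below processes one user's records but makes two changes for illustration; neither is part of the original code:
1. It replaces the mutable `sliding(2)` loop with a `foldLeft` that compares each record with the previously tagged one.
2. It numbers sessions 1, 2, ... instead of using random UUIDs.

`SlidingSessionSketch`, `Visit`, `Tagged` and `sessionize` are hypothetical names.

```scala
// Hypothetical, Spark-free variant of steps 4-6 for one user's records
object SlidingSessionSketch {
  final case class Visit(userId: String, timeMs: Long, page: String)
  final case class Tagged(visit: Visit, session: Int, step: Int)

  private val GapMs: Long = 30L * 60 * 1000

  // Sorts by time, then compares each record with the previously tagged one:
  // within 30 minutes it continues that session (step + 1), otherwise it opens the next session.
  def sessionize(visits: Seq[Visit]): List[Tagged] =
    visits.sortBy(_.timeMs).foldLeft(List.empty[Tagged]) {
      case (Nil, v) => List(Tagged(v, session = 1, step = 1))
      case (acc @ (prev :: _), v) =>
        val next =
          if (v.timeMs - prev.visit.timeMs <= GapMs) Tagged(v, prev.session, prev.step + 1)
          else Tagged(v, prev.session + 1, 1)
        next :: acc // newest first; reversed at the end
    }.reverse

  def main(args: Array[String]): Unit = {
    // Millisecond times and pages of user 1001, taken from the Spark output in this article
    val rows = Seq(
      1599704481000L -> "home.html", 1599704890000L -> "good_list.html",
      1599705305000L -> "good_detail.html", 1599705775000L -> "cart.html",
      1599708921000L -> "home.html", 1599708970000L -> "cart.html",
      1599709092000L -> "trade.html", 1599709135000L -> "payment.html")
    sessionize(rows.map { case (t, p) => Visit("1001", t, p) }).foreach(println)
  }
}
```

For user 1001 this yields sessions 1, 1, 1, 1, 2, 2, 2, 2 with steps 1 to 4 in each session. A user with a single record keeps step 1.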
Sharing is welcome; please credit the source when reposting: 内存溢出