首先有这样的一张csv数据表
名称叫做test.csv
将test.csv上传至hdfs 然后在zeppelin中创建一张分区表
create table exam.userbehavior_partitioned( user_id string, item_id string, category_id string, behavior_type string, time string ) partitioned by (dt string) stored as orc
然后将数据插入到分区表中
首先先开启分区表设置
-- set hive.exec.dynamic.partition=true -- set hive.exec.dynamic.partition.mode=nostrict 开启动态分区
这里我们按照日期进行分区
将时间戳格式化为”年-月-日时:分:秒”格式,将数据插入至userbehavior_partitioned表中
insert into exam.userbehavior_partitioned partition(dt) select user_id,item_id,category_id,behavior_type, from_unixtime(cast(time as bigint),'YYYY-MM-dd HH:mm:ss') time, substring(from_unixtime(cast(time as bigint),'YYYY-MM-dd HH:mm:ss'),1,10) dt from exam.userbehavior
因为建表时time设置的为string类型 所以这里需要转类型为cast (time as bigint) 数据插入成功
附带两句查询语句
// 1使用SparkSQL统计用户最近购买时间。以2017-12-03为当前日期, // 计算时间范围为一个月,计算用户最近购买时间,时间的区间为0-30天,将其分为5档, // 0-6天,7-124天,13-18天,19-24天,25-30天分别对应评分4到0(15分) val df1=df.where("behavior_type='buy'") .withColumn("buytime",from_unixtime($"time","yyyy-MM-dd")) .where("buytime>='2017-11-03' and buytime<='2017-12-03'").cache() // df1.withColumn("day", // when(datediff(lit("2017-12-03"),$"buytime")<=lit(6),4). // when(datediff(lit("2017-12-03"),$"buytime")<=lit(12),3). // when(datediff(lit("2017-12-03"),$"buytime")<=lit(18),2). // when(datediff(lit("2017-12-03"),$"buytime")<=lit(24),1) // .otherwise(0)).show() // 2使用SparkSQL统计用户的消费频率。以2017-12-03为当前日期,计算时间范围为一个月, // 计算用户的消费次数,用户中消费次数从低到高为1-161次,将其分为5档 // ,1-32,33-64,65-96,97-128,129-161分别对应评分0到4(15分) df1.groupBy("user_id").agg(count("item_id") as "buynum") .withColumn("level", when(lit($"buynum")查询hdfs文件有多少行
hdfs dfs -cat 表 | wc -l
查询出有
普通查询有多少行
wc -l 表 即可
查询第一行数据
简单实用
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)