随笔-314  评论-209  文章-0  trackbacks-0

spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数

1  应用场景:

  1、我们需要统计用户的总使用时长(累加历史)

  2、前台展现页面需要对多个维度进行查询,如:产品、地区等等

  3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06

2 原始数据:

product_code |event_date |duration |
-------------|-----------|---------|
1438         |2016-05-13 |165      |
1438         |2016-05-14 |595      |
1438         |2016-05-15 |105      |
1629         |2016-05-13 |12340    |
1629         |2016-05-14 |13850    |
1629         |2016-05-15 |227      |

3 业务场景实现

3.1 业务场景1:累加历史:

如数据源所示:我们已经有当天用户的使用时长,我们期望在进行统计的时候,14号能累加13号的,15号能累加14、13号的,以此类推

3.1.1 spark-sql实现

//spark sql 使用窗口函数累加历史数据
sqlcontext.sql(
"""
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
 ----- ---------- ------------ 

3.1.2 dataframe实现

 

//使用column提供的over 函数,传入窗口操作
import org.apache.spark.sql.expressions._
val first_2_now_window = window.partitionby("pcode").orderby("event_date")
df_userlogs_date.select(
    $"pcode",
    $"event_date",
    sum($"duration").over(first_2_now_window).as("sum_duration")
).show
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
 ----- ---------- ------------ 

 3.1.3 扩展 累加一段时间范围内

实际业务中的累加逻辑远比上面复杂,比如,累加之前n天,累加前n天到后n天等等。以下我们来实现:

 3.1.3.1 累加历史所有:

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
window.partitionby("pcode").orderby("event_date").rowsbetween(long.minvalue,0)
window.partitionby("pcode").orderby("event_date")
上边四种写法完全相等

3.1.3.2 累加n天之前,假设n=3
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
window.partitionby("pcode").orderby("event_date").rowsbetween(-3,0

  3.1.3.3 累加前n天,后m天: 假设n=3 m=5 

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
window.partitionby("pcode").orderby("event_date").rowsbetween(-3,5)
3.1.3.4 累加该分区内所有行
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
window.partitionby("pcode").orderby("event_date").rowsbetween(long.minvalue,long.maxvalue)

 总结如下:

preceding:用于累加前n行(分区之内)。若是从分区第一行头开始,则为 unbounded。 n为:相对当前行向前的偏移量
following :与preceding相反,累加后n行(分区之内)。若是累加到该分区结束,则为 unbounded。n为:相对当前行向后的偏移量
current row:顾名思义,当前行,偏移量为0
说明:上边的前n,后m,以及current row均会累加该偏移量所在行

3.1.3.4 实测结果
累加历史:分区内当天及之前所有 写法1:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date

 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
 ----- ---------- ------------ 
累加历史:分区内当天及之前所有 写法2:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
 ----- ---------- ------------ 
累加当日和昨天:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
 ----- ---------- ------------ 
累加当日、昨日、明日:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
 ----- ---------- ------------ 
累加分区内所有:当天和之前之后所有select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
 ----- ---------- ------------ 
3.2 业务场景2:统计全部

3.2.1 spark sql实现

//spark sql 使用rollup添加all统计
sqlcontext.sql(
"""
  select pcode,event_date,sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode,event_date with rollup
  order by pcode,event_date
""").show()
 ----- ---------- ------------                                                  
|pcode|event_date|sum_duration|
 ----- ---------- ------------ 
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
 ----- ---------- ------------ 

3.2.2 dataframe函数实现

//使用dataframe提供的rollup函数,进行多维度all统计
df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderby($"pcode", $"event_date")
 ----- ---------- -------------                                                 
|pcode|event_date|sum(duration)|
 ----- ---------- ------------- 
| null|      null|        27282|
| 1438|      null|          865|
| 1438|2016-05-13|          165|
| 1438|2016-05-14|          595|
| 1438|2016-05-15|          105|
| 1629|      null|        26417|
| 1629|2016-05-13|        12340|
| 1629|2016-05-14|        13850|
| 1629|2016-05-15|          227|
 ----- ---------- ------------- 

  3.3 行转列 ->pivot

 

 

pivot目前还没有sql语法,先用df语法吧
val userlogs_date_all = sqlcontext.sql("select dcode, pcode,event_date,sum(duration) as duration from userlogs group by dognum, pcode,event_date ")
userlogs_date_all.registertemptable("userlogs_date_all")
val dates = userlogs_date_all.select($"event_date").map(row => row.getas[string]("event_date")).distinct().collect().tolist
userlogs_date_all.groupby($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show
 ----------------- ----- ---------- ---------- ---------- ---------- 
|            dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
 ----------------- ----- ---------- ---------- ---------- ---------- 
|         f2429186| 1438|         0|         0|       227|         0|
|        ai2342441| 1438|         0|         0|         0|       345|
|       a320018711| 1438|         0|       939|         0|         0|
|         h2635817| 1438|         0|       522|         0|         0|
|         d0288196| 1438|         0|       101|         0|         0|
|         y0242218| 1438|         0|      1036|         0|         0|
|         h2392574| 1438|         0|         0|       689|         0|
|         d2245588| 1438|         0|         0|         1|         0|
|         y2514906| 1438|         0|         0|       118|         4|
|         h2540419| 1438|         0|       465|       242|         5|
|         r2231926| 1438|         0|         0|       305|         0|
|         h2684591| 1438|         0|       136|         0|         0|
|         a2548470| 1438|         0|       412|         0|         0|
|         gh000309| 1438|         0|         0|         0|         4|
|         h2293216| 1438|         0|         0|         0|       534|
|         r2170601| 1438|         0|         0|         0|         0|
|b2365238;b2559538| 1438|         0|         0|         0|         0|
|         bq005465| 1438|         0|         0|       642|        78|
|        ah2180324| 1438|         0|       608|       146|        36|
|         h0279306| 1438|         0|       490|         0|         0|
 ----------------- ----- ---------- ---------- ---------- ---------- 

 

 

附录

下面是这两个函数的官方api说明:

org.apache.spark.sql.scala
1
def rollup(col1: string, cols: string*): groupeddata
create a multi-dimensional rollup for the current dataframe using the specified columns, so we can run aggregation on them. see groupeddata for all the available aggregate functions.
this is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
// compute the average for all numeric columns rolluped by department and group.
df.rollup("department", "group").avg()
// compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(map(
  "salary" -> "avg",
  "age" -> "max"
))
def rollup(cols: column*): groupeddata
create a multi-dimensional rollup for the current dataframe using the specified columns, so we can run aggregation on them. see groupeddata for all the available aggregate functions.
df.rollup($"department", $"group").avg()
// compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(map(
  "salary" -> "avg",
  "age" -> "max"
))
org.apache.spark.sql.column.scala
def over(window: windowspec): column
define a windowing column.
val w = window.partitionby("name").orderby("id")
df.select(
  sum("price").over(w.rangebetween(long.minvalue, 2)),
  avg("price").over(w.rowsbetween(0, 4))
)
posted on 2017-10-23 22:05 xzc 阅读(791) 评论(0)  编辑  收藏 所属分类: hadoop
网站地图