Using Spark Window Functions


1. Using window functions for ranking

Consider a requirement like this: for each salesperson, rank their order_ids by order amount (price).

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()

    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")

    val rankSpec = Window.partitionBy("seller_id").orderBy(orders("price").desc)

    val shopOrderRank = orders.withColumn("rank", dense_rank.over(rankSpec))

    shopOrderRank.show()

    spark.close()
  }
}

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   2|
+--------+---------+----------+-----+----+

Note that dense_rank is used here, so orders with the same price share the same rank and the next rank does not skip ahead. With rank instead, tied prices still share a rank, but the rank after a tie is skipped; see the sketch below.
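Only the ranking expression changes; a minimal sketch of the rank variant, assuming the same orders DataFrame and rankSpec defined above:

    // rank leaves a gap after ties: for seller s1 the ranks become 1, 1, 3
    val shopOrderRank = orders.withColumn("rank", rank().over(rankSpec))
    shopOrderRank.show()

The output is: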

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+

With row_number, every row gets its own sequential number even when prices tie, and the result is as follows:

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   2|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+
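A common follow-up is to keep only the top row of each group, for example the highest-priced order per seller. A minimal sketch, assuming the shopOrderRank DataFrame computed with row_number above:

    // keep only rank 1 within each seller; row_number yields exactly one such row per seller
    val topOrderPerSeller = shopOrderRank.where(col("rank") === 1)
    topOrderPerSeller.show()

Because row_number breaks ties arbitrarily, which of two equally priced orders is kept is not deterministic unless another column is added to the orderBy as a tie-breaker.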

2. Moving averages and moving sums: using rowsBetween

The requirement now: for each salesperson, compute the average price over their two most recent orders.

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()

    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")

    val rankSpec = Window.partitionBy("seller_id")
      .orderBy(orders("order_date"))
      .rowsBetween(-1, 0)

//    val shopOrderRank =
//      orders.withColumn("rank", row_number.over(rankSpec))

    val shopOrderRank = orders.withColumn("avg sum", avg("price").over(rankSpec))

    shopOrderRank.show()

    spark.close()
  }
}

Note the use of rowsBetween: rowsBetween(-1, 0) defines a frame consisting of the previous row and the current row within each partition, so the average is taken over at most two orders per salesperson.
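For reference, the same window can also be written in Spark SQL; a minimal sketch, assuming the orders DataFrame is registered as a temporary view (the view name orders is chosen just for this example):

    orders.createOrReplaceTempView("orders")
    spark.sql(
      """SELECT order_id, seller_id, order_date, price,
        |       AVG(price) OVER (PARTITION BY seller_id ORDER BY order_date
        |                        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS `avg sum`
        |FROM orders""".stripMargin).show()

Both produce the same result: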

+--------+---------+----------+-----+-------+
|order_id|seller_id|order_date|price|avg sum|
+--------+---------+----------+-----+-------+
|       4|       s2|2017-05-01|  300|  300.0|
|       5|       s2|2017-05-01|  100|  200.0|
|       6|       s3|2017-05-01|  100|  100.0|
|       6|       s3|2017-05-02|   50|   75.0|
|       1|       s1|2017-05-01|  100|  100.0|
|       2|       s1|2017-05-02|  200|  150.0|
|       3|       s1|2017-05-02|  200|  200.0|
+--------+---------+----------+-----+-------+
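The section title also mentions moving sums; the same kind of frame works with sum, and widening the frame to everything from the start of the partition gives a running total. A minimal sketch, assuming the same orders DataFrame:

    // moving sum over the previous and current order, per seller
    val movingSumSpec = Window.partitionBy("seller_id")
      .orderBy("order_date")
      .rowsBetween(-1, 0)

    // running total from each seller's first order up to the current row
    val runningSpec = Window.partitionBy("seller_id")
      .orderBy("order_date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    orders
      .withColumn("moving sum", sum("price").over(movingSumSpec))
      .withColumn("running sum", sum("price").over(runningSpec))
      .show()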

