Commit 270f8f2

pyspark window functions
1 parent 1ed144c commit 270f8f2

File tree

1 file changed (+73, -0)

pyspark-window-functions.py

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession -- the entry point for DataFrame work
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Sample data: (employee_name, department, salary)
simpleData = (("James", "Sales", 3000),
              ("Michael", "Sales", 4600),
              ("Robert", "Sales", 4100),
              ("Maria", "Finance", 3000),
              ("James", "Sales", 3000),
              ("Scott", "Finance", 3300),
              ("Jen", "Finance", 3900),
              ("Jeff", "Marketing", 3000),
              ("Kumar", "Marketing", 2000),
              ("Saif", "Sales", 4100))

columns = ["employee_name", "department", "salary"]

df = spark.createDataFrame(data=simpleData, schema=columns)

df.printSchema()
df.show(truncate=False)
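
# --- row_number() ---
# Assigns a sequential number (1, 2, ...) to each row within its window
# partition, following the ordering of the window spec.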
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Partition rows by department and order each partition by salary
windowSpec = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number", row_number().over(windowSpec)) \
    .show(truncate=False)
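
# --- rank() ---
# Ranks rows within a partition; ties receive the same rank and the next
# rank is skipped, leaving gaps (e.g. 1, 2, 2, 4).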
from pyspark.sql.functions import rank

df.withColumn("rank", rank().over(windowSpec)) \
    .show()
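
# --- dense_rank() ---
# Like rank(), but without gaps after ties (e.g. 1, 2, 2, 3).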
from pyspark.sql.functions import dense_rank

df.withColumn("dense_rank", dense_rank().over(windowSpec)) \
    .show()
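
# --- percent_rank() ---
# Relative rank of a row: (rank - 1) / (partition row count - 1),
# ranging from 0.0 to 1.0.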
from pyspark.sql.functions import percent_rank

df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
    .show()
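
# --- ntile(n) ---
# Splits each partition into n roughly equal buckets and returns the
# bucket number (1..n) for each row; here n = 2.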
from pyspark.sql.functions import ntile

df.withColumn("ntile", ntile(2).over(windowSpec)) \
    .show()
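
# --- cume_dist() ---
# Cumulative distribution: the fraction of rows in the partition whose
# ordering value is less than or equal to the current row's value.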
from pyspark.sql.functions import cume_dist

df.withColumn("cume_dist", cume_dist().over(windowSpec)) \
    .show()
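
# --- lag(col, offset) ---
# Returns col from the row `offset` positions before the current row in
# the partition; null when no such row exists (here, the first two rows
# of each department).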
from pyspark.sql.functions import lag

df.withColumn("lag", lag("salary", 2).over(windowSpec)) \
    .show()
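
# --- lead(col, offset) ---
# Returns col from the row `offset` positions after the current row in
# the partition; null when no such row exists (here, the last two rows
# of each department).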
from pyspark.sql.functions import lead

df.withColumn("lead", lead("salary", 2).over(windowSpec)) \
    .show()
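
# --- Aggregate functions over a window ---
# windowSpecAgg has no orderBy, so avg/sum/min/max are computed over the
# entire department partition; filtering to row == 1 keeps one summary
# row per department. Note that importing sum/min/max from
# pyspark.sql.functions shadows Python's built-ins of the same name.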
windowSpecAgg = Window.partitionBy("department")

from pyspark.sql.functions import col, avg, sum, min, max

df.withColumn("row", row_number().over(windowSpec)) \
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
    .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
    .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
    .where(col("row") == 1) \
    .select("department", "avg", "sum", "min", "max") \
    .show()
