spark_utils.py
from logging import Logger
import statistics

from pyspark.rdd import RDD
from pyspark.sql import DataFrame


def get_partition_info(df: DataFrame, logger: Logger) -> None:
    """
    Log partition info to help determine an optimal partition count for
    repartition/coalesce.

    :param df: DataFrame whose underlying RDD partitions are inspected
    :param logger: Logger that receives the results
    :return: None. Logs the results instead of returning them.
    """
    # Count the rows in one partition without materializing it in memory.
    def get_partition_len(iterator):
        yield sum(1 for _ in iterator)

    rdd: RDD = df.rdd
    count = rdd.getNumPartitions()
    # preservesPartitioning=True keeps the one-count-per-partition mapping intact.
    lengths = rdd.mapPartitions(get_partition_len, True).collect()
    logger.info(f"{count} partition(s) total.")
    # process_marker is a logging helper defined elsewhere in this repo.
    process_marker(logger, message="PARTITION SIZE STATS")
    logger.info(f"\tmin: {min(lengths)}")
    logger.info(f"\tmax: {max(lengths)}")
    logger.info(f"\tavg: {sum(lengths) / len(lengths)}")
    # statistics.stdev raises StatisticsError with fewer than two partitions.
    logger.info(f"\tstddev: {statistics.stdev(lengths)}")
    logger.info("\tdetailed info")
    for i, pl in enumerate(lengths):
        logger.info(f"{i}. {pl}")