From 190f2db9c81868b295f5a5b559ab6fb502452a1b Mon Sep 17 00:00:00 2001 From: enricogallinucci Date: Wed, 3 Apr 2024 22:26:59 +0200 Subject: [PATCH] update first class --- materials/2024_bbs_dm_spark_basics.ipynb | 877 +---------------------- 1 file changed, 1 insertion(+), 876 deletions(-) diff --git a/materials/2024_bbs_dm_spark_basics.ipynb b/materials/2024_bbs_dm_spark_basics.ipynb index 8059695..08e8882 100644 --- a/materials/2024_bbs_dm_spark_basics.ipynb +++ b/materials/2024_bbs_dm_spark_basics.ipynb @@ -1,876 +1 @@ -{ - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "provenance": [], - "collapsed_sections": [ - "oBd7XwkFBDEF" - ] - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": "python" - } - }, - "cells": [ - { - "cell_type": "markdown", - "source": [ - "# Install Spark & initialize application\n", - "\n", - "Run the following code to install Spark in your Colab environment." - ], - "metadata": { - "id": "EsElqAaj4Sse" - } - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "fbvEUbWIHm2s" - }, - "outputs": [], - "source": [ - "!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n", - "!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz\n", - "# If the above command is too slow, uncomment the following and try it\n", - "# !wget -q https://big.csr.unibo.it/downloads/bbs-dm/spark-3.2.1-bin-hadoop2.7.tgz\n", - "!tar xf spark-3.5.1-bin-hadoop3.tgz\n", - "!pip install -q findspark" - ] - }, - { - "cell_type": "code", - "source": [ - "import os\n", - "os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n", - "os.environ[\"SPARK_HOME\"] = \"/content/spark-3.5.1-bin-hadoop3\"\n", - "import findspark\n", - "findspark.init()\n", - "findspark.find() # Should return '/content/spark-3.5.1-bin-hadoop3'" - ], - "metadata": { - "id": "4oTFM5YtJvv7" - }, - "execution_count": null, - "outputs": [] - }, - { - 
"cell_type": "code", - "source": [ - "from pyspark.sql import SparkSession\n", - "\n", - "spark = SparkSession.builder\\\n", - " .master(\"local\")\\\n", - " .appName(\"Colab\")\\\n", - " .config('spark.ui.port', '4050')\\\n", - " .getOrCreate()\n", - "sc = spark.sparkContext\n", - "\n", - "sc" - ], - "metadata": { - "id": "KJlzVAmbJ9vL" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "# Spark: working with RDDs\n", - "\n", - "Check the documentation: [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis)." - ], - "metadata": { - "id": "oBd7XwkFBDEF" - } - }, - { - "cell_type": "markdown", - "source": [ - "## Basics" - ], - "metadata": { - "id": "jcoFwGvm4gj6" - } - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "autoscroll": "auto", - "id": "vlFswJyWytKG" - }, - "outputs": [], - "source": [ - "# let's create a simple example\n", - "riddle1 = \"over the bench the sheep lives under the bench the sheep dies\"\n", - "riddle2 = [\"over the bench the sheep lives\", \"under the bench the sheep dies\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "autoscroll": "auto", - "id": "WHywj4BmytKH" - }, - "outputs": [], - "source": [ - "# create an RDD from the `riddle` string\n", - "rdd1 = sc.parallelize(riddle1.split(\" \"))\n", - "# each tuple of the RDD corresponds to a single word\n", - "\n", - "print(rdd1)\n", - "# why is there no result returned?" 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "autoscroll": "auto", - "id": "3oYjBLnOytKI" - }, - "outputs": [], - "source": [ - "# compute the RDD\n", - "print(rdd1.collect())" - ] - }, - { - "cell_type": "code", - "source": [ - "rdd2 = sc.parallelize(riddle2)\n", - "print(rdd2.collect())" - ], - "metadata": { - "id": "exzQLruZ9qgg" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Transformations" - ], - "metadata": { - "id": "iyH7halU9HiB" - } - }, - { - "cell_type": "code", - "source": [ - "# map: returns a new RDD by applying a function to each of the elements in the original RDD\n", - "rdd1.map(lambda s: s.upper()).collect()" - ], - "metadata": { - "id": "sndBHyEF86T5" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# flatMap: returns a new RDD by applying the function to every element of the parent RDD and then flattening the result\n", - "rdd2.flatMap(lambda s: s.split(\" \")).collect()" - ], - "metadata": { - "id": "9MlPRBd_-Cl1" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# filter: returns a new RDD containing only the elements in the parent RDD that satisfy the function inside filter\n", - "rdd1.filter(lambda s: s.startswith(\"u\")).collect()" - ], - "metadata": { - "id": "UlT_jxmH9Myx" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# distinct: returns a new RDD that contains only the distinct elements in the parent RDD\n", - "rdd1.distinct().collect()" - ], - "metadata": { - "id": "QxxJdRxW-Xcj" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# groupByKey: groups the values for each key in the (key, value) pairs of the RDD into a single sequence\n", - "rdd1.map(lambda s: (s,1)).groupByKey().mapValues(list).collect()\n", - "\n", - "# (first map converts to a key-value 
RDD)\n", - "# (mapValues is a map that operates only on the values - in this case, used to convert from ResultIterable to List for printing reasons)" - ], - "metadata": { - "id": "dBAh2Gs8-fdM" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# reduceByKey: when called on a key-value RDD, returns a new dataset in which the values for each of its key are aggregated\n", - "rdd1.map(lambda s: (s,1)).reduceByKey(lambda x, y: x + y).collect()" - ], - "metadata": { - "id": "cH1dxiGl_cS5" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# sortByKey: returns a new RDD with (key,value) pairs of parent RDD in sorted order according to the key\n", - "rdd1.map(lambda s: (s,1)).sortByKey().collect()" - ], - "metadata": { - "id": "-XJWbbv6_0Tw" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# join: starting from two RDD with (key, value1) and (key, value2) pairs, returns a new RDD with (key, (value1, value2)) pairs\n", - "rddA = sc.parallelize([(1, \"A1\"), (2, \"A2\"), (3, \"A3\")])\n", - "rddB = sc.parallelize([(1, \"B1\"), (2, \"B2\"), (4, \"B4\")])\n", - "rddA.join(rddB).collect()" - ], - "metadata": { - "id": "SGIn90U0Md8j" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Actions" - ], - "metadata": { - "id": "gKJ_7MCsAIy_" - } - }, - { - "cell_type": "code", - "source": [ - "# collect: returns a list that contains all the elements of the RDD\n", - "rdd1.collect()" - ], - "metadata": { - "id": "XQXX-d20_vWo" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# count: returns the number of elements in the RDD\n", - "rdd1.count()" - ], - "metadata": { - "id": "FXuL9K2qATfL" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# reduce: aggregates the elements of the RDD using a function 
that takes two elements of the RDD as input and gives the result\n", - "sc.parallelize([1, 2, 3, 4, 5]).reduce( lambda x, y: x * y)" - ], - "metadata": { - "id": "YI10eDDnAWe7" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# take: returns the first n elements of RDD in the same order\n", - "rdd1.take(2)" - ], - "metadata": { - "id": "CgN5qtwBAvIv" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# saveAsTextFile: saves the content of the RDD to a file\n", - "rdd1.saveAsTextFile(\"rdd1\")" - ], - "metadata": { - "id": "yS8SuYRQvV6F" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Examples" - ], - "metadata": { - "id": "gmosJNIOA5S_" - } - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "autoscroll": "auto", - "id": "mGRkO7_oytKK" - }, - "outputs": [], - "source": [ - "# Flatten the words beginning with the letter C\n", - "# - map: transform each string in upper case (remember: map returns a new RDD with the same cardinality)\n", - "# - filter: keep only the strings beginning with \"C\" (remember: filter returns a new RDD with the same or smaller cardinality)\n", - "# - flatMap: explode each string into its characters (remember: flatMap returns a new RDD with the any cardinality)\n", - "rdd1\\\n", - " .map(lambda s: s.upper())\\\n", - " .filter(lambda s: s.startswith(\"U\"))\\\n", - " .flatMap(lambda s: list(s))\\\n", - " .collect()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "autoscroll": "auto", - "id": "Wrr37cZ2ytKL" - }, - "outputs": [], - "source": [ - "# A simple word count\n", - "# - map: map each word to a tuple (word, 1); each tuple represent the count associate with a word\n", - "# - reduceByKey: group all the tuples with the same word and sum the counts\n", - "# - sortBy: sort tuples by count\n", - "rdd1\\\n", - " .map(lambda s: (s, 
1))\\\n", - " .reduceByKey(lambda a, b: a + b)\\\n", - " .sortBy(lambda x: x[1], False)\\\n", - " .collect()" - ] - }, - { - "cell_type": "code", - "source": [ - "# Compute average length of words depending on their initial letter\n", - "# map: map each word to a key-value tuple (word, (wordLength, 1)), where the value is an object composed by two value: the length of the word and a 1\n", - "# reduceByKey: group all the tuples with the same word to 1) sum the lengths, and 2) sum the counts\n", - "# mapValues: divides the sums by the counts to compute the averages\n", - "# sortBy: sort tuples by averages\n", - "rdd1\\\n", - " .map(lambda s: (s[0], (len(s),1)))\\\n", - " .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\\\n", - " .mapValues(lambda x: x[0]/x[1])\\\n", - " .sortBy(lambda x: x[1], False)\\\n", - " .collect()" - ], - "metadata": { - "id": "zxBoMsnTCATh" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "# Spark: working with DataFrames\n", - "\n", - "Check the documentation: [here](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.sql.DataFrame.html).\n", - "\n", - "What is different from Pandas' DataFrames?\n", - "\n", - "- Spark supports parallelization (Pandas doesn't), thus it's more suitable for big data processing\n", - "- Spark follows Lazy Execution, which means that a task is not executed until an action is performed (Pandas follows Eager Execution, which means task is executed immediately)\n", - "- Spark has immutability (Pandas has mutability)\n", - "- The data structure is similar, the APIs are different" - ], - "metadata": { - "id": "sCXrRXAAEtet" - } - }, - { - "cell_type": "code", - "source": [ - "# !wget https://raw.githubusercontent.com/w4bo/2023-bbs-dm/master/materials/datasets/housing.csv\n", - "!wget https://big.csr.unibo.it/downloads/bbs-dm/datasets/housing.csv\n", - "df = spark.read.option(\"delimiter\", \",\").option(\"header\", 
\"true\").csv(\"housing.csv\")\n", - "df.show()" - ], - "metadata": { - "id": "IGChYOQWRZku" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# Switching from Spark to Pandas\n", - "pandasDF = df.toPandas()\n", - "print(pandasDF)" - ], - "metadata": { - "id": "_aSykf1wV8dB" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# Switching from Pandas to Spark\n", - "df = spark.createDataFrame(pandasDF)\n", - "df.show()" - ], - "metadata": { - "id": "cFUtMUaUWTY5" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# select: returns a new DataFrame with only selected columns (similar to a map on RDDs)\n", - "df.select('population','median_house_value').show()" - ], - "metadata": { - "id": "eFAs2zBHPLTA" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# select, similarly to a map, allows column values to be redefined\n", - "df.select(df.population,df.median_house_value/1000).show()\n", - "# put the operation within parenthesis and add .alias('median_house_value_in_K$')" - ], - "metadata": { - "id": "FmClDtFaQqO7" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# withColumn: used to manipulate (rename, change the value, convert the datatype)\n", - "# an existing column in a dataframe (or to create a new column) while keeping the rest intact\n", - "df.withColumn('median_house_value_in_K$',df.median_house_value/1000).show()" - ], - "metadata": { - "id": "4whu8BylWfAK" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# filter: returns a new DataFrame containing only the elements in the parent DataFrame that satisfy the function inside filter (as in RDDs)\n", - "# orderBY: orders the DataFrame by the selected column(s)\n", - "df.filter(df.population > 1000).orderBy(df.population.asc()).show()" - 
], - "metadata": { - "id": "9zrnJn3_Q0PD" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# groupBy: returns a new DataFrame which is the result of an aggregation\n", - "df.groupBy(df.ocean_proximity).agg({'median_house_value': 'avg', '*': 'count'}).show()" - ], - "metadata": { - "id": "1qv5_B90Raho" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# withColumnRenamed: rename a column\n", - "df.groupBy(df.ocean_proximity).agg({'*': 'count'}).withColumnRenamed(\"count(1)\", \"tot\").show()" - ], - "metadata": { - "id": "E3b1rxzMN7Lm" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# SQL queries can be run on a DataFrame\n", - "df.createOrReplaceTempView(\"housing\")\n", - "spark.sql(\"select ocean_proximity, avg(median_house_value) as avg_price from housing group by ocean_proximity order by avg_price desc\").show()" - ], - "metadata": { - "id": "TaP-GatdTD7k" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "# Exercise: creating a cube\n", - "\n", - "You are working with two files:\n", - "\n", - "- weather-stations.csv: it contains a list of weather stations that capture weather information every day of every year throughout the world\n", - " - Each station is identified by a StationID\n", - "- weather-sample-10k.csv: it contains the data measured by a certain station on a certain date (a sample of 10k lines collected from the National Climatic Data Center of the USA)\n", - " - Each weather measurenent is identified by a StationID and a Timestamp\n", - "\n", - "Your goal is to create a single file representing the following cube and to run some queries through PowerBI.\n", - "\n", - 
"![image.png](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAPwAAAEICAYAAAB735ncAAAgAElEQVR4nO3deVyU5f7/8RcyICoiASoomwqklhuKo6HHSu1Y5pJQtp1jZd+Kk5Sl/fppx6I62befWZkZns7Bkx23TmhpR1sIWxQR9zRRVglQkU1kUZYb5vfHzMC4A94sM/fn+Xj4eDgz133dH+6Z91zXfc99z9gpimJACKEJHdq6ACFE65HAC6EhEnghNEQCL4SGSODbrXiedwzl4zTDJffNIU4Os4pmksC3W+OJXN6BtXHp1Oc7fgv7P3ieCXZtWZewZhL4dizg6ShGzF3GDwaAdP4etY+HJwYgeRfNJYFv18YzNeIToqLTMMQvJ3JIFBGBdhin9jp0Oh06ncW0P/55HHVXuX9ONNFjHdCFfkya7BJolgS+nRsfuZwOa5fx3Gv7+OD5CdgRz/OOX3JvlYKiKCjH/8z6x6KNIR6/jGrFdP93I5i77IeG3YGVazCsqkZJ+AuBMkXQLF1bFyCuI+BpokY48sfardQE2kF6OgcMK1nhsNKi0TNkAYGk8/exA3g20RRz/c2kGyYSCPCMeXYgtExGeCvgN1CPfqB/wx0jP+RYjWkkVxQU5SMm2hnDfuTVatPIv4LRbVeyaKck8NYmIIDgvc+x7IdLd8R/JzlpJOb3hfT4Nexu9eJEeydTeqsznmXJKxg7wAGdOfPPfEPNRxOJXB7FgAEOPAfon4lAf52eNu/NZldKPr/lnCXlVCn+3Z3RB3ZnZIAHk4O90dnLeGBr7ORqOe3Jyi/nyZUJKHV1TAvxY6i/GwGe3cgtqmBfRgHxR06TnlfK6jljGd7Xva3LFSqSwGvM2h0ZzPtsLy9PH8z8qYOu2m7NL+m89NkeFtw3mGcnDWjFCkVLksBryPGTJYxZtI3ExVPo39v1uu3zSi4wbP6XbH55goz0NkJ20jRCqa3jyZW7eO3+YY0KO4Cnayeinwpl1kc7qKypbeEKRWuQwGvE1gO5ODnYM/feW5u03PSRfowK6kFMfGoLVSZakwReI/akFzJ+UK9mLTtuoCd70gtVrki0BQm8RiSknGFoH7dmLTu0jxtJaQUqVyTaggReI7Lyyxnq37wDb0P93UnPK1W5ItEWJPAa4d/DmeMnS5q1bHpeKd7uXVSuSLQFCbxG6AO6cyiruFnLHjpRhD6wu8oVibYggdeI4L7u/Hw0r1nL/pycx8gAD5UrEm1BAq8RYXo/sgrK2JCQ2aTldh47Q2ziCWaNC2ihykRrksBrhM6+A6vnjCUyJpHcoopGLVNeWcPjK35h2eN6PFycWrhC0Rok8BoyxM+NBdMHMXbRVmITT1yz7U9HTzPoxS+ZFuJL2Cj/a7YV1kPOpdeghOP5zFqxgzH9ezItxJehfdwJ8HQxXS1XSPyRU/x3fw7LHh/J5GCfti5XqEgCr1HllTWs3ZHBL8lnOJRVTMqpc/Ry60xIPw9GBnjw9MT+uHZxbOsyhcok8IIL1Qo/H88GYNLgvm1cjWhJsg8vhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDZHAC6EhEnghNEQCL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE05OqBrzrMkuE6dDrjP69Xd7diWTYqez33uer401elV3hwD1HeV3us+fZEeavep7BeVwx81eElDO/yR8pWKCiK8V/aqIOs/b1liijfPEs7byi36Mn873YujWD2+reJ89PT8Ub73xOF95++uqx/IeCKgS/n26XvMXnnad4Y1XCv8z0RPOLXeoXZrgEEZy8h+tdKi/uySfiPO/c/2qPNqhLacHngy+PZ9P3/cM+oK7Q2N9k8q36qr3OZWj/yXzpSW942/j+OzbNMy3m9ym7T/a5haylYPMbUVxWHl4QyNXoZs3Q6
XEJDGW5qC5h2Nbyw3glBIJMe6sGWb1Ooj/yeVbzkfC+3nNlLlUXLPVHeDds5+B3M7xHZ6+8j9J2t/Os+14sf2xOF921/I299OG66YN6pf1NJIsrb1I+M/pp2WeCrMlNJD7kZr6ssUL55Fq7P+LHTNNWv2DmW90ZZBPIaChb/mf0RCopSwsa7PmDx2t9xnraako2P0H3hTpTSLaZZRCXb3igiQlEoTfiOV+/6B9tMK6hK+Y6ver/L7Gu8IbV33g8tYOLyd/nWlLw923YROX8S3Sza7Iny5raMjyg2befMl3Yz7n8awnr0lUXkR+WhKJn82+9t3v22FEZGkbvrr3g+FEuxcoCXhzgBsD78M4YmKyjKLv764xuXzC6EllwW+I59gwjYm8LpKzavIjP1FAu/fANz3joOjrgokNfSfeGXpt0EZ8bPuI+9KVdeCzhxz7uzTetwZvyMu/jHtt1AFSnffYX7zDFY997FSO558kc2bi+F7PW8HTeeSTc7WTyeTcavk4hdMR0X0z2+Dy1g7o8b2W5K/C1v/YvnhzgBvoQ+cAeZKSe4Wowfil3BdBfzevP57UR1S/1hop27fErv0IdbezUuwK3FedI8Xkzcxu6qFL77fjwLbeBgwsgnllC+JJqt8f/B/aUIhjhdfxkhbtTlge84mIhX72LxXVMtjspXcXjZS6z9vSN9g3qx+L6GKXzV4WjeMO3zO/S5lV7/2GZ6rJz4TWvVqbLjzfzxrkQ2rvqO70ffgxXP5hv4hvJAj7eZ9j/O3Huny6UP0m/It4Q/2zCFz17/Nh/cEcZlTYVoAt2V7nSetpqKnUu4rZ+OWQB05p7VR9jiB/h9woHU2wjWLTY27nwPq49sMYZwcASv3uXKGN1ioDsREY80qgjn8TO4K2wMug/uYfWRLxh0WYuO3PzHu9g+ZgcvHnmp/t7Ne7PZlZLPbzlnSTlVin93Z/SB3RkZ4MHkYG909u35vCJfHlowl3+Nuo9JVwjxyKhd/Pu+wbjpyo13DH6L/buMU/ySa3U78h6e/PE23HSDeWv/Lu64RlPz9jucfZajOWfxdHVix9ESK9l+ojnsFEUxtHURjVF1eAl3/rUX67Y8giG/nCdXJqDU1TEtxI+h/m4EeHYjt6iCfRkFxB85TXpeKavnjGV4X/e2Lr3dyZLtp1lWEvhyNs/qRcxdR5jpqzDvs728PH0w86dePhcwW/NLOi99tocF9w3m2UkDWrHW9m3tjgzZfhrW7gNfdXgJtwUv4NTCnfz4dH/GLNpG4uIp9O/tet1l80ouMGz+l2x+eYKMVMDxkyWy/TSu3QfeTKmt4/aob5l5Wx/m3ntro5f7as/vLFi7j33vTMHJwb4FK2zfZPsJsKKr5bYeyMXJwb5JL1aA6SP9GBXUg5j41BaqzDrI9hNgRYHfk17I+EG9mrXsuIGe7EkvVLki6yLbT4AVBT4h5QxD+7g1a9mhfdxISitQuSLrIttPgBUFPiu/nKH+zTtwNNTfnfQ8bV8yIttPgBUF3r+HM8dPXvOUk6tKzyvF272LyhVZF9l+Aqwo8PqA7hzKKm7WsodOFKEP7K5yRdZFtp8AKwp8cF93fj6a16xlf07OY2SAh8oVWRfZfgKsKPBhej+yCsrYkJDZpOV2HjtDbOIJZo0LaKHKrINsPwFWFHidfQdWzxlLZEwiuUUVjVqmvLKGx1f8wrLH9Xi4aPv6U9l+Aqwo8ABD/NxYMH0QYxdtJTbxxDXb/nT0NINe/JJpIb6EjfJvpQrbN9l+wmpOrbWUcDyfWSt2MKZ/T6aF+DK0jzsBni6mq70KiT9yiv/uz2HZ4yOZHOzT1uW2O7L9tMsqAw/G6ebaHRn8knyGQ1nFpJw6Ry+3zoT082BkgAdPT+yPaxfHti6z3ZLtp01WG3hLF6oVfj6eDcCkwX3buBrrI9tPO6xqH14IcWMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggN
kcALoSESeCE0RAIvhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDVE18GdjH0Sn0xn/9XuL/Wp2LlrW2Z18PCmo4fnT6eg37yea9+NUor1SLfDn97/F7S8PIklRUBSF0v+48dt+gLPEPtiPt66b/vPsfyuUmbG5apUkmqrnY/yQb3z+FCWPlVWz6DE+ht+vs1hh3BxC5c3BKqgW+KoTRyh/YhLDTbc7D49g1vBrLiLaNQ8mfrSfbwas4sOfJMq2QrXA3zQ+nJDX7rpkhD5L7IPdeTD2d17T63CfGUvu+f28NcRi2vjWfoyj+2j0ryWx8UH/ht2Bs7E8aJ5ius9EBv/W5sGwacPY/fVhiikkbo5fw5TfNPIXxs1h+N0rSVo2gR668cT8DlzSdnzM9eYIorWotw9/UzgbSr9n8Jv+6HTmKfxNhG8oYEO4H68nKRR9Ho535+G88qtp2liaxBPb/h+xuZ0Z/koiSa/rCduQhZLxCsPPxvLgiBReMu8ifD+Y9+fFIplvKx5M/Oj3+um+eeT3mPgR+795Bv3zP5CvxDPbr5C4OdP4Nvygqe1xwlc9gWS+fVD3KL05zAXvcER/9f32+oN7LnpeS9pHypnL25zPTOHY76+hN40SLvrXSNqXwhWaihbmM7APbgDp0YTqdOh0nty9MolDGWVXaF1C+sEklk3oYRrh+xOZlEXyCdktaA9a5mO5m8KJ3hDCqm8vT/zZ2AcJ+CKcLEVBUQrYEH71bpzDNpjamf5lvIIcFmhN6XwR9Q29+3U1hv2P6bxlOqh3fLn+GsuN4+8ZFs+bksHS291arWpxdaoF/mxspMWIfp7MlGNXaGW833VwH7wBzsYTG3vl/jr3vRmfjS+zWj7bayPpRIf2591RMbxyuxuFJ46S6z+QPsahnrg1SVdZzpWAYeks/lCO2rdHOrU6uumex2C0Dt1R4+1uYRv49XPjeDw+PIQH9TreC9vArysXEdJdj+41wC+CiPoRvjMDJt3Dcb0/Or/XScp4hVVJKYzW63jN1MLv9SQyXmkY4zfvzWZXSj6Hs89yNOcsnq5O7DhawsgADyYHe6Ozl/OKruWi7ZeWRk3Wp0zo8Wn94+P+nkHGbD/jjYlz+b9R/emnexoYR0REwwjvMWwaw56cQI9l4/h7RjyzP4pnfmh/ephfXX7P88Pepcgg3/as8scks/LLeXJlAkpdHdNC/Bjq70aAZzfTzx0XEH/kNOl5payeM5bhfd3butx2R7afdlld4NfuyGDeZ3t5efpg5k8ddNV2a35J56XP9rDgvsE8O2lAK1bYvsn20zarCvzxkyWMWbSNxMVT6N/b9brt80ouMGz+l2x+eYKMVMj2E1Z08YxSW8eTK3fx2v3DGvViBfB07UT0U6HM+mgHlTW1LVxh+ybbT4AVBX7rgVycHOyZe++tTVpu+kg/RgX1ICY+tYUqsw6y/QRYUeD3pBcyflCvZi07bqAne9ILVa7Iusj2E2BFgU9IOcPQPs37XGdoHzeS0gpUrsi6yPYTYEWBz8ovZ6h/8w4cDfV3Jz2vVOWKrItsPwFWFHj/Hs4cP1nSrGXT80rxdu+ickXWRbafACsKvD6gO4eymney5qETRegDu6tckXWR7SfAigIf3Nedn4/mNWvZn5PzGBngoXJF1kW2nwArCnyY3o+sgjI2JGQ2abmdx84Qm3iCWeMCWqgy6yDbT4AVBV5n34HVc8YSGZNIblFFo5Ypr6zh8RW/sOxxPR4uTi1cYfsm26850ogO1V30xZ6RcQBxROoiiWuFdUeqvBKrCTzAED83FkwfxNhFW4lNPHHNtj8dPc2gF79kWogvYaP8W6nC9k22X3PoWX6s4dr+5RNbbk1p0aGERqeZbgUSkaD++lS7PLa1PD/5Fkb0686sFTvYvDebaSG+DO3jToCni+lqr0Lij5ziv/tzWPb4SCYH+7R1ye2KbD9ts6oR3iy0fw8OLZnK6CAPYhNPMOXtOOzCY9Av2MKH247g5erE
vv+dIi/Wq7Dcfut3ZjDxjW+wC48h9K//5dMfU2X7NVWa+au/dOhMU/206FB0lvPxtGhCQ6NJu3Q3wdQmLTqUAZFJJEUOMPWRRnRoKPUD/kXrsJzqm9rFNTzeMEu4nNWN8GbOTg48PbE/T0/sD0BucRm/5RbQpaMDY2+WF+r1mLffjFG+7D9hPHo/oo8nHl07t3Fl7U0SkQN0RALG6X0CEYEWD6dFE/oYfKooBALERaKLjENZHkWEbgtxyycyEUj7YQ08+imBBBKYoBBhXJjo0MeITptIREQCxwjlMT4lISIQSCPdch0D1vDoMYUE40qI1IUSXV9LEpFRj3JMUQhMiyZ0wIfERSznSnsDVjnCX4mDzvin1NTWtXEl1kWx2F4O9vZtWEl7ZbkPf0nYAbKSSUqKZIB59L07Gg6kk8ZEpkZEsyUOII0f1gQTZV44LtI0Ug8gMimJ5KzrlJCVDMs/tVj3RJ5bDmt+MI/kepZ/GmF8wwmcwKP6A6RfZZC32hH+Uo6mF2uNUktNbR0O8vVWjWL5Bml+0xRNFPENyhWOrgVOjeDuLXEsZwuRwVNRwBj2qIHG0RiIi9SxpRVLtZln2BxwA1BTK9duN5Z5hLdDRvhm8R+IPjqKK+42T3yO5Qe2ELnlAMufM74hpKUfgOAA42hMHFuiG7cOIh+zWEccH0bCoxMunW5cn+0EXtfwYq1RZFrfWOYR3kFnL7OiKzLuw+uudkAsMIKEb4Kv0iaQCY8eIPrAo5izGRgRRUT03aa2WzDtzBsfm/Ao1B+0u9Y67oZvrrB70QhW9RVX12IwGIhP/h2ltk4OPjXBb7kF5BaXycFOjbCZt3Q7O7v6EUoO3DWeeUov03ltsJnAQ8OBu2qZ0jdaw5Tepl4K4ips6llu+GhODto1lnmEd5QRXhNsK/Dmj+ZkSt9oNXXmKb1NvRTEVdjUs+xoLyN8UykypdcUmznxBho+mpOP5Rqntq6uYR9eY1N6g8FAful5enbrwi/HczhfXdPWJbWIzo4O/KF/w6cvthV40whfLSN8o9TU1mEwGD+V1dqUPqvgHCl5xfTspq3v6rOtwOtkH74pLj6tVjsj/JlzFWQWGL/Q07WzE8P8erZxRa3Hpt7W6/fhFRnhG8PywhlHjYzwlTUKOcVl1NTW4dmtCz5uXdu6pFZlU8+y5VF6OXB3fTUavFIup7iMwrLzdHLU4ePugk4jb3RmNvXXWh5plpNvrq/+whmLsxRtWX7peXKLjD+o4ePmgrtzpzauqPXZ1LPsaG+PnZ0dIPvxjWGeBTnYd7D5ka5KqSW3uJQqpZaeLl3w1thU3symnmWdfQeL8+llSn89WjrLLreolPzS83R00OHt3hVHDR2ktGRTgQfLL8KQEf56tHIefWHZeXKKywDwcetKdw1fSWlzz7ScT994Wjittqa2lpziMiprFLp37azZqbyZzT3T5qPN1bIPf11auDQ2p6iMM+cqcNTZ4+3WFScHmzr1pMlsLvCOOvksvrFsfUpfVH6BXNNU3tutq+bOqrsSm3um5Yq5xrPlg3ZKbR25xWWcr67B3bkTPm4ubV1Su2CDgZd9+May/FjO1uQWl3G6pBydfQd83Fzo5KjtqbyZzT3TjnLFXKMpFl9gaUvOVlSSU2w+waYrnq4ylTezucDLFXONYzz92PaO0tfVGcgpLqWiqga3Lk54y1T+IrbzTJt06egAQG2dTXwZb4vRdbCr/7/57ERbkFNcyqmz5dh3sMPbzaX+9SCMWnTHJreogo27s/j+8Cky8sooLKukpKK6JVcp2inXLo64dulIv55d+cOAnswM7UOAp7qjb8n5Souj8i70uslZ1f5tQYt8L71SW8eybcks2fIb00J8mRbiS6BXNzxdO+PaxVHt1QkrUFJRTWFZJWmnzvHdryf5PCGTR8b24/WZw3ByuPFjCAYDHD1p/I59185O3OLtQVcnea1dSvXAHz9ZwuzoBDo56lj1l7H06antM5vElRWWVvLM
JwkczSkhJiIUfWD3G+ovu6iU5JOF2NnBLb27a/6MuqtRNfDHT5YwZtE2Fs4YwkvTBqvVrbBhX+zK5LlVu1n7/DjGDfRsVh/nLlRxNLeQ0gtV+Li7cEtvD5WrtB2q7cMrtXXMjk6QsIsmuf+2vjg52jM7OoFDS6bi7NT0g2y5xWWUXqjCpVNHzX2DTVOpdpR+2bZkOjnqJOyiyaaM8GPcQE9eWX+gycvmFpeRU9TwmbtLp45ql2dTVAl8blEFS7b8xqq/jFWjO6FB7z+mZ/PebPZnFjV6mbLK6voTbLzduspn7o2gSuA37s5iWoivHKATzXaTc0eemnAz63ZmNnqZnKJSzp2voquTI95uXbGh0wlajCqB//7wKaaF+KrRldCw6SP9+O7QyUa1PXm27KIr4Vw7O7VkaTZDlcBn5JUR6NVNja7alqGEz6NicHp9D/nm+2pSiHrq4vsMBUmEhf+L2d9X3Pg6lSw+eD4G5w9/u/G+rFxgr26k55Vet115VQ25xWXUGQz0uslZpvJNoErgC8sq8XS1ga8NsuvKoL7OVKcVc8z0y0O1qXlsLYaq40UcMd2npJTwk86NSYObcVFGXT7fffgFAR9LwC/l5GB/0XflX01uUSlnKyrp0tEBbzcX7DvIXL6xVAl8SUW1jZxBZ0/QAHf6VJew/zhAHaeSCzkc4MrtShG7fgWo5Xj6Wcr8PBjenI+N685zLKOUPEXVwjXjdEl5/ffTebu54NZFpvJNYXMXz9wo3RBPZjid5/vUPKCKI5mljBnWnz8FVPNT5kkwlJGcVYFXgAd9gbozx/jwzXU4h8fQ4dH1zP4yi0pAObiDyc+upmN4DB0e/IzxMcc4q2Txwbx4XjgJFb8kYffwt/xsWq+huogVC4ztb5q/nUTTweqq9P3MfdF4v9NjsSxOKEahluTPPsfuwc3MeX8dzhb92LLzVTXkFJdRW1eHp6uzfObeDBL4Szn2445bYEd2Pkp1Bj8edeD2fjczyL8jCcdOkl+TxU/HdNzfvx/UnuLTD/awNfA2CmOfoHReTw6vT+IfydDBrTd/+z/3UxH7BOee8uDQ98lsyfVn7tLxvN8buvxBj2HdJMaZVluZXEaPx8M580o/BuRk8fpPeXD+N/66+Dj2U6ZQEftnMh7oyHvRifxg3s2tLaXD4EmUWPRjy3KKyyguv2D81Ri3rjb/XfotQb4G5DIdGdS3G5XfFbD3VwOb6jxYNUTH0EoPnHcUc+RoRxLt3HlriI7a9HT+maaQmBZPp1jz8h0YVwS4V7N//bc8eew8v5dUUUw3lGtM4zsN9ef+oC6g+POAZwbvF5+jOjGbD0srqV7xJe+tMDW078bJfPAFcOhO2Hg3TTyJeecqyC3W9q/GqEELr5Um6kCvgR4ExxawMaGO07cGEOwAuptdGVedyaYfHEn282SAC3AasOvIGy8/yqIRFl3UneHfrybyutsQkj4Ipmfmz4x6o6BJVRRdMB0h1Hnwnw+mcf9FxwtqSd51Y3+lNblQrZBbXEpNbR09u2n3V2PUIHOiK7AP8mRyt3Ms3VnO6L496QrYufszud95/nOkgoEDvegL2Pfz4l73Kt7ZuIMjpQqGijx+iklgZ90FisoMuLk442ZfxsHdeeyr79wVr5ugtvQC+TXXrsNhcHdmGIpY8PlBTl2AmrNZrF+9l8afmmIbcotLKSy7gJODDm837f5qjBok8Ffi0I87b9VBh85MHtDbeF8Hdwb5O1F4oQNht/Q13qcLZN78oTxZlcmIJ1Zj/9QPLC7rRC+dHw/d60Xl9h10fmIrXzi5UT8BsOvG7aFe9DxyGM8nvmfnNcqw6z6CZS/2I+TYIXr/KYauL+xmo86FG7uQ1Lrkl1ZYHJXX9q/GqEGVy2N1Mz/FEDtbjXqExtmFx6B8/hgAVTW1/HaygILS83Tv2plbvT3oqPEfkrhRMsKLdiunuJSC0vM46uzxce8qYVeBBF60SwVl5y86V76Hi3zVtBok8KLd
qVZqyS0y/gCk/GqMuiTwot3JLS7jTGmF8Vdj3OVXY9QkgRftTo7FCTae8gOQqpLAi3bnQrWCg30H/D1kKq821eZKduExanUlNM7NuRO9XZ3lqHwLUG2LyufwQg124TGM7OvV1mXYLJnSC6EhEnghNEQCL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2xosCnsvaF+axNbes6Gi913XxeWGdFBQubZ0WBF0LcKBVPVk5l7QuL2JRjujn6GTbOu4PtS2fytdebvP9wkPH+7e8RdnAYG+fdQeq6+SwwLeA7Yzq9N50mOPZF7rxW3wtnssnUN6nreWHhV2QDoOfZ2Be5k1TWvvAJTBnOvo+/IhsfZix+F6+vZ7Ii0bKdcQSOZjIj9q401d3wGKbHzfVhuc5oGOH9FZsSje1Zau4b8JnO2+8/BPXLLiJsk55nY+/l9AufQMS7PBJk/pvMt3/k3fCDeM04xaZNMGOx+b6VGLv1Md2n3rMlNEpRFMON/iPsn4aLpRjWzJ1nWJNiMBhS1hnmzl1nSDE9Ev/uA4Yl8QaDIX6pYUbYUkO8eYm18y66fTmLPg2X92uIX2qY8e52U7sHDDPMj8UvNcwIM63TtP65a1Ms1tnQZ8raeaY+jP83t7uo7pR1hrkW/V2qvt1lfVxS/0W3txuWhD1g0Xa7YYnltrj0b7VhhP3zhl+P8u/q/9S9HGn7e4R9nFR/c3QucOdwRvAJe1MfIijoRw7kTmfqPEhddwrfGU/Vj6ZBD09m9KaDjV9X7mmyc5JYEP5Vw30+XqTSG/BhRsRDBAF4e+HrM52pphV5e/lc1I3vjKfqR86ghyczOvwg27kDTueQnbiIsE0Wbb1SwRuw6A+4ZKZhakdTh2M9U8yzoNQ8TpJEYvhMVlg8ntuMXoWwpF7gt79H2NdevB37OUHA9qUzOQBAECEhEL0vlZB9WzkZ8pTx8dM59B52gy9f8zT7IqnsvbFeG7r/y+fMv3T/InX/JbfX88LC00yJ/dxiN6QxEVsAAAPsSURBVEEFpl0DCbhQk2oH7VLzToG3p+kF+iMHEhseC3p4Mr33/pcte3vVj2J3DtOT+PV6zMewU9dtJZEm8PbCN3HrDR+1z967/+IaRg/jTowzAcv6rir3NNk+Xngbe2Dv3pyrNAzCyzuHfftMPabuZ99Vm3rSO+crtmxv0p8ixHWpNsIbp8MrCQtfCegZPdry0TsI9l7JCp5hvvmuO1/k2YMz66fkvjOmM5rT11oDISGwoP6g3UO8/5fThC2ciXnW7TvjTd5/uGl1+3qfJjp8pnE67jOdt9+/w/T3vMuzp2da7DKYDpxd2sGd9zLj60Wmdj6MHt2wyxA0YjgsNB+0e5E7p0zn64Wm3QQfPaN9Lu3M7A7mL87jhYUzCfvYdNcVZzNCNE37+SEK09HviFacxhqn3081fIIg2pzlD1EI9bWb7xDa/vVXEPImQVh+HGV28cdlQojmabvAX3Jkm9HPsPHhICCI+bEydRWiJbSfKb0QyJS+pcmptUJoiGpT+gMnitTqSmhMcB/3ti5BM1QLvDxpQrR/MqUXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE0RAIvhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDZHAC6EhEnghNEQCL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE0RAIvhIZI4IXQEAm8EBqiSuCdnRwor6xRoyshRAtSJfAeLk7klVxQoyuhYUptXVuXYPNUCXy/nl1JO3VOja6EhqWdPkeAp0tbl2HTVAn8Hwb05LtfT6rRldCwbw/mMu4Wz7Yuw6apEviZoX34PCGTgnMyrRfNU6PUErM9lYfH9G3rUmyaKoEP8HThkbH9iPjHLjW6Exr0t42H6N/b
lXEDZYRvSap9LPf6zGEczSnhi12ZanUpNOJAZgGfxKWwYvaoti7F5qkWeCcHe2IiQnlu1W5+/O2UWt0KG3cgs4BHlv3Cssf1eLg4tXU5Ns9OURSDmh3+nJzH7OgEJgd787+PhuDs5KBm98JGKLV1vBl7kE/iUlj2uJ6wUf5tXZImqB54gPLKGl5Zf4CtB3J54o5A7tP7E+DlgpODvdqrElZEqa0jPa+Ubw7kELM9lf69XVkxe5SM7K2oRQJvtj+ziHU7M/nu0EnS80rlxApBgKcL427x5OExfeUAXRto0cALIdoXuXhGCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggN+f/sQGmHiJF2nwAAAABJRU5ErkJggg==)\n", - "\n", - "The procedure to create the cube is the following.\n", - "\n", - "1. On the stations file:\n", - " 1. replace empty states and countries in stations with a placeholder value (e.g., \"XX\");\n", - " 1. keep only the following fields: stationId, state, country, elevation\n", - "2. On the weather-sample file:\n", - " 1. filter out weather wrong measurements (i.e., where airTemperatureQuality=9);\n", - " 1. keep only the following fields: stationId, airTemperature, date, month, year\n", - " 1. create a new fulldate field by concatenating year, month, and date\n", - " 1. create a new fullmonth field by concatenating year and month\n", - "1. Join stations with weather measurements on the stationId field\n", - "1. Keep only the following fields: state, country, elevation, fulldate, fullmonth, year, airTemperature\n", - "1. Aggregate the measurements by state and date to take the average temperature\n", - " - Group by: state, country, elevation, fulldate, fullmonth, year\n", - " - Calculation: avg(airTemperature)\n", - "1. 
Save the result on a file" - ], - "metadata": { - "id": "Dv5BUnaAWWEi" - } - }, - { - "cell_type": "code", - "source": [ - "!wget https://big.csr.unibo.it/downloads/bbs-dm/datasets/weather-stations.csv\n", - "!wget https://big.csr.unibo.it/downloads/bbs-dm/datasets/weather-sample-10k.txt" - ], - "metadata": { - "id": "REn3hjicbRFL" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Spark" - ], - "metadata": { - "id": "xO6bQ3NQwDAj" - } - }, - { - "cell_type": "code", - "source": [ - "dfW = sc.textFile(\"weather-sample-10k.txt\")\\\n", - " .map(lambda l: (l[4:15],l[15:19],l[19:21],l[21:23],int(l[87:92])/10,l[92:93]))\\\n", - " .toDF([\"stationId\",\"year\",\"month\",\"day\",\"airTemperature\",\"airTemperatureQuality\"])\n", - "dfW.show()" - ], - "metadata": { - "id": "Xcd4fAhgeDhH" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "from pyspark.sql.functions import concat\n", - "dfS = spark.read.option(\"delimiter\", \",\").option(\"header\", \"false\").csv(\"weather-stations.csv\")\n", - "dfS = dfS.select(concat(dfS[0],dfS[1]),dfS[2],dfS[3],dfS[4],dfS[5],dfS[6],dfS[7],dfS[8],dfS[9],dfS[10])\\\n", - " .toDF(\"stationId\",\"city\",\"country\",\"state\",\"call\",\"latitude\",\"longitude\",\"elevation\",\"date_begin\",\"date_end\")\n", - "dfS.show()" - ], - "metadata": { - "id": "OaknyiXHeCbF" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "1. On the stations file:\n", - " 1. replace empty states and countries in stations with a placeholder value (e.g., \"XX\");\n", - " 1. 
keep only the following fields: stationId, state, country, elevation" - ], - "metadata": { - "id": "8QqJZrglm-ZD" - } - }, - { - "cell_type": "code", - "source": [ - "dfS1 = dfS.fillna({'state': 'XX', 'country':'XX'})\n", - "dfS2 = dfS1.select('stationId','state','country','elevation')\n", - "dfS2.show()" - ], - "metadata": { - "id": "8v1YuN-4nQCQ" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "2. On the weather-sample file:\n", - " 1. filter out weather wrong measurements (i.e., where airTemperatureQuality=9);\n", - " 1. keep only the following fields: stationId, airTemperature, date, month, year\n", - " 1. create a new fulldate field by concatenating year, month, and date\n", - " 1. create a new fullmonth field by concatenating year and month" - ], - "metadata": { - "id": "5LxL1WQvnF0J" - } - }, - { - "cell_type": "code", - "source": [ - "from pyspark.sql.functions import concat, lit\n", - "dfW1 = dfW.where(\"airTemperature < 9\")\n", - "dfW2 = dfW1.select('stationId','airTemperature','day','month','year')\n", - "dfW3 = dfW2.withColumn(\"fulldate\", concat(dfW1.year,lit(\"-\"),dfW1.month,lit(\"-\"),dfW1.day))\n", - "dfW4 = dfW3.withColumn(\"fullmonth\", concat(dfW1.year,lit(\"-\"),dfW1.month))\n", - "dfW4.show()" - ], - "metadata": { - "id": "Cf9bm6CspR8x" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "3. Join stations with weather measurements on the stationId field\n" - ], - "metadata": { - "id": "jy2yp35unHhO" - } - }, - { - "cell_type": "code", - "source": [ - "dfJ = dfS2.join(dfW4, \"stationId\")\n", - "dfJ.show()" - ], - "metadata": { - "id": "oHTKQhQJrPMU" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "4. 
Keep only the following fields: state, country, elevation, fulldate, fullmonth, year, airTemperature" - ], - "metadata": { - "id": "mv_dfniznJ2A" - } - }, - { - "cell_type": "code", - "source": [ - "dfJ2 = dfJ.select(\"state\", \"country\", \"elevation\", \"fulldate\", \"fullmonth\", \"year\", \"airTemperature\")\n", - "dfJ2.show()" - ], - "metadata": { - "id": "dYyMxZaSrprK" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "5. Aggregate the measurements by state, country and date to take the average temperature\n", - " - Group by: state, country, elevation, fulldate, fullmonth, year\n", - " - Calculation: avg(airTemperature)" - ], - "metadata": { - "id": "YXKChhXwnLlv" - } - }, - { - "cell_type": "code", - "source": [ - "dfG = dfJ2.groupBy(\"state\", \"country\", \"elevation\", \"fulldate\", \"fullmonth\", \"year\").agg({'airTemperature': 'avg'})\n", - "dfG.show()" - ], - "metadata": { - "id": "shVr9k9hryqH" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "6. 
Save the result on a file" - ], - "metadata": { - "id": "zwXx8h-tv4-v" - } - }, - { - "cell_type": "code", - "source": [ - "dfG.write.mode('overwrite').option('header','true').csv(\"weather-cube\")" - ], - "metadata": { - "id": "pKTH93LQu61U" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## PowerBI\n", - "\n", - "Download the file from the left panel of this notebook (in case of issues, download it from [here](https://raw.githubusercontent.com/w4bo/2023-bbs-dm/master/materials/datasets/weather-cube.csv)) and load it in [Power BI](https://app.powerbi.com/).\n", - "- Visualize the daily trend of average temperatures for each country\n", - "- Show the average temperature on the map\n", - "- Compute bins for the elevation field and show the average temperature for each bin\n", - "\n", - "\n", - "The final Power BI file will be available [here](https://big.csr.unibo.it/downloads/bbs-dm/results/weather-cube.twb)." - ], - "metadata": { - "id": "jkKWX-mWv8S9" - } - }, - { - "cell_type": "markdown", - "source": [ - "# Additional exercises\n", - "\n", - "The solution will be available [here](https://big.csr.unibo.it/downloads/bbs-dm/results/2024_bbs_dm_spark_basics_solution.ipynb).\n", - "\n", - "## Getting familiar with data frame transformations\n", - "\n", - "Carry out the following operations (in any order).\n", - "\n", - "- ```.select()``` operator; starting from ```dfS```:\n", - " 1. keep only country, elevation, date_begin, and date_end\n", - " 1. keep only the first four characters of date_begin using [sf.substring](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.substring.html)\n", - " 1. 
use [concat](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html) to concatenate date_begin with date_end and putting an underscore (\\_) in the middle; since the underscore is not a column, declare it as ```sf.lit(\"_\")```\n", - " 1. use [coalesce](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.coalesce.html) to take the value of country if not null, otherwise the value of city\n", - " 1. put countries in lowercase using [sf.lower](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lower.html)\n", - " 1. in the previous four points, use ```.alias()``` to give a meaningful name to the obtained columns\n", - "- ```.withColumn()``` operator; starting from ```dfS```:\n", - " 1. do the same as points 2 to 5 of the ```select``` operator, but using ```withColumn```\n", - "- ```.filter()``` operator; starting from ```dfS```, keep only the rows where:\n", - " 1. elevation is greater than 5000\n", - " 1. country is not null, using [sf.isnotnull](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.isnotnull.html)\n", - " 1. conditions 1 and 2 are both true; conditions must be put between parenthesis and separated by \"&\" (e.g., check [here](https://www.geeksforgeeks.org/pyspark-filter-dataframe-based-on-multiple-conditions/))\n", - " 1. either one of conditions 1 and 2 is true; conditions must be put between parenthesis and separated by \"|\" (e.g., check [here](https://www.geeksforgeeks.org/pyspark-filter-dataframe-based-on-multiple-conditions/))\n", - " 1. date_begin is the first day of the month (requires to use substring)\n", - "- ```.groupBy()``` operator; starting from ```dfW```:\n", - " 1. group by airTemperatureQuality to count how many rows there are for each value\n", - " 1. as above, but also calculate the average temperature\n", - " 1. 
as above, but also given meaningful names to the results using ```withColumnRenamed```\n", - " 1. group by month to calculate the minimum and maximum temperatures and order by month using ```orderby```; to aggregate differently on the same column, use the [sf.max](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.max.html) and [sf.min](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.min.html) functions inside the ```agg``` function instead of the object enclosed by brackets ```{}```\n", - " 1. group by month and day to calculate the minimum and maximum temperatures\n", - " 1. group by stationId and year month to count the number of rows" - ], - "metadata": { - "id": "679cmN5vLOMH" - } - }, - { - "cell_type": "code", - "source": [ - "import pyspark.sql.functions as sf\n", - "\n", - "# TODO HERE" - ], - "metadata": { - "id": "82YodmWqG-JZ" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Complete exercises\n", - "\n", - "Carry out the following exercises (in any order).\n", - "\n", - "- Check if there exist stations with a negative elevation; then, calculate how many of these stations exist in each country; rename the result to \"cnt\" and order the result by decreasing cnt\n", - "- Take only stations with positive elevation, compute the maximum elevation by country and rename the result to \"elevation\"; then, join the result with the original dfS to get, for each country, the name of the city with the highest elevation (join key: ```[\"country\",\"elevation\"]```); order the result by decreasing elevation\n", - "- Take only weather values with airQuality==1, compute the minimum temperature for each stationId and rename it to \"minTemperature\"; then, join the result with dfS and keep only the columns \"minTemperature\" and \"elevation\"; finally, use the correlation between the two columns. 
To do the last part, you need to:\n", - " - cast the elevation to an integer datatype: you need to add ```from pyspark.sql.types import IntegerType``` and then ```df.myfield.cast(IntegerType())```;\n", - " - compute the correlation with ```df.stat.corr(\"myfield1\",\"myfield2\")```." - ], - "metadata": { - "id": "JFGe5AvcHCKM" - } - }, - { - "cell_type": "code", - "source": [ - "# TODO HERE" - ], - "metadata": { - "id": "KoSvRLcWYXQq" - }, - "execution_count": null, - "outputs": [] - } - ] -} \ No newline at end of file +{"nbformat":4,"nbformat_minor":0,"metadata":{"colab":{"provenance":[{"file_id":"1pOVAYbnaLc_KEkrop-dwD3qlAZYKwjCr","timestamp":1711709014150}],"authorship_tag":"ABX9TyOBxrWRv/pd7+23sx/7cFes"},"kernelspec":{"name":"python3","display_name":"Python 3"},"language_info":{"name":"python"}},"cells":[{"cell_type":"markdown","source":["# Install Spark & initialize application\n","\n","Run the following code to install Spark in your Colab environment."],"metadata":{"id":"EsElqAaj4Sse"}},{"cell_type":"code","execution_count":null,"metadata":{"id":"fbvEUbWIHm2s"},"outputs":[],"source":["!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n","!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz\n","# If the above command is too slow, uncomment the following and try it\n","# !wget -q https://big.csr.unibo.it/downloads/bbs-dm/spark-3.5.1-bin-hadoop3.tgz\n","!tar xf spark-3.5.1-bin-hadoop3.tgz\n","!pip install -q findspark"]},{"cell_type":"code","source":["import os\n","os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n","os.environ[\"SPARK_HOME\"] = \"/content/spark-3.5.1-bin-hadoop3\"\n","import findspark\n","findspark.init()\n","findspark.find() # Should return '/content/spark-3.5.1-bin-hadoop3'"],"metadata":{"id":"4oTFM5YtJvv7"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["from pyspark.sql import SparkSession\n","\n","spark = SparkSession.builder\\\n"," .master(\"local\")\\\n"," 
.appName(\"Colab\")\\\n"," .config('spark.ui.port', '4050')\\\n"," .getOrCreate()\n","sc = spark.sparkContext\n","\n","sc"],"metadata":{"id":"KJlzVAmbJ9vL"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["# Spark: working with RDDs\n","\n","Check the documentation: [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis)."],"metadata":{"id":"oBd7XwkFBDEF"}},{"cell_type":"markdown","source":["## Basics"],"metadata":{"id":"jcoFwGvm4gj6"}},{"cell_type":"code","execution_count":null,"metadata":{"autoscroll":"auto","id":"vlFswJyWytKG"},"outputs":[],"source":["# let's create a simple example\n","riddle1 = \"over the bench the sheep lives under the bench the sheep dies\"\n","riddle2 = [\"over the bench the sheep lives\", \"under the bench the sheep dies\"]"]},{"cell_type":"code","execution_count":null,"metadata":{"autoscroll":"auto","id":"WHywj4BmytKH"},"outputs":[],"source":["# create an RDD from the `riddle` string\n","rdd1 = sc.parallelize(riddle1.split(\" \"))\n","# each tuple of the RDD corresponds to a single word\n","\n","print(rdd1)\n","# why is there no result returned?"]},{"cell_type":"code","execution_count":null,"metadata":{"autoscroll":"auto","id":"3oYjBLnOytKI"},"outputs":[],"source":["# compute the RDD\n","print(rdd1.collect())"]},{"cell_type":"code","source":["rdd2 = sc.parallelize(riddle2)\n","print(rdd2.collect())"],"metadata":{"id":"exzQLruZ9qgg"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## Transformations"],"metadata":{"id":"iyH7halU9HiB"}},{"cell_type":"code","source":["# map: returns a new RDD by applying a function to each of the elements in the original RDD\n","rdd1.map(lambda s: s.upper()).collect()"],"metadata":{"id":"sndBHyEF86T5"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# flatMap: returns a new RDD by applying the function to every element of the parent RDD and then flattening the result\n","rdd2.flatMap(lambda s: s.split(\" 
\")).collect()"],"metadata":{"id":"9MlPRBd_-Cl1"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# filter: returns a new RDD containing only the elements in the parent RDD that satisfy the function inside filter\n","rdd1.filter(lambda s: s.startswith(\"u\")).collect()"],"metadata":{"id":"UlT_jxmH9Myx"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# distinct: returns a new RDD that contains only the distinct elements in the parent RDD\n","rdd1.distinct().collect()"],"metadata":{"id":"QxxJdRxW-Xcj"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# groupByKey: groups the values for each key in the (key, value) pairs of the RDD into a single sequence\n","rdd1.map(lambda s: (s,1)).groupByKey().mapValues(list).collect()\n","\n","# (first map converts to a key-value RDD)\n","# (mapValues is a map that operates only on the values - in this case, used to convert from ResultIterable to List for printing reasons)"],"metadata":{"id":"dBAh2Gs8-fdM"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# reduceByKey: when called on a key-value RDD, returns a new dataset in which the values for each of its key are aggregated\n","rdd1.map(lambda s: (s,1)).reduceByKey(lambda x, y: x + y).collect()"],"metadata":{"id":"cH1dxiGl_cS5"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# sortByKey: returns a new RDD with (key,value) pairs of parent RDD in sorted order according to the key\n","rdd1.map(lambda s: (s,1)).sortByKey().collect()"],"metadata":{"id":"-XJWbbv6_0Tw"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# join: starting from two RDD with (key, value1) and (key, value2) pairs, returns a new RDD with (key, (value1, value2)) pairs\n","\n","# rddProvinces: (initials, name, region)\n","rddProvinces = sc.parallelize([(\"BO\", \"Bologna\", \"Emilia-Romagna\"),(\"RA\", \"Ravenna\", \"Emilia-Romagna\"),(\"MI\", \"Milan\", \"Lombardia\")])\n","# rddPeople: (id, 
name, province)\n","rddPeople = sc.parallelize([(1, \"Enrico\", \"RA\"),(2, \"Alice\", \"RA\"),(3, \"Bob\", \"BO\"),(4, \"Charlie\", \"FC\")])\n","\n","# This does not work\n","rddPeople.join(rddProvinces).collect()"],"metadata":{"id":"SGIn90U0Md8j"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["rddProvinces2 = rddProvinces.map(lambda p: (p[0], (p[1],p[2])))\n","rddProvinces2.collect()"],"metadata":{"id":"gQ54N0nKjmJC"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["rddPeople2 = rddPeople.map(lambda p: (p[2], (p[0],p[1])))\n","rddPeople2.collect()"],"metadata":{"id":"PmG1337hk0a9"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["rddPeople2.join(rddProvinces2).collect()"],"metadata":{"id":"AJaZHHXYj-vp"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## Actions"],"metadata":{"id":"gKJ_7MCsAIy_"}},{"cell_type":"code","source":["# collect: returns a list that contains all the elements of the RDD\n","rdd1.collect()"],"metadata":{"id":"XQXX-d20_vWo"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# count: returns the number of elements in the RDD\n","rdd1.count()"],"metadata":{"id":"FXuL9K2qATfL"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# reduce: aggregates the elements of the RDD using a function that takes two elements of the RDD as input and gives the result\n","sc.parallelize([1, 2, 3, 4, 5]).reduce( lambda x, y: x * y)"],"metadata":{"id":"YI10eDDnAWe7"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# take: returns the first n elements of RDD in the same order\n","rdd1.take(2)"],"metadata":{"id":"CgN5qtwBAvIv"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# saveAsTextFile: saves the content of the RDD to a file\n","rdd1.saveAsTextFile(\"rdd1\")"],"metadata":{"id":"yS8SuYRQvV6F"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## 
Examples"],"metadata":{"id":"gmosJNIOA5S_"}},{"cell_type":"code","execution_count":null,"metadata":{"autoscroll":"auto","id":"mGRkO7_oytKK"},"outputs":[],"source":["# Flatten the words beginning with the letter C\n","# - map: transform each string in upper case (remember: map returns a new RDD with the same cardinality)\n","# - filter: keep only the strings beginning with \"C\" (remember: filter returns a new RDD with the same or smaller cardinality)\n","# - flatMap: explode each string into its characters (remember: flatMap returns a new RDD with the any cardinality)\n","rdd1\\\n"," .map(lambda s: s.upper())\\\n"," .filter(lambda s: s.startswith(\"U\"))\\\n"," .flatMap(lambda s: list(s))\\\n"," .collect()"]},{"cell_type":"code","execution_count":null,"metadata":{"autoscroll":"auto","id":"Wrr37cZ2ytKL"},"outputs":[],"source":["# A simple word count\n","# - map: map each word to a tuple (word, 1); each tuple represent the count associate with a word\n","# - reduceByKey: group all the tuples with the same word and sum the counts\n","# - sortBy: sort tuples by count\n","rdd1\\\n"," .map(lambda s: (s, 1))\\\n"," .reduceByKey(lambda a, b: a + b)\\\n"," .sortBy(lambda x: x[1], False)\\\n"," .collect()"]},{"cell_type":"code","source":["# Compute average length of words depending on their initial letter\n","# map: map each word to a key-value tuple (word, (wordLength, 1)), where the value is an object composed by two value: the length of the word and a 1\n","# reduceByKey: group all the tuples with the same word to 1) sum the lengths, and 2) sum the counts\n","# mapValues: divides the sums by the counts to compute the averages\n","# sortBy: sort tuples by averages\n","rdd1\\\n"," .map(lambda s: (s[0], (len(s),1)))\\\n"," .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\\\n"," .mapValues(lambda x: x[0]/x[1])\\\n"," .sortBy(lambda x: x[1], False)\\\n"," 
.collect()"],"metadata":{"id":"zxBoMsnTCATh"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["# Spark: working with DataFrames\n","\n","Check the documentation: [here](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.sql.DataFrame.html).\n","\n","What is different from Pandas' DataFrames?\n","\n","- Spark supports parallelization (Pandas doesn't), thus it's more suitable for big data processing\n","- Spark follows Lazy Execution, which means that a task is not executed until an action is performed (Pandas follows Eager Execution, which means task is executed immediately)\n","- Spark has immutability (Pandas has mutability)\n","- The data structure is similar, the APIs are different"],"metadata":{"id":"sCXrRXAAEtet"}},{"cell_type":"code","source":["!wget https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/datasets/housing.csv\n","df = spark.read.option(\"delimiter\", \",\").option(\"header\", \"true\").csv(\"housing.csv\")\n","df.show()"],"metadata":{"id":"IGChYOQWRZku"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# Switching from Spark to Pandas\n","pandasDF = df.toPandas()\n","print(pandasDF)"],"metadata":{"id":"_aSykf1wV8dB"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# Switching from Pandas to Spark\n","df = spark.createDataFrame(pandasDF)\n","df.show()"],"metadata":{"id":"cFUtMUaUWTY5"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# select: returns a new DataFrame with only selected columns (similar to a map on RDDs)\n","df.select('population','median_house_value').show()"],"metadata":{"id":"eFAs2zBHPLTA"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# select, similarly to a map, allows column values to be redefined\n","df.select(df.population,df.median_house_value/1000).show()\n","# put the operation within parenthesis and add 
.alias('median_house_value_in_K$')"],"metadata":{"id":"FmClDtFaQqO7"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# withColumn: used to manipulate (rename, change the value, convert the datatype)\n","# an existing column in a dataframe (or to create a new column) while keeping the rest intact\n","df.withColumn('median_house_value_in_K$',df.median_house_value/1000).show()"],"metadata":{"id":"4whu8BylWfAK"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# filter: returns a new DataFrame containing only the elements in the parent DataFrame that satisfy the function inside filter (as in RDDs)\n","# orderBy: orders the DataFrame by the selected column(s)\n","df.filter(df.population > 1000).orderBy(df.population.asc()).show()"],"metadata":{"id":"9zrnJn3_Q0PD"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# groupBy: returns a new DataFrame which is the result of an aggregation\n","df.groupBy(df.ocean_proximity).agg({'median_house_value': 'avg', '*': 'count'}).show()"],"metadata":{"id":"1qv5_B90Raho"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# withColumnRenamed: rename a column\n","df.groupBy(df.ocean_proximity).agg({'*': 'count'}).withColumnRenamed(\"count(1)\", \"tot\").show()"],"metadata":{"id":"E3b1rxzMN7Lm"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["# SQL queries can be run on a DataFrame\n","df.createOrReplaceTempView(\"housing\")\n","spark.sql(\"select ocean_proximity, avg(median_house_value) as avg_price from housing group by ocean_proximity order by avg_price desc\").show()"],"metadata":{"id":"TaP-GatdTD7k"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["# Exercise: creating a cube\n","\n","You are working with two files:\n","\n","- weather-stations.csv: it contains a list of weather stations that capture weather information every day of every year throughout the world\n"," - Each station is identified by a 
StationID\n","- weather-sample-10k.csv: it contains the data measured by a certain station on a certain date (a sample of 10k lines collected from the National Climatic Data Center of the USA)\n"," - Each weather measurenent is identified by a StationID and a Timestamp\n","\n","Your goal is to create a single file representing the following cube and to run some queries through PowerBI.\n","\n","![image.png](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAPwAAAEICAYAAAB735ncAAAgAElEQVR4nO3deVyU5f7/8RcyICoiASoomwqklhuKo6HHSu1Y5pJQtp1jZd+Kk5Sl/fppx6I62befWZkZns7Bkx23TmhpR1sIWxQR9zRRVglQkU1kUZYb5vfHzMC4A94sM/fn+Xj4eDgz133dH+6Z91zXfc99z9gpimJACKEJHdq6ACFE65HAC6EhEnghNEQCL4SGSODbrXiedwzl4zTDJffNIU4Os4pmksC3W+OJXN6BtXHp1Oc7fgv7P3ieCXZtWZewZhL4dizg6ShGzF3GDwaAdP4etY+HJwYgeRfNJYFv18YzNeIToqLTMMQvJ3JIFBGBdhin9jp0Oh06ncW0P/55HHVXuX9ONNFjHdCFfkya7BJolgS+nRsfuZwOa5fx3Gv7+OD5CdgRz/OOX3JvlYKiKCjH/8z6x6KNIR6/jGrFdP93I5i77IeG3YGVazCsqkZJ+AuBMkXQLF1bFyCuI+BpokY48sfardQE2kF6OgcMK1nhsNKi0TNkAYGk8/exA3g20RRz/c2kGyYSCPCMeXYgtExGeCvgN1CPfqB/wx0jP+RYjWkkVxQU5SMm2hnDfuTVatPIv4LRbVeyaKck8NYmIIDgvc+x7IdLd8R/JzlpJOb3hfT4Nexu9eJEeydTeqsznmXJKxg7wAGdOfPPfEPNRxOJXB7FgAEOPAfon4lAf52eNu/NZldKPr/lnCXlVCn+3Z3RB3ZnZIAHk4O90dnLeGBr7ORqOe3Jyi/nyZUJKHV1TAvxY6i/GwGe3cgtqmBfRgHxR06TnlfK6jljGd7Xva3LFSqSwGvM2h0ZzPtsLy9PH8z8qYOu2m7NL+m89NkeFtw3mGcnDWjFCkVLksBryPGTJYxZtI3ExVPo39v1uu3zSi4wbP6XbH55goz0NkJ20jRCqa3jyZW7eO3+YY0KO4Cnayeinwpl1kc7qKypbeEKRWuQwGvE1gO5ODnYM/feW5u03PSRfowK6kFMfGoLVSZakwReI/akFzJ+UK9mLTtuoCd70gtVrki0BQm8RiSknGFoH7dmLTu0jxtJaQUqVyTaggReI7Lyyxnq37wDb0P93UnPK1W5ItEWJPAa4d/DmeMnS5q1bHpeKd7uXVSuSLQFCbxG6AO6cyiruFnLHjpRhD6wu8oVibYggdeI4L7u/Hw0r1nL/pycx8gAD5UrEm1BAq8RYXo/sgrK2JCQ2aTldh47Q2ziCWaNC2ihykRrksBrhM6+A6vnjCUyJpHcoopGLVNeWcPjK35h2eN6PFycWrhC0Rok8BoyxM+NBdMHMXbRVmITT1yz7U9HTzPoxS+ZFuJL2Cj/a7YV1kPOpdeghOP5zFqxgzH9ezItxJehfdwJ8HQxXS1XSPyRU/x3fw7LHh/J5GCfti5XqEgCr1HllTWs3ZHBL8lnOJRVTMqpc/Ry60xIPw9GBnjw9MT+uHZxbOsyhcok8IIL1Qo/H88GYNLgvm1cjWhJsg8vhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDZHAC6EhEnghNEQ
CL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE05OqBrzrMkuE6dDrjP69Xd7diWTYqez33uer401elV3hwD1HeV3us+fZEeavep7BeVwx81eElDO/yR8pWKCiK8V/aqIOs/b1liijfPEs7byi36Mn873YujWD2+reJ89PT8Ub73xOF95++uqx/IeCKgS/n26XvMXnnad4Y1XCv8z0RPOLXeoXZrgEEZy8h+tdKi/uySfiPO/c/2qPNqhLacHngy+PZ9P3/cM+oK7Q2N9k8q36qr3OZWj/yXzpSW942/j+OzbNMy3m9ym7T/a5haylYPMbUVxWHl4QyNXoZs3Q6XEJDGW5qC5h2Nbyw3glBIJMe6sGWb1Ooj/yeVbzkfC+3nNlLlUXLPVHeDds5+B3M7xHZ6+8j9J2t/Os+14sf2xOF921/I299OG66YN6pf1NJIsrb1I+M/pp2WeCrMlNJD7kZr6ssUL55Fq7P+LHTNNWv2DmW90ZZBPIaChb/mf0RCopSwsa7PmDx2t9xnraako2P0H3hTpTSLaZZRCXb3igiQlEoTfiOV+/6B9tMK6hK+Y6ver/L7Gu8IbV33g8tYOLyd/nWlLw923YROX8S3Sza7Iny5raMjyg2befMl3Yz7n8awnr0lUXkR+WhKJn82+9t3v22FEZGkbvrr3g+FEuxcoCXhzgBsD78M4YmKyjKLv764xuXzC6EllwW+I59gwjYm8LpKzavIjP1FAu/fANz3joOjrgokNfSfeGXpt0EZ8bPuI+9KVdeCzhxz7uzTetwZvyMu/jHtt1AFSnffYX7zDFY997FSO558kc2bi+F7PW8HTeeSTc7WTyeTcavk4hdMR0X0z2+Dy1g7o8b2W5K/C1v/YvnhzgBvoQ+cAeZKSe4Wowfil3BdBfzevP57UR1S/1hop27fErv0IdbezUuwK3FedI8Xkzcxu6qFL77fjwLbeBgwsgnllC+JJqt8f/B/aUIhjhdfxkhbtTlge84mIhX72LxXVMtjspXcXjZS6z9vSN9g3qx+L6GKXzV4WjeMO3zO/S5lV7/2GZ6rJz4TWvVqbLjzfzxrkQ2rvqO70ffgxXP5hv4hvJAj7eZ9j/O3Huny6UP0m/It4Q/2zCFz17/Nh/cEcZlTYVoAt2V7nSetpqKnUu4rZ+OWQB05p7VR9jiB/h9woHU2wjWLTY27nwPq49sMYZwcASv3uXKGN1ioDsREY80qgjn8TO4K2wMug/uYfWRLxh0WYuO3PzHu9g+ZgcvHnmp/t7Ne7PZlZLPbzlnSTlVin93Z/SB3RkZ4MHkYG909u35vCJfHlowl3+Nuo9JVwjxyKhd/Pu+wbjpyo13DH6L/buMU/ySa3U78h6e/PE23HSDeWv/Lu64RlPz9jucfZajOWfxdHVix9ESK9l+ojnsFEUxtHURjVF1eAl3/rUX67Y8giG/nCdXJqDU1TEtxI+h/m4EeHYjt6iCfRkFxB85TXpeKavnjGV4X/e2Lr3dyZLtp1lWEvhyNs/qRcxdR5jpqzDvs728PH0w86dePhcwW/NLOi99tocF9w3m2UkDWrHW9m3tjgzZfhrW7gNfdXgJtwUv4NTCnfz4dH/GLNpG4uIp9O/tet1l80ouMGz+l2x+eYKMVMDxkyWy/TSu3QfeTKmt4/aob5l5Wx/m3ntro5f7as/vLFi7j33vTMHJwb4FK2zfZPsJsKKr5bYeyMXJwb5JL1aA6SP9GBXUg5j41BaqzDrI9hNgRYHfk17I+EG9mrXsuIGe7EkvVLki6yLbT4AVBT4h5QxD+7g1a9mhfdxISitQuSLrIttPgBUFPiu/nKH+zTtwNNTfnfQ8bV8yIttPgBUF3r+HM8dPXvOUk6tKzyvF272LyhVZF9l+Aqwo8PqA7hzKKm7WsodOFKEP7K5yRdZFtp8AKwp8cF93fj6a16xlf07
OY2SAh8oVWRfZfgKsKPBhej+yCsrYkJDZpOV2HjtDbOIJZo0LaKHKrINsPwFWFHidfQdWzxlLZEwiuUUVjVqmvLKGx1f8wrLH9Xi4aPv6U9l+Aqwo8ABD/NxYMH0QYxdtJTbxxDXb/nT0NINe/JJpIb6EjfJvpQrbN9l+wmpOrbWUcDyfWSt2MKZ/T6aF+DK0jzsBni6mq70KiT9yiv/uz2HZ4yOZHOzT1uW2O7L9tMsqAw/G6ebaHRn8knyGQ1nFpJw6Ry+3zoT082BkgAdPT+yPaxfHti6z3ZLtp01WG3hLF6oVfj6eDcCkwX3buBrrI9tPO6xqH14IcWMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE0RAIvhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDVE18GdjH0Sn0xn/9XuL/Wp2LlrW2Z18PCmo4fnT6eg37yea9+NUor1SLfDn97/F7S8PIklRUBSF0v+48dt+gLPEPtiPt66b/vPsfyuUmbG5apUkmqrnY/yQb3z+FCWPlVWz6DE+ht+vs1hh3BxC5c3BKqgW+KoTRyh/YhLDTbc7D49g1vBrLiLaNQ8mfrSfbwas4sOfJMq2QrXA3zQ+nJDX7rpkhD5L7IPdeTD2d17T63CfGUvu+f28NcRi2vjWfoyj+2j0ryWx8UH/ht2Bs7E8aJ5ius9EBv/W5sGwacPY/fVhiikkbo5fw5TfNPIXxs1h+N0rSVo2gR668cT8DlzSdnzM9eYIorWotw9/UzgbSr9n8Jv+6HTmKfxNhG8oYEO4H68nKRR9Ho535+G88qtp2liaxBPb/h+xuZ0Z/koiSa/rCduQhZLxCsPPxvLgiBReMu8ifD+Y9+fFIplvKx5M/Oj3+um+eeT3mPgR+795Bv3zP5CvxDPbr5C4OdP4Nvygqe1xwlc9gWS+fVD3KL05zAXvcER/9f32+oN7LnpeS9pHypnL25zPTOHY76+hN40SLvrXSNqXwhWaihbmM7APbgDp0YTqdOh0nty9MolDGWVXaF1C+sEklk3oYRrh+xOZlEXyCdktaA9a5mO5m8KJ3hDCqm8vT/zZ2AcJ+CKcLEVBUQrYEH71bpzDNpjamf5lvIIcFmhN6XwR9Q29+3U1hv2P6bxlOqh3fLn+GsuN4+8ZFs+bksHS291arWpxdaoF/mxspMWIfp7MlGNXaGW833VwH7wBzsYTG3vl/jr3vRmfjS+zWj7bayPpRIf2591RMbxyuxuFJ46S6z+QPsahnrg1SVdZzpWAYeks/lCO2rdHOrU6uumex2C0Dt1R4+1uYRv49XPjeDw+PIQH9TreC9vArysXEdJdj+41wC+CiPoRvjMDJt3Dcb0/Or/XScp4hVVJKYzW63jN1MLv9SQyXmkY4zfvzWZXSj6Hs89yNOcsnq5O7DhawsgADyYHe6Ozl/OKruWi7ZeWRk3Wp0zo8Wn94+P+nkHGbD/jjYlz+b9R/emnexoYR0REwwjvMWwaw56cQI9l4/h7RjyzP4pnfmh/ephfXX7P88Pepcgg3/as8scks/LLeXJlAkpdHdNC/Bjq70aAZzfTzx0XEH/kNOl5payeM5bhfd3butx2R7afdlld4NfuyGDeZ3t5efpg5k8ddNV2a35J56XP9rDgvsE8O2lAK1bYvsn20zarCvzxkyWMWbSNxMVT6N/b9brt80ouMGz+l2x+eYKMVMj2E1Z08YxSW8eTK3fx2v3DGvViBfB07UT0U6HM+mgHlTW1LVxh+ybbT4AVBX7rgVycHOyZe++tTVpu+kg/RgX1ICY+tYUqsw6y/QRYUeD3pBcyflCvZi07bqAne9ILVa7Iusj2E2BFgU9IOcPQPs37XGdoHzeS0gpUrsi6yPYTYEWBz8ovZ6h/8w4cDfV3Jz2vVOWKrItsPwFWFHj/Hs4cP1nSrGXT80rxdu+ickXWRbafACsKvD6gO4eymne
y5qETRegDu6tckXWR7SfAigIf3Nedn4/mNWvZn5PzGBngoXJF1kW2nwArCnyY3o+sgjI2JGQ2abmdx84Qm3iCWeMCWqgy6yDbT4AVBV5n34HVc8YSGZNIblFFo5Ypr6zh8RW/sOxxPR4uTi1cYfsm26850ogO1V30xZ6RcQBxROoiiWuFdUeqvBKrCTzAED83FkwfxNhFW4lNPHHNtj8dPc2gF79kWogvYaP8W6nC9k22X3PoWX6s4dr+5RNbbk1p0aGERqeZbgUSkaD++lS7PLa1PD/5Fkb0686sFTvYvDebaSG+DO3jToCni+lqr0Lij5ziv/tzWPb4SCYH+7R1ye2KbD9ts6oR3iy0fw8OLZnK6CAPYhNPMOXtOOzCY9Av2MKH247g5erEvv+dIi/Wq7Dcfut3ZjDxjW+wC48h9K//5dMfU2X7NVWa+au/dOhMU/206FB0lvPxtGhCQ6NJu3Q3wdQmLTqUAZFJJEUOMPWRRnRoKPUD/kXrsJzqm9rFNTzeMEu4nNWN8GbOTg48PbE/T0/sD0BucRm/5RbQpaMDY2+WF+r1mLffjFG+7D9hPHo/oo8nHl07t3Fl7U0SkQN0RALG6X0CEYEWD6dFE/oYfKooBALERaKLjENZHkWEbgtxyycyEUj7YQ08+imBBBKYoBBhXJjo0MeITptIREQCxwjlMT4lISIQSCPdch0D1vDoMYUE40qI1IUSXV9LEpFRj3JMUQhMiyZ0wIfERSznSnsDVjnCX4mDzvin1NTWtXEl1kWx2F4O9vZtWEl7ZbkPf0nYAbKSSUqKZIB59L07Gg6kk8ZEpkZEsyUOII0f1gQTZV44LtI0Ug8gMimJ5KzrlJCVDMs/tVj3RJ5bDmt+MI/kepZ/GmF8wwmcwKP6A6RfZZC32hH+Uo6mF2uNUktNbR0O8vVWjWL5Bml+0xRNFPENyhWOrgVOjeDuLXEsZwuRwVNRwBj2qIHG0RiIi9SxpRVLtZln2BxwA1BTK9duN5Z5hLdDRvhm8R+IPjqKK+42T3yO5Qe2ELnlAMufM74hpKUfgOAA42hMHFuiG7cOIh+zWEccH0bCoxMunW5cn+0EXtfwYq1RZFrfWOYR3kFnL7OiKzLuw+uudkAsMIKEb4Kv0iaQCY8eIPrAo5izGRgRRUT03aa2WzDtzBsfm/Ao1B+0u9Y67oZvrrB70QhW9RVX12IwGIhP/h2ltk4OPjXBb7kF5BaXycFOjbCZt3Q7O7v6EUoO3DWeeUov03ltsJnAQ8OBu2qZ0jdaw5Tepl4K4ips6llu+GhODto1lnmEd5QRXhNsK/Dmj+ZkSt9oNXXmKb1NvRTEVdjUs+xoLyN8UykypdcUmznxBho+mpOP5Rqntq6uYR9eY1N6g8FAful5enbrwi/HczhfXdPWJbWIzo4O/KF/w6cvthV40whfLSN8o9TU1mEwGD+V1dqUPqvgHCl5xfTspq3v6rOtwOtkH74pLj6tVjsj/JlzFWQWGL/Q07WzE8P8erZxRa3Hpt7W6/fhFRnhG8PywhlHjYzwlTUKOcVl1NTW4dmtCz5uXdu6pFZlU8+y5VF6OXB3fTUavFIup7iMwrLzdHLU4ePugk4jb3RmNvXXWh5plpNvrq/+whmLsxRtWX7peXKLjD+o4ePmgrtzpzauqPXZ1LPsaG+PnZ0dIPvxjWGeBTnYd7D5ka5KqSW3uJQqpZaeLl3w1thU3symnmWdfQeL8+llSn89WjrLLreolPzS83R00OHt3hVHDR2ktGRTgQfLL8KQEf56tHIefWHZeXKKywDwcetKdw1fSWlzz7ScT994Wjittqa2lpziMiprFLp37azZqbyZzT3T5qPN1bIPf11auDQ2p6iMM+cqcNTZ4+3WFScHmzr1pMlsLvCOOvksvrFsfUpfVH6BXNNU3tutq+bOqrsSm3um5Yq5xrPlg3ZKbR25xWWcr67B3bkTPm4ubV1Su2CDgZd9+May/FjO1uQWl3G6pBydfQd
83Fzo5KjtqbyZzT3TjnLFXKMpFl9gaUvOVlSSU2w+waYrnq4ylTezucDLFXONYzz92PaO0tfVGcgpLqWiqga3Lk54y1T+IrbzTJt06egAQG2dTXwZb4vRdbCr/7/57ERbkFNcyqmz5dh3sMPbzaX+9SCMWnTHJreogo27s/j+8Cky8sooLKukpKK6JVcp2inXLo64dulIv55d+cOAnswM7UOAp7qjb8n5Souj8i70uslZ1f5tQYt8L71SW8eybcks2fIb00J8mRbiS6BXNzxdO+PaxVHt1QkrUFJRTWFZJWmnzvHdryf5PCGTR8b24/WZw3ByuPFjCAYDHD1p/I59185O3OLtQVcnea1dSvXAHz9ZwuzoBDo56lj1l7H06antM5vElRWWVvLMJwkczSkhJiIUfWD3G+ovu6iU5JOF2NnBLb27a/6MuqtRNfDHT5YwZtE2Fs4YwkvTBqvVrbBhX+zK5LlVu1n7/DjGDfRsVh/nLlRxNLeQ0gtV+Li7cEtvD5WrtB2q7cMrtXXMjk6QsIsmuf+2vjg52jM7OoFDS6bi7NT0g2y5xWWUXqjCpVNHzX2DTVOpdpR+2bZkOjnqJOyiyaaM8GPcQE9eWX+gycvmFpeRU9TwmbtLp45ql2dTVAl8blEFS7b8xqq/jFWjO6FB7z+mZ/PebPZnFjV6mbLK6voTbLzduspn7o2gSuA37s5iWoivHKATzXaTc0eemnAz63ZmNnqZnKJSzp2voquTI95uXbGh0wlajCqB//7wKaaF+KrRldCw6SP9+O7QyUa1PXm27KIr4Vw7O7VkaTZDlcBn5JUR6NVNja7alqGEz6NicHp9D/nm+2pSiHrq4vsMBUmEhf+L2d9X3Pg6lSw+eD4G5w9/u/G+rFxgr26k55Vet115VQ25xWXUGQz0uslZpvJNoErgC8sq8XS1ga8NsuvKoL7OVKcVc8z0y0O1qXlsLYaq40UcMd2npJTwk86NSYObcVFGXT7fffgFAR9LwC/l5GB/0XflX01uUSlnKyrp0tEBbzcX7DvIXL6xVAl8SUW1jZxBZ0/QAHf6VJew/zhAHaeSCzkc4MrtShG7fgWo5Xj6Wcr8PBjenI+N685zLKOUPEXVwjXjdEl5/ffTebu54NZFpvJNYXMXz9wo3RBPZjid5/vUPKCKI5mljBnWnz8FVPNT5kkwlJGcVYFXgAd9gbozx/jwzXU4h8fQ4dH1zP4yi0pAObiDyc+upmN4DB0e/IzxMcc4q2Txwbx4XjgJFb8kYffwt/xsWq+huogVC4ztb5q/nUTTweqq9P3MfdF4v9NjsSxOKEahluTPPsfuwc3MeX8dzhb92LLzVTXkFJdRW1eHp6uzfObeDBL4Szn2445bYEd2Pkp1Bj8edeD2fjczyL8jCcdOkl+TxU/HdNzfvx/UnuLTD/awNfA2CmOfoHReTw6vT+IfydDBrTd/+z/3UxH7BOee8uDQ98lsyfVn7tLxvN8buvxBj2HdJMaZVluZXEaPx8M580o/BuRk8fpPeXD+N/66+Dj2U6ZQEftnMh7oyHvRifxg3s2tLaXD4EmUWPRjy3KKyyguv2D81Ri3rjb/XfotQb4G5DIdGdS3G5XfFbD3VwOb6jxYNUTH0EoPnHcUc+RoRxLt3HlriI7a9HT+maaQmBZPp1jz8h0YVwS4V7N//bc8eew8v5dUUUw3lGtM4zsN9ef+oC6g+POAZwbvF5+jOjGbD0srqV7xJe+tMDW078bJfPAFcOhO2Hg3TTyJeecqyC3W9q/GqEELr5Um6kCvgR4ExxawMaGO07cGEOwAuptdGVedyaYfHEn282SAC3AasOvIGy8/yqIRFl3UneHfrybyutsQkj4Ipmfmz4x6o6BJVRRdMB0h1Hnwnw+mcf9FxwtqSd51Y3+lNblQrZBbXEpNbR09u2n3V2PUIHOiK7AP8mRyt3Ms3VnO6L496QrYufszud95/nOkgoEDvegL2Pfz4l73Kt7ZuIMjpQq
Gijx+iklgZ90FisoMuLk442ZfxsHdeeyr79wVr5ugtvQC+TXXrsNhcHdmGIpY8PlBTl2AmrNZrF+9l8afmmIbcotLKSy7gJODDm837f5qjBok8Ffi0I87b9VBh85MHtDbeF8Hdwb5O1F4oQNht/Q13qcLZN78oTxZlcmIJ1Zj/9QPLC7rRC+dHw/d60Xl9h10fmIrXzi5UT8BsOvG7aFe9DxyGM8nvmfnNcqw6z6CZS/2I+TYIXr/KYauL+xmo86FG7uQ1Lrkl1ZYHJXX9q/GqEGVy2N1Mz/FEDtbjXqExtmFx6B8/hgAVTW1/HaygILS83Tv2plbvT3oqPEfkrhRMsKLdiunuJSC0vM46uzxce8qYVeBBF60SwVl5y86V76Hi3zVtBok8KLdqVZqyS0y/gCk/GqMuiTwot3JLS7jTGmF8Vdj3OVXY9QkgRftTo7FCTae8gOQqpLAi3bnQrWCg30H/D1kKq821eZKduExanUlNM7NuRO9XZ3lqHwLUG2LyufwQg124TGM7OvV1mXYLJnSC6EhEnghNEQCL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2xosCnsvaF+axNbes6Gi913XxeWGdFBQubZ0WBF0LcKBVPVk5l7QuL2JRjujn6GTbOu4PtS2fytdebvP9wkPH+7e8RdnAYG+fdQeq6+SwwLeA7Yzq9N50mOPZF7rxW3wtnssnUN6nreWHhV2QDoOfZ2Be5k1TWvvAJTBnOvo+/IhsfZix+F6+vZ7Ii0bKdcQSOZjIj9q401d3wGKbHzfVhuc5oGOH9FZsSje1Zau4b8JnO2+8/BPXLLiJsk55nY+/l9AufQMS7PBJk/pvMt3/k3fCDeM04xaZNMGOx+b6VGLv1Md2n3rMlNEpRFMON/iPsn4aLpRjWzJ1nWJNiMBhS1hnmzl1nSDE9Ev/uA4Yl8QaDIX6pYUbYUkO8eYm18y66fTmLPg2X92uIX2qY8e52U7sHDDPMj8UvNcwIM63TtP65a1Ms1tnQZ8raeaY+jP83t7uo7pR1hrkW/V2qvt1lfVxS/0W3txuWhD1g0Xa7YYnltrj0b7VhhP3zhl+P8u/q/9S9HGn7e4R9nFR/c3QucOdwRvAJe1MfIijoRw7kTmfqPEhddwrfGU/Vj6ZBD09m9KaDjV9X7mmyc5JYEP5Vw30+XqTSG/BhRsRDBAF4e+HrM52pphV5e/lc1I3vjKfqR86ghyczOvwg27kDTueQnbiIsE0Wbb1SwRuw6A+4ZKZhakdTh2M9U8yzoNQ8TpJEYvhMVlg8ntuMXoWwpF7gt79H2NdevB37OUHA9qUzOQBAECEhEL0vlZB9WzkZ8pTx8dM59B52gy9f8zT7IqnsvbFeG7r/y+fMv3T/InX/JbfX88LC00yJ/dxiN6QxEVsAAAPsSURBVEEFpl0DCbhQk2oH7VLzToG3p+kF+iMHEhseC3p4Mr33/pcte3vVj2J3DtOT+PV6zMewU9dtJZEm8PbCN3HrDR+1z967/+IaRg/jTowzAcv6rir3NNk+Xngbe2Dv3pyrNAzCyzuHfftMPabuZ99Vm3rSO+crtmxv0p8ixHWpNsIbp8MrCQtfCegZPdry0TsI9l7JCp5hvvmuO1/k2YMz66fkvjOmM5rT11oDISGwoP6g3UO8/5fThC2ciXnW7TvjTd5/uGl1+3qfJjp8pnE67jOdt9+/w/T3vMuzp2da7DKYDpxd2sGd9zLj60Wmdj6MHt2wyxA0YjgsNB+0e5E7p0zn64Wm3QQfPaN9Lu3M7A7mL87jhYUzCfvYdNcVZzNCNE37+SEK09HviFacxhqn3081fIIg2pzlD1EI9bWb7xDa/vVXEPImQVh+HGV28cdlQojmabvAX3Jkm9HPsPHhICCI+bEydRWiJbSfKb0QyJS+pcmptUJoiGpT+gMnitTqSmhMcB/3ti5BM1QLvDxpQrR/MqUXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+Ehkj
ghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE0RAIvhIZI4IXQEAm8EBoigRdCQyTwQmiIBF4IDZHAC6EhEnghNEQCL4SGSOCF0BAJvBAaIoEXQkMk8EJoiAReCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggNkcALoSESeCE0RAIvhIZI4IXQEAm8EBqiSuCdnRwor6xRoyshRAtSJfAeLk7klVxQoyuhYUptXVuXYPNUCXy/nl1JO3VOja6EhqWdPkeAp0tbl2HTVAn8Hwb05LtfT6rRldCwbw/mMu4Wz7Yuw6apEviZoX34PCGTgnMyrRfNU6PUErM9lYfH9G3rUmyaKoEP8HThkbH9iPjHLjW6Exr0t42H6N/blXEDZYRvSap9LPf6zGEczSnhi12ZanUpNOJAZgGfxKWwYvaoti7F5qkWeCcHe2IiQnlu1W5+/O2UWt0KG3cgs4BHlv3Cssf1eLg4tXU5Ns9OURSDmh3+nJzH7OgEJgd787+PhuDs5KBm98JGKLV1vBl7kE/iUlj2uJ6wUf5tXZImqB54gPLKGl5Zf4CtB3J54o5A7tP7E+DlgpODvdqrElZEqa0jPa+Ubw7kELM9lf69XVkxe5SM7K2oRQJvtj+ziHU7M/nu0EnS80rlxApBgKcL427x5OExfeUAXRto0cALIdoXuXhGCA2RwAuhIRJ4ITREAi+EhkjghdAQCbwQGiKBF0JDJPBCaIgEXggN+f/sQGmHiJF2nwAAAABJRU5ErkJggg==)\n","\n","The procedure to create the cube is the following.\n","\n","1. On the stations file:\n"," 1. replace empty states and countries in stations with a placeholder value (e.g., \"XX\");\n"," 1. keep only the following fields: stationId, state, country, elevation\n","2. On the weather-sample file:\n"," 1. filter out weather wrong measurements (i.e., where airTemperatureQuality=9);\n"," 1. keep only the following fields: stationId, airTemperature, date, month, year\n"," 1. create a new fulldate field by concatenating year, month, and date\n"," 1. create a new fullmonth field by concatenating year and month\n","1. Join stations with weather measurements on the stationId field\n","1. Keep only the following fields: state, country, elevation, fulldate, fullmonth, year, airTemperature\n","1. Aggregate the measurements by state and date to take the average temperature\n"," - Group by: state, country, elevation, fulldate, fullmonth, year\n"," - Calculation: avg(airTemperature)\n","1. 
Save the result on a file"],"metadata":{"id":"Dv5BUnaAWWEi"}},{"cell_type":"code","source":["!wget https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/datasets/weather-stations.csv\n","!wget https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/datasets/weather-sample-10k.txt"],"metadata":{"id":"REn3hjicbRFL"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## Spark"],"metadata":{"id":"xO6bQ3NQwDAj"}},{"cell_type":"code","source":["dfW = sc.textFile(\"weather-sample-10k.txt\")\\\n"," .map(lambda l: (l[4:15],l[15:19],l[19:21],l[21:23],int(l[87:92])/10,l[92:93]))\\\n"," .toDF([\"stationId\",\"year\",\"month\",\"day\",\"airTemperature\",\"airTemperatureQuality\"])\n","dfW.show()"],"metadata":{"id":"Xcd4fAhgeDhH"},"execution_count":null,"outputs":[]},{"cell_type":"code","source":["from pyspark.sql.functions import concat\n","dfS = spark.read.option(\"delimiter\", \",\").option(\"header\", \"false\").csv(\"weather-stations.csv\")\n","dfS = dfS.select(concat(dfS[0],dfS[1]),dfS[2],dfS[3],dfS[4],dfS[5],dfS[6],dfS[7],dfS[8],dfS[9],dfS[10])\\\n"," .toDF(\"stationId\",\"city\",\"country\",\"state\",\"call\",\"latitude\",\"longitude\",\"elevation\",\"date_begin\",\"date_end\")\n","dfS.show()"],"metadata":{"id":"OaknyiXHeCbF"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["1. On the stations file:\n"," 1. replace empty states and countries in stations with a placeholder value (e.g., \"XX\");\n"," 1. keep only the following fields: stationId, state, country, elevation"],"metadata":{"id":"8QqJZrglm-ZD"}},{"cell_type":"code","source":["dfS1 = dfS.fillna({'state': 'XX', 'country':'XX'})\n","dfS2 = dfS1.select('stationId','state','country','elevation')\n","dfS2.show()"],"metadata":{"id":"8v1YuN-4nQCQ"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["2. On the weather-sample file:\n"," 1. filter out weather wrong measurements (i.e., where airTemperatureQuality=9);\n"," 1. 
keep only the following fields: stationId, airTemperature, date, month, year\n"," 1. create a new fulldate field by concatenating year, month, and date\n"," 1. create a new fullmonth field by concatenating year and month"],"metadata":{"id":"5LxL1WQvnF0J"}},{"cell_type":"code","source":["from pyspark.sql.functions import concat, lit\n","dfW1 = dfW.where(\"airTemperatureQuality != 9\")\n","dfW2 = dfW1.select('stationId','airTemperature','day','month','year')\n","dfW3 = dfW2.withColumn(\"fulldate\", concat(dfW1.year,lit(\"-\"),dfW1.month,lit(\"-\"),dfW1.day))\n","dfW4 = dfW3.withColumn(\"fullmonth\", concat(dfW1.year,lit(\"-\"),dfW1.month))\n","dfW4.show()"],"metadata":{"id":"Cf9bm6CspR8x"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["3. Join stations with weather measurements on the stationId field\n"],"metadata":{"id":"jy2yp35unHhO"}},{"cell_type":"code","source":["dfJ = dfS2.join(dfW4, \"stationId\")\n","dfJ.show()"],"metadata":{"id":"oHTKQhQJrPMU"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["4. Keep only the following fields: state, country, elevation, fulldate, fullmonth, year, airTemperature"],"metadata":{"id":"mv_dfniznJ2A"}},{"cell_type":"code","source":["dfJ2 = dfJ.select(\"state\", \"country\", \"elevation\", \"fulldate\", \"fullmonth\", \"year\", \"airTemperature\")\n","dfJ2.show()"],"metadata":{"id":"dYyMxZaSrprK"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["5. Aggregate the measurements by state, country and date to take the average temperature\n"," - Group by: state, country, elevation, fulldate, fullmonth, year\n"," - Calculation: avg(airTemperature)"],"metadata":{"id":"YXKChhXwnLlv"}},{"cell_type":"code","source":["dfG = dfJ2.groupBy(\"state\", \"country\", \"elevation\", \"fulldate\", \"fullmonth\", \"year\").agg({'airTemperature': 'avg'})\n","dfG.show()"],"metadata":{"id":"shVr9k9hryqH"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["6. 
Save the result on a file"],"metadata":{"id":"zwXx8h-tv4-v"}},{"cell_type":"code","source":["dfG.write.mode('overwrite').option('header','true').csv(\"weather-cube\")"],"metadata":{"id":"pKTH93LQu61U"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## PowerBI\n","\n","Download the file from the left panel of this notebook (in case of issues, download it from [here](https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/results/weather-cube.csv)) and load it in [Power BI](https://app.powerbi.com/).\n","- Visualize the daily trend of average temperatures for each country\n","- Show the average temperature on the map\n","- Compute bins for the elevation field and show the average temperature for each bin\n","\n","\n","The final Power BI file will be available [here](https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/results/weather-cube.pbix)."],"metadata":{"id":"jkKWX-mWv8S9"}},{"cell_type":"markdown","source":["# Additional exercises\n","\n","The solution will be available [here](https://raw.githubusercontent.com/w4bo/2024-bbs-dm/master/materials/results/2024_bbs_dm_spark_basics_solution.ipynb).\n","\n","## Getting familiar with data frame transformations\n","\n","Carry out the following operations (in any order).\n","\n","- ```.select()``` operator; starting from ```dfS```:\n"," 1. keep only country, elevation, date_begin, and date_end\n"," 1. keep only the first four characters of date_begin using [sf.substring](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.substring.html)\n"," 1. use [concat](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html) to concatenate date_begin with date_end and putting an underscore (\\_) in the middle; since the underscore is not a column, declare it as ```sf.lit(\"_\")```\n"," 1. 
use [coalesce](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.coalesce.html) to take the value of country if not null, otherwise the value of city\n"," 1. put countries in lowercase using [sf.lower](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lower.html)\n"," 1. in the previous four points, use ```.alias()``` to give a meaningful name to the obtained columns\n","- ```.withColumn()``` operator; starting from ```dfS```:\n"," 1. do the same as points 2 to 5 of the ```select``` operator, but using ```withColumn```\n","- ```.filter()``` operator; starting from ```dfS```, keep only the rows where:\n"," 1. elevation is greater than 5000\n"," 1. country is not null, using [sf.isnotnull](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.isnotnull.html)\n"," 1. conditions 1 and 2 are both true; conditions must be put between parenthesis and separated by \"&\" (e.g., check [here](https://www.geeksforgeeks.org/pyspark-filter-dataframe-based-on-multiple-conditions/))\n"," 1. either one of conditions 1 and 2 is true; conditions must be put between parenthesis and separated by \"|\" (e.g., check [here](https://www.geeksforgeeks.org/pyspark-filter-dataframe-based-on-multiple-conditions/))\n"," 1. date_begin is the first day of the month (requires to use substring)\n","- ```.groupBy()``` operator; starting from ```dfW```:\n"," 1. group by airTemperatureQuality to count how many rows there are for each value\n"," 1. as above, but also calculate the average temperature\n"," 1. as above, but also given meaningful names to the results using ```withColumnRenamed```\n"," 1. 
group by month to calculate the minimum and maximum temperatures and order by month using ```orderby```; to aggregate differently on the same column, use the [sf.max](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.max.html) and [sf.min](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.min.html) functions inside the ```agg``` function instead of the object enclosed by brackets ```{}```\n"," 1. group by month and day to calculate the minimum and maximum temperatures\n"," 1. group by stationId and year month to count the number of rows"],"metadata":{"id":"679cmN5vLOMH"}},{"cell_type":"code","source":["import pyspark.sql.functions as sf\n","\n","# TODO HERE"],"metadata":{"id":"82YodmWqG-JZ"},"execution_count":null,"outputs":[]},{"cell_type":"markdown","source":["## Complete exercises\n","\n","Carry out the following exercises (in any order).\n","\n","- Check if there exist stations with a negative elevation; then, calculate how many of these stations exist in each country; rename the result to \"cnt\" and order the result by decreasing cnt\n","- Take only stations with positive elevation, compute the maximum elevation by country and rename the result to \"elevation\"; then, join the result with the original dfS to get, for each country, the name of the city with the highest elevation (join key: ```[\"country\",\"elevation\"]```); order the result by decreasing elevation\n","- Take only weather values with airQuality==1, compute the minimum temperature for each stationId and rename it to \"minTemperature\"; then, join the result with dfS and keep only the columns \"minTemperature\" and \"elevation\"; finally, use the correlation between the two columns. 
To do the last part, you need to:\n"," - cast the elevation to an integer datatype: you need to add ```from pyspark.sql.types import IntegerType``` and then ```df.myfield.cast(IntegerType())```;\n"," - compute the correlation with ```df.stat.corr(\"myfield1\",\"myfield2\")```."],"metadata":{"id":"JFGe5AvcHCKM"}},{"cell_type":"code","source":["# TODO HERE"],"metadata":{"id":"AG9YwgeaL_Db"},"execution_count":null,"outputs":[]}]} \ No newline at end of file