diff --git a/missions/.DS_Store b/missions/.DS_Store new file mode 100644 index 00000000..6420a98f Binary files /dev/null and b/missions/.DS_Store differ diff --git a/missions/W5/.DS_Store b/missions/W5/.DS_Store new file mode 100644 index 00000000..a7a62f03 Binary files /dev/null and b/missions/W5/.DS_Store differ diff --git a/missions/W5/m1/m1.ipynb b/missions/W5/m1/m1.ipynb new file mode 100644 index 00000000..57874041 --- /dev/null +++ b/missions/W5/m1/m1.ipynb @@ -0,0 +1,117 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "trips = pq.read_table('/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet')\n", + "trips = trips.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "ename": "Py4JJavaError", + "evalue": "An error occurred while calling o32.applyModifiableSettings.\n: java.lang.IllegalStateException: LiveListenerBus is stopped.\n\tat org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)\n\tat org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)\n\tat org.apache.spark.sql.internal.SharedState.(SharedState.scala:115)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)\n\tat org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)\n\tat org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)\n\tat org.apache.spark.sql.SparkSession$.conf$lzycompute$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.conf$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.applyModifiableSettings(SparkSession.scala:1216)\n\tat java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:580)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[3], line 12\u001b[0m\n\u001b[1;32m 6\u001b[0m trips \u001b[38;5;241m=\u001b[39m pq\u001b[38;5;241m.\u001b[39mread_table(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 8\u001b[0m \u001b[38;5;66;03m# Spark Session 생성\u001b[39;00m\n\u001b[1;32m 9\u001b[0m spark \u001b[38;5;241m=\u001b[39m 
\u001b[43mSparkSession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mbuilder\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 10\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mappName\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mNYC_TLC_Data_Analysis\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 11\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mmaster\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mlocal[*]\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[0;32m---> 12\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgetOrCreate\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 14\u001b[0m \u001b[38;5;66;03m# PyArrow 테이블을 Spark DataFrame으로 변환\u001b[39;00m\n\u001b[1;32m 15\u001b[0m taxi_trip_df \u001b[38;5;241m=\u001b[39m spark\u001b[38;5;241m.\u001b[39mcreateDataFrame(trips\u001b[38;5;241m.\u001b[39mto_pandas())\n", + "File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/pyspark/sql/session.py:504\u001b[0m, in \u001b[0;36mSparkSession.Builder.getOrCreate\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 500\u001b[0m session \u001b[38;5;241m=\u001b[39m SparkSession(sc, options\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_options)\n\u001b[1;32m 501\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 502\u001b[0m \u001b[38;5;28;43mgetattr\u001b[39;49m\u001b[43m(\u001b[49m\n\u001b[1;32m 503\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mgetattr\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43msession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jvm\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mSparkSession$\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mMODULE$\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\n\u001b[0;32m--> 504\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mapplyModifiableSettings\u001b[49m\u001b[43m(\u001b[49m\u001b[43msession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jsparkSession\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_options\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 505\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m session\n", + "File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/py4j/java_gateway.py:1322\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client\u001b[38;5;241m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m 
\u001b[43mget_return_value\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1323\u001b[0m \u001b[43m \u001b[49m\u001b[43manswer\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtarget_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n\u001b[1;32m 1326\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(temp_arg, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_detach\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n", + "File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:179\u001b[0m, in \u001b[0;36mcapture_sql_exception..deco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 177\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdeco\u001b[39m(\u001b[38;5;241m*\u001b[39ma: Any, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkw: Any) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Any:\n\u001b[1;32m 178\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 179\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mf\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43ma\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkw\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 180\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m Py4JJavaError \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 181\u001b[0m converted \u001b[38;5;241m=\u001b[39m convert_exception(e\u001b[38;5;241m.\u001b[39mjava_exception)\n", + "File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/py4j/protocol.py:326\u001b[0m, in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 324\u001b[0m value \u001b[38;5;241m=\u001b[39m OUTPUT_CONVERTER[\u001b[38;5;28mtype\u001b[39m](answer[\u001b[38;5;241m2\u001b[39m:], gateway_client)\n\u001b[1;32m 325\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m answer[\u001b[38;5;241m1\u001b[39m] \u001b[38;5;241m==\u001b[39m REFERENCE_TYPE:\n\u001b[0;32m--> 326\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 328\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name), value)\n\u001b[1;32m 329\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 330\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JError(\n\u001b[1;32m 331\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m. 
Trace:\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;132;01m{3}\u001b[39;00m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 332\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name, value))\n", + "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o32.applyModifiableSettings.\n: java.lang.IllegalStateException: LiveListenerBus is stopped.\n\tat org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)\n\tat org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)\n\tat org.apache.spark.sql.internal.SharedState.(SharedState.scala:115)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)\n\tat org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)\n\tat org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)\n\tat org.apache.spark.sql.SparkSession$.conf$lzycompute$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.conf$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.applyModifiableSettings(SparkSession.scala:1216)\n\tat java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:580)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n" + ] + } + ], + "source": [ + "import pyarrow.parquet as pq\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import col, sum as _sum, count, avg, to_date\n", + "\n", + "# Read the Parquet file with PyArrow\n", + "trips = pq.read_table('/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet')\n", + "\n", + "# Create the Spark session\n", + "spark = SparkSession.builder \\\n", + " .appName(\"NYC_TLC_Data_Analysis\") \\\n", + " .master(\"local[*]\") \\\n", + " .getOrCreate()\n", + "\n", + "# Convert the PyArrow table to a Spark DataFrame\n", + "taxi_trip_df = spark.createDataFrame(trips.to_pandas())\n", + "\n", + "# Data cleaning: keep only trips with a positive fare and distance\n", + "taxi_trip_df = taxi_trip_df.filter((col('fare_amount') > 0) & (col('trip_distance') > 0))\n", + "\n", + "# Select the required columns and derive the pickup date (yellow-taxi files name the pickup column tpep_pickup_datetime)\n", + "taxi_trip_df = taxi_trip_df.select('tpep_pickup_datetime', 'trip_distance', 'fare_amount')\n", + "taxi_trip_df = taxi_trip_df.withColumn('pickup_date', to_date(col('tpep_pickup_datetime')))\n", + "\n", + "# Aggregation logic: daily metrics\n", + "daily_metrics_df = taxi_trip_df.groupBy('pickup_date') \\\n", + " .agg(\n", + " _sum('fare_amount').alias('total_revenue'),\n", + " _sum('trip_distance').alias('total_trip_distance'),\n", + " count('*').alias('total_trips')\n", + " )\n", + "\n", + "# Compute the overall aggregates\n",
계산\n", + "total_trips = taxi_trip_df.count()\n", + "total_revenue = taxi_trip_df.agg(_sum('fare_amount').alias('total_revenue')).collect()[0]['total_revenue']\n", + "avg_trip_distance = taxi_trip_df.agg(avg('trip_distance').alias('avg_trip_distance')).collect()[0]['avg_trip_distance']\n", + "\n", + "# 전체 집계 결과 출력\n", + "print(f\"Total Trips: {total_trips}\")\n", + "print(f\"Total Revenue: {total_revenue}\")\n", + "print(f\"Average Trip Distance: {avg_trip_distance}\")\n", + "\n", + "# 일별 집계 결과 출력\n", + "daily_metrics_df.show()\n", + "\n", + "# 결과 저장 (로컬 파일 시스템에 저장)\n", + "output_path = \"/Users/admin/softeer/week5/output\"\n", + "daily_metrics_df.write.csv(f\"{output_path}/daily_metrics\", header=True)\n", + "\n", + "# Spark 세션 종료\n", + "spark.stop()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git "a/missions/W5/m1/\354\212\244\355\201\254\353\246\260\354\203\267 2024-08-26 \354\230\244\354\240\204 6.32.51.png" "b/missions/W5/m1/\354\212\244\355\201\254\353\246\260\354\203\267 2024-08-26 \354\230\244\354\240\204 6.32.51.png" new file mode 100644 index 00000000..2abb878c Binary files /dev/null and "b/missions/W5/m1/\354\212\244\355\201\254\353\246\260\354\203\267 2024-08-26 \354\230\244\354\240\204 6.32.51.png" differ