Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DE박상민_w5m1 #281

Open
wants to merge 1 commit into
base: ssangmin-junior
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added missions/.DS_Store
Binary file not shown.
Binary file added missions/W5/.DS_Store
Binary file not shown.
117 changes: 117 additions & 0 deletions missions/W5/m1/m1.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.parquet as pq\n",
"trips = pq.read_table('/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet')\n",
"trips = trips.to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling o32.applyModifiableSettings.\n: java.lang.IllegalStateException: LiveListenerBus is stopped.\n\tat org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)\n\tat org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)\n\tat org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:115)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)\n\tat org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)\n\tat org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)\n\tat org.apache.spark.sql.SparkSession$.conf$lzycompute$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.conf$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.applyModifiableSettings(SparkSession.scala:1216)\n\tat java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:580)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[3], line 12\u001b[0m\n\u001b[1;32m 6\u001b[0m trips \u001b[38;5;241m=\u001b[39m pq\u001b[38;5;241m.\u001b[39mread_table(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 8\u001b[0m \u001b[38;5;66;03m# Spark Session 생성\u001b[39;00m\n\u001b[1;32m 9\u001b[0m spark \u001b[38;5;241m=\u001b[39m \u001b[43mSparkSession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mbuilder\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 10\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mappName\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mNYC_TLC_Data_Analysis\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 11\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mmaster\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mlocal[*]\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[0;32m---> 12\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgetOrCreate\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 14\u001b[0m \u001b[38;5;66;03m# PyArrow 테이블을 Spark DataFrame으로 변환\u001b[39;00m\n\u001b[1;32m 15\u001b[0m taxi_trip_df \u001b[38;5;241m=\u001b[39m spark\u001b[38;5;241m.\u001b[39mcreateDataFrame(trips\u001b[38;5;241m.\u001b[39mto_pandas())\n",
"File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/pyspark/sql/session.py:504\u001b[0m, in \u001b[0;36mSparkSession.Builder.getOrCreate\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 500\u001b[0m session \u001b[38;5;241m=\u001b[39m SparkSession(sc, options\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_options)\n\u001b[1;32m 501\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 502\u001b[0m \u001b[38;5;28;43mgetattr\u001b[39;49m\u001b[43m(\u001b[49m\n\u001b[1;32m 503\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mgetattr\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43msession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jvm\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mSparkSession$\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mMODULE$\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\n\u001b[0;32m--> 504\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mapplyModifiableSettings\u001b[49m\u001b[43m(\u001b[49m\u001b[43msession\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jsparkSession\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_options\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 505\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m session\n",
"File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/py4j/java_gateway.py:1322\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client\u001b[38;5;241m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m \u001b[43mget_return_value\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1323\u001b[0m \u001b[43m \u001b[49m\u001b[43manswer\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtarget_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n\u001b[1;32m 1326\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(temp_arg, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_detach\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n",
"File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:179\u001b[0m, in \u001b[0;36mcapture_sql_exception.<locals>.deco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 177\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdeco\u001b[39m(\u001b[38;5;241m*\u001b[39ma: Any, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkw: Any) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Any:\n\u001b[1;32m 178\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 179\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mf\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43ma\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkw\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 180\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m Py4JJavaError \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 181\u001b[0m converted \u001b[38;5;241m=\u001b[39m convert_exception(e\u001b[38;5;241m.\u001b[39mjava_exception)\n",
"File \u001b[0;32m~/softeer/myenv/lib/python3.12/site-packages/py4j/protocol.py:326\u001b[0m, in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 324\u001b[0m value \u001b[38;5;241m=\u001b[39m OUTPUT_CONVERTER[\u001b[38;5;28mtype\u001b[39m](answer[\u001b[38;5;241m2\u001b[39m:], gateway_client)\n\u001b[1;32m 325\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m answer[\u001b[38;5;241m1\u001b[39m] \u001b[38;5;241m==\u001b[39m REFERENCE_TYPE:\n\u001b[0;32m--> 326\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 328\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name), value)\n\u001b[1;32m 329\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 330\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JError(\n\u001b[1;32m 331\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m. Trace:\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;132;01m{3}\u001b[39;00m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 332\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name, value))\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o32.applyModifiableSettings.\n: java.lang.IllegalStateException: LiveListenerBus is stopped.\n\tat org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)\n\tat org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)\n\tat org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:115)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)\n\tat org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)\n\tat org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)\n\tat org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)\n\tat org.apache.spark.sql.SparkSession$.conf$lzycompute$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.conf$1(SparkSession.scala:1213)\n\tat org.apache.spark.sql.SparkSession$.applyModifiableSettings(SparkSession.scala:1216)\n\tat java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:580)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n"
]
}
],
"source": [
"import pyarrow.parquet as pq\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import col, sum as _sum, to_date\n",
"\n",
"# PyArrow를 사용하여 Parquet 파일 읽기\n",
"trips = pq.read_table('/Users/admin/softeer/week5/yellow_tripdata_2024-05.parquet')\n",
"\n",
"# Spark Session 생성\n",
"spark = SparkSession.builder \\\n",
" .appName(\"NYC_TLC_Data_Analysis\") \\\n",
" .master(\"local[*]\") \\\n",
" .getOrCreate()\n",
"\n",
"# PyArrow 테이블을 Spark DataFrame으로 변환\n",
"taxi_trip_df = spark.createDataFrame(trips.to_pandas())\n",
"\n",
"# 데이터 클리닝\n",
"taxi_trip_df = taxi_trip_df.filter((col('fare_amount') > 0) & (col('trip_distance') > 0))\n",
"\n",
"# 필요한 열 선택 및 데이터 변환\n",
"taxi_trip_df = taxi_trip_df.select('pickup_datetime', 'trip_distance', 'fare_amount')\n",
"taxi_trip_df = taxi_trip_df.withColumn('pickup_date', to_date(col('pickup_datetime')))\n",
"\n",
"# 집계 로직: 일별 집계\n",
"daily_metrics_df = taxi_trip_df.groupBy('pickup_date') \\\n",
" .agg(\n",
" _sum('fare_amount').alias('total_revenue'),\n",
" _sum('trip_distance').alias('total_trip_distance'),\n",
" count('*').alias('total_trips')\n",
" )\n",
"\n",
"# 전체 집계 결과 계산\n",
"total_trips = taxi_trip_df.count()\n",
"total_revenue = taxi_trip_df.agg(_sum('fare_amount').alias('total_revenue')).collect()[0]['total_revenue']\n",
"avg_trip_distance = taxi_trip_df.agg(avg('trip_distance').alias('avg_trip_distance')).collect()[0]['avg_trip_distance']\n",
"\n",
"# 전체 집계 결과 출력\n",
"print(f\"Total Trips: {total_trips}\")\n",
"print(f\"Total Revenue: {total_revenue}\")\n",
"print(f\"Average Trip Distance: {avg_trip_distance}\")\n",
"\n",
"# 일별 집계 결과 출력\n",
"daily_metrics_df.show()\n",
"\n",
"# 결과 저장 (로컬 파일 시스템에 저장)\n",
"output_path = \"/Users/admin/softeer/week5/output\"\n",
"daily_metrics_df.write.csv(f\"{output_path}/daily_metrics\", header=True)\n",
"\n",
"# Spark 세션 종료\n",
"spark.stop()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.