
Commit

Update step_by_step.md
emiliocimino authored Sep 8, 2023
1 parent dc30ff0 commit 33fbbbe
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions docs/step_by_step.md
@@ -4,7 +4,7 @@

# FIWARE PySpark Connector Step-by-Step Tutorial

This is a step-by-step tutorial on how to configure a working example to try the FIWARE pyspark connector. In this tutorial, we simulate an on-line residual useful life prediction for batteries. To do so, a machine learning model trained on battery data is deployed as a PySpark algorithm using a spark cluster, while an Orion Context Broker provides data from unseen same-type batteries. This setup allows the real-time prediction of battery residual useful lifes.
This is a step-by-step tutorial on how to configure a working example to try the FIWARE PySpark Connector. In this tutorial, we simulate online residual useful life prediction for batteries. To do so, a machine learning model trained on battery data is deployed as a PySpark algorithm on a Spark cluster, while an Orion Context Broker provides data from unseen batteries of the same type. This setup allows real-time prediction of battery residual useful lives.

**TUTORIAL STATUS: UPDATED TO LATEST VERSION**

@@ -27,7 +27,7 @@ The above figure explains the overall architecture set up in the tutorial and ho

### How the FIWARE PySpark Connector Works

The mechanism behind the pyspark connector is quite simple: it sets up a simple HTTP server to receive notifications from Orion and then pushes it inside PySpark using the [SocketTextStream](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.streaming.StreamingContext.socketTextStream.html) function that creates an input TCP source for building *Resilient Distributed Datasets* (RDDs), the streaming data unit of pyspark.
The mechanism behind the FIWARE PySpark Connector is quite simple: it sets up a lightweight HTTP server to receive notifications from Orion and then pushes them into PySpark using the [SocketTextStream](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.streaming.StreamingContext.socketTextStream.html) function, which creates a TCP input source for building *Resilient Distributed Datasets* (RDDs), the streaming data unit of PySpark.
The figure below shows the detailed process of connector setup, followed by data reception, management, processing and sinking.

![SequenceDiagram](https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/assets/103200695/1e2618d4-6641-4ced-b412-305c1db4bff0)
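
As a rough illustration of the PySpark side of this mechanism, a streaming job can open such a TCP source with `socketTextStream`; the host, port, and app name below are placeholder assumptions, not the connector's actual interface:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Placeholder address for the connector's Multi-Thread Socket Server (MTSS)
MTSS_HOST, MTSS_PORT = "127.0.0.1", 9999

sc = SparkContext(appName="fpc-sketch")
ssc = StreamingContext(sc, 5)  # 5-second micro-batches

# TCP input source: each notification pushed by the MTSS becomes a text record
records = ssc.socketTextStream(MTSS_HOST, MTSS_PORT)
records.pprint()  # sink: print each micro-batch of received records

ssc.start()
ssc.awaitTermination()
```
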
@@ -38,8 +38,8 @@ The figure below shows the detailed process of connector setup, followed by data
- Connector's HTTP server starts in sleeping phase
- Connector's Multi-Thread Socket Server (MTSS) starts, remaining in listening phase
- Spark Streaming context is initialized
- pyspark's SocketTextStream function creates a TCP input with any available local IPs and Port, connecting to MTSS
- MTSS saves pyspark's TCP socket as "first client"
- PySpark's SocketTextStream function creates a TCP input on any available local IP and port, connecting to the MTSS
- MTSS saves PySpark's TCP socket as its "first client"
- MTSS awakens HTTP Server
- Streaming Context and RDD channel are returned

@@ -63,7 +63,7 @@ The figure below shows the detailed process of connector setup, followed by data

- **Fourth Phase: Data Write-back**
- The Spark driver calls the *forEachRDD* function, then passes each RDD to the *forEachPartition* function, hence mapping the last result to a worker
- using *forEachRDD* is used as RDD sink (since pyspark is lazy, all operations are performed only when a sink operation is performed)
- *forEachRDD* is used as the RDD sink (since PySpark is lazy, all operations are executed only when a sink operation is performed)
- using *forEachPartition* allows connector parameters to be set up only once; it then iterates over incoming RDDs
- *forEachPartition* needs a callback function that takes an iterator as its argument
- The Spark driver sinks the data flux, mapping an output function to each worker (see the sketch after this list)
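
A minimal sketch of this write-back pattern, assuming `records` is the stream returned by the connector and using a hypothetical `write_back` helper in place of the tutorial's actual output function:

```python
import requests

ORION_URL = "http://localhost:1026/v2/entities"  # placeholder Orion endpoint

def write_back(result):
    # Hypothetical helper: post one processed result back to Orion
    requests.post(ORION_URL, json=result)

def sink_partition(results):
    # Runs once per worker partition: per-partition setup happens here, once,
    # then every record of the partition is written back
    for result in results:
        write_back(result)

def sink_rdd(rdd):
    # foreachPartition maps the callback above onto the workers holding the RDD
    rdd.foreachPartition(sink_partition)

# foreachRDD is the sink: since PySpark is lazy, the pipeline executes only here
records.foreachRDD(sink_rdd)
```
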
@@ -167,13 +167,13 @@ spark-submit predict.py --py-files model.pickle
- After updating the entity, the Spark job should print some lines such as:
![image](https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/assets/103200695/841747ac-f5aa-4a6f-9887-85a0a41664e0)
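
For reference, the entity update mentioned above can be issued against Orion's NGSI-v2 API; the entity id and attribute below are placeholders to adapt to the tutorial's battery entity:

```python
import requests

# Placeholder entity id and attribute name
url = "http://localhost:1026/v2/entities/urn:ngsi-ld:Battery:001/attrs"
payload = {"voltage": {"value": 3.91, "type": "Number"}}

resp = requests.patch(url, json=payload)
print(resp.status_code)  # 204 No Content means the update was accepted
```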

Until now, we just explained how FIWARE PySpark connector works, after being correctly configured and used. Next part of this tutorial provides deeper comprehension of what it is necessary to do with custom algorithms to achieve those results.
So far, we have explained how the FIWARE PySpark Connector works once it is correctly configured and used. The next part of this tutorial provides a deeper understanding of what is necessary to do with custom algorithms to achieve those results.


## How to Develop an NGSI-Integrated Custom Algorithm with FIWARE PySpark Connector

### Configuration
To start getting familiar with FIWARE pyspark connector, let's speak of its configuration file.
To start getting familiar with the FIWARE PySpark Connector, let's look at its configuration file.
The configuration file contains some class definitions and default configuration values. Here's an example:

![image](https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/assets/103200695/41d2c7f0-c0d8-4b7b-a7fe-b06292182221)
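
Purely to illustrate the pattern of classes holding default values, a configuration module might look like the sketch below; every field name here is invented, and the real ones are those in the connector's conf file shown above:

```python
# Illustrative only: these names are invented, not the connector's real fields
class ServerConfiguration:
    http_address = "0.0.0.0"       # address the notification HTTP server binds to
    http_port = 8082               # port Orion sends notifications to
    socket_address = "127.0.0.1"   # address of the Multi-Thread Socket Server
    socket_port = 9998             # port PySpark's socketTextStream connects to

class ReplyConfiguration:
    broker_url = "http://localhost:1026"  # Orion endpoint used for write-back
    content_type = "application/json"
```
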
@@ -209,11 +209,11 @@ The below diagram is a scheme showing how classes are linked together:

### Building an Integrated Algorithm

This subsection wants to dive inside the code contained in [**predict.py**](https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/blob/main/tutorial_resources/jobs/Concrete_Prediction/predict.py) to understand how to integrate your custom algorithm with the FIWARE PySpark connector.
This subsection dives into the code contained in [**predict.py**](https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/blob/main/tutorial_resources/jobs/Concrete_Prediction/predict.py) to show how to integrate your custom algorithm with the FIWARE PySpark Connector.

- **Step 1: Start a Spark Session**

First and foremost, to use pyspark it is important to import PySpark libraries, the FIWARE PySpark Connector library and create or get a spark session to use:
First and foremost, import the PySpark libraries and the FIWARE PySpark Connector library, then create or get a Spark session to use:

```python
from fpc import connector
```
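
A minimal sketch of this step, assuming only that the job needs the connector import and a Spark session (the app name below is a placeholder):

```python
from pyspark.sql import SparkSession
from fpc import connector  # FIWARE PySpark Connector library

# Create a new Spark session or reuse the one already active in the cluster
spark = SparkSession.builder \
    .appName("battery-rul-prediction") \
    .getOrCreate()
```
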
@@ -273,7 +273,7 @@ def PredictAndComputeError(x, y):

**Step 5: Define Worker Mapping in PySpark**

Now is necessary to define some mapping functions to work with RDDs. This is the most tricky part of all FIWARE PySpark Connecto integration and it will be explained point by point.
Now it is necessary to define some mapping functions to work with RDDs. This is the trickiest part of the whole FIWARE PySpark Connector integration, and it will be explained point by point.

```python
try:
    ...
```
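
Purely as an illustrative sketch of the kind of mapping this step describes — the parsing logic and the `records` and `model` names are assumptions, not the tutorial's actual code:

```python
import json

def to_feature_vector(record):
    # Hypothetical parser: extract numeric attribute values from one notification record
    entity = json.loads(record)
    return [float(attr["value"]) for attr in entity.values() if isinstance(attr, dict)]

# Map incoming records to feature vectors, then to model predictions
# (`records` is the DStream returned by the connector, `model` a pre-trained estimator)
features = records.map(to_feature_vector)
predictions = features.map(lambda vec: float(model.predict([vec])[0]))
```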
