Skip to content

Commit

Permalink
Merge pull request #122 from amosproj/feature/apache_airflow_input_output
Browse files Browse the repository at this point in the history

Feature/apache airflow input output
  • Loading branch information
Keldami authored Dec 20, 2023
2 parents 0d39df4 + 2544264 commit 9d715e4
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 53 deletions.
50 changes: 30 additions & 20 deletions src/backend/api/airflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ def airflow_post(url, json_object):
basic = HTTPBasicAuth(os.getenv('AIRFLOW_USERNAME'), os.getenv('AIRFLOW_PASSWORD'))
response = requests.post(os.getenv('AIRFLOW_SERVER_URL') + 'api/v1/' + url, json=json_object,
auth=basic, headers={'content-type': 'application/json'})
if response.status_code == 200:
return response
else:
return jsonify({'error': 'Failed to post to apache airflow'}), 500
return response


@airflow_api.route('/dags', methods=['GET'])
Expand All @@ -38,13 +35,38 @@ def dags():
else:
return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500

@airflow_api.route('/dags/<id>/execute', methods=['GET'])
@airflow_api.route('/dags/<id>/execute', methods=['POST'])
@secure
def dagsExecuteById(id):
file_name = request.args.get('parameter')
json_config = {'conf': download_file(file_name)}
data = request.json

response = airflow_post('dags/' + id + '/dagRuns', json_config)
if "datapipelineId" not in data:
return (
jsonify({"error": "Missing datapipelineId in request."}),
400,
)
if "fileId" not in data:
return (
jsonify({"error": "Missing fileId in request."}),
400,
)

if "fileName" not in data:
return (
jsonify({"error": "Missing fileName in request."}),
400,
)


airflow_config = \
{'conf': {
"download": download_file(data['fileName']),
'datapipelineId': id,
'fileId': data['fileId']
}
}

response = airflow_post('dags/' + id + '/dagRuns', airflow_config)
if response.status_code == 200:
return jsonify(response.json())
else:
Expand Down Expand Up @@ -87,15 +109,3 @@ def dagsDetailsByIdByExecutionDateByTaskInstance(id, execution_date, task_instan
else:
return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500


@airflow_api.route('/inputData', methods=['POST'])
def test_input_endpoint():
    """Test endpoint that accepts a JSON payload and echoes a confirmation.

    Returns:
        200 with a confirmation message naming the received payload,
        400 when the body is empty or not JSON.
    """
    data = request.json
    if not data:
        return jsonify({'error': 'Entity not found'}), 400
    # Fixed: the old version printed the builtin `id` (debug leftover) and
    # returned a message that ended abruptly at 'for: ' with nothing appended.
    return jsonify({'message': 'successful data transfer for: ' + str(data)}), 200

94 changes: 94 additions & 0 deletions src/backend/api/dp_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from flask import request, jsonify, Blueprint

from database.models.dp_run import DatapipelineRun
from database.mongo_repo import datapipelineRunDB
from services.auth_service import secure
from services.dp_run import run

dp_run = Blueprint("dp_run", __name__, template_folder="templates")




@dp_run.route("/dp_run", methods=["GET"])
@secure
def get_all_dp_runs():
    """Return all stored datapipeline runs.

    Returns:
        200 with a JSON list of run objects (executionId, datapipelineId,
        fileId, result, state).
    """
    # Renamed the cursor variable: the original shadowed the `dp_run`
    # blueprint defined at module level.
    runs = datapipelineRunDB.find()

    all_data = [
        {
            "executionId": d["executionId"],
            "datapipelineId": d["datapipelineId"],
            "fileId": d["fileId"],
            "result": d["result"],
            "state": d["state"],
        }
        for d in runs
    ]
    # 200 (OK) is the correct status for a read; 201 (Created) was wrong
    # for a GET that creates nothing.
    return jsonify(all_data), 200


@dp_run.route("/dp_run/new", methods=["POST"])
@secure
def create_dp_run():
    """Create and persist a new datapipeline run.

    Expects a JSON body containing 'datapipelineId' and 'fileId'.

    Returns:
        201 with the stored run object on success,
        400 when a required field is missing.
    """
    # Guard against a missing/empty body so the membership tests below
    # cannot raise on None.
    data = request.json or {}

    if "datapipelineId" not in data or "fileId" not in data:
        # Fixed: the message referred to a non-existent 's3BucketFileId';
        # the field actually checked is 'fileId'.
        return (
            jsonify({"error": "Missing datapipelineId or fileId in request."}),
            400,
        )

    created_dp_run = DatapipelineRun(
        data["datapipelineId"],
        data["fileId"],
    )

    datapipelineRunDB.insert_one(created_dp_run.to_json())

    return jsonify({"message": "Datapipeline dp_run is stored successfully",
                    "object": created_dp_run.to_json()}), 201

@dp_run.route("/dp_run/<executionId>/run", methods=["GET"])
@secure
def run_by_id(executionId):
    """Trigger execution of the stored run with the given executionId.

    Returns:
        200 with a success message when the run was started,
        500 when the run is unknown or could not be started.
    """
    if run(executionId):
        return jsonify({"message": "Successfully started"})
    # Fixed: failure previously came back with an implicit 200, forcing
    # clients to parse the message text to detect it.
    return jsonify({"message": "Failed to start"}), 500


@dp_run.route("/dp_run/<id>", methods=["DELETE"])
@secure
def delete_dp_run(id):
    """Delete the datapipeline run whose executionId matches <id>.

    Returns:
        200 with a confirmation message when a document was removed,
        400 when no run with that id exists.
    """
    result = datapipelineRunDB.delete_one({"executionId": id})

    if result.deleted_count > 0:
        # Fixed: 'Sucessfully' typo in the client-visible message, and
        # 201 (Created) replaced by 200 -- a DELETE creates nothing.
        return jsonify({'message': 'Successfully deleted'}), 200
    else:
        return jsonify({'error': 'Entity not found'}), 400

@dp_run.route('/inputData', methods=['POST'])
# @public
def input_endpoint():
    """Receive a pipeline result callback and update the stored run.

    Expects JSON with 'executionId' and 'result'; the presence of an
    'error' key marks the run as FAILED instead of storing the result.

    Returns:
        201 with the updated run data on success,
        400 when required fields are missing or the run is unknown.
    """
    data = request.json

    if 'executionId' not in data or 'result' not in data:
        return jsonify({'error': 'Missing id or result in request'}), 400

    query = {"executionId": data['executionId']}
    if not datapipelineRunDB.find_one(query):
        return jsonify({'error': 'Entity not found'}), 400

    if 'error' in data:
        datapipelineRunDB.update_one(query, {'$set': {'state': "FAILED"}})
    else:
        # TODO add to result not overwrite
        datapipelineRunDB.update_one(
            query,
            {'$set': {'state': "SUCCESSFULL", 'result': data['result']}})

    # Fixed: re-read AFTER the update so the response reflects the stored
    # state; the old code returned the pre-update document, so 'result'
    # in the response was stale.
    d = datapipelineRunDB.find_one(query)
    return jsonify({'executionId': d['executionId'], 'result': d['result'],
                    'fileId': d['fileId'],
                    'datapipelineId': d['datapipelineId']}), 201
4 changes: 2 additions & 2 deletions src/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from api.airflow_api import airflow_api
from api.datapipeline import datapipeline
from api.dp_run import dp_run
from api.fileWP import fileWP
from api.upload_api import upload_api
from api.metadata import metadata
from services.auth_service import secure
from flask_cors import CORS

Expand Down Expand Up @@ -38,7 +38,7 @@
app.register_blueprint(datapipeline, url_prefix="/")
app.register_blueprint(fileWP)
app.register_blueprint(airflow_api)
app.register_blueprint(metadata)
app.register_blueprint(dp_run)


@app.route("/")
Expand Down
32 changes: 32 additions & 0 deletions src/backend/database/models/dp_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import uuid
from datetime import datetime

# Datapipeline State
# PENDING
# QUEUED
# FINISHED
# FAILED


class DatapipelineRun:
    """Mongo-persistable record of a single datapipeline execution."""

    def __init__(self, datapipelineId, fileId):
        # Random UUID identifying this execution across services.
        self.executionId = str(uuid.uuid4())
        self.datapipelineId = datapipelineId
        self.fileId = fileId
        self.result = []                    # filled in once results arrive
        self.create_date = datetime.now()   # NOTE(review): naive local time
        self.state = "PENDING"              # initial lifecycle state

    def to_json(self):
        """Return the run as a plain dict suitable for MongoDB insertion."""
        field_names = ("executionId", "datapipelineId", "fileId",
                       "result", "create_date", "state")
        return {name: getattr(self, name) for name in field_names}
1 change: 1 addition & 0 deletions src/backend/database/mongo_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
# Collection handles shared by the API blueprints.
datapipelineDB = db["datapipeline"]
metadataDB = db["metadata"]
# NOTE(review): the collection name carries a stray trailing apostrophe
# ("S3FileNames'"). Existing data may already live under that exact name,
# so renaming it needs a migration -- confirm before changing.
s3filename = db["S3FileNames'"]
datapipelineRunDB = db["dp_run"]
32 changes: 16 additions & 16 deletions src/backend/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
version: '3.8'
services:
app:
environment:
- MONGODB_URL=amos_mongodb
- MONGODB_PORT=27017
- MONGODB_USER=root
- MONGODB_PASSWORD=pass
- HOST_URL=0.0.0.0
- HOST_PORT=8000
build: .
volumes:
- .:/app
ports:
- "8000:8000"
command: python -u app.py
links:
- db
# app:
# environment:
# - MONGODB_URL=amos_mongodb
# - MONGODB_PORT=27017
# - MONGODB_USER=root
# - MONGODB_PASSWORD=pass
# - HOST_URL=0.0.0.0
# - HOST_PORT=8000
# build: .
# volumes:
# - .:/app
# ports:
# - "8000:8000"
# command: python -u app.py
# links:
# - db
db:
image: mongo:latest
hostname: amos_mongodb
Expand Down
14 changes: 13 additions & 1 deletion src/backend/services/auth_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from functools import wraps
from flask import request, redirect, url_for, session
from flask import redirect, url_for, session
from dotenv import load_dotenv


Expand All @@ -18,3 +18,15 @@ def decorated_function(*args, **kwargs):
return f(*args, **kwargs)
return decorated_function


def public(f):
    """Decorator for endpoints reachable without valid credentials.

    Unlike `secure`, a credential mismatch is only reported on stdout;
    the wrapped view function is always executed.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        load_dotenv()
        password_ok = os.getenv('DPMS_PASSWORD') == session.get('password')
        username_ok = os.getenv('DPMS_USERNAME') == session.get('username')
        if not (password_ok and username_ok):
            # Log-only: request handling continues regardless of the check.
            print('Name or password for authentication is wrong')
        return f(*args, **kwargs)
    return decorated_function

33 changes: 33 additions & 0 deletions src/backend/services/dp_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from flask import jsonify

from api.airflow_api import dagsExecuteById, airflow_post
from database.mongo_repo import datapipelineRunDB
from services.upload_to_s3 import download_file


def run(executionId):
    """Start the Airflow run for a stored execution and track its state.

    Args:
        executionId: id of the execution document in `datapipelineRunDB`.

    Returns:
        True when Airflow accepted the run (state set to QUEUED),
        False when the execution is unknown or Airflow refused it
        (state set to FAILED).
    """
    record = datapipelineRunDB.find_one({"executionId": executionId})
    if not record:
        return False

    response = run_airflow(executionId, record['datapipelineId'],
                           record['fileId'])

    accepted = response.status_code == 200
    new_state = "QUEUED" if accepted else "FAILED"
    datapipelineRunDB.update_one({"executionId": executionId},
                                 {'$set': {'state': new_state}})
    return accepted


def run_airflow(executionId, datapipelineId, fileId):
    """POST a dagRun for the given datapipeline to Airflow.

    The run configuration carries the file's download URL (as returned by
    `download_file`) plus the ids Airflow later reports back.

    Returns:
        The raw `requests` response from `airflow_post`.
    """
    run_conf = {
        'download_url': download_file(fileId).get("download_url"),
        'executionId': executionId,
        'datapipelineId': datapipelineId,
        'fileId': fileId,
    }
    return airflow_post('dags/' + datapipelineId + '/dagRuns',
                        {'conf': run_conf})

Loading

0 comments on commit 9d715e4

Please sign in to comment.