diff --git a/src/backend/api/airflow_api.py b/src/backend/api/airflow_api.py
index b357f86..b8a4846 100644
--- a/src/backend/api/airflow_api.py
+++ b/src/backend/api/airflow_api.py
@@ -23,10 +23,7 @@ def airflow_post(url, json_object):
     basic = HTTPBasicAuth(os.getenv('AIRFLOW_USERNAME'), os.getenv('AIRFLOW_PASSWORD'))
     response = requests.post(os.getenv('AIRFLOW_SERVER_URL') + 'api/v1/' + url, json=json_object, auth=basic, headers={'content-type': 'application/json'})
-    if response.status_code == 200:
-        return response
-    else:
-        return jsonify({'error': 'Failed to post to apache airflow'}), 500
+    return response
 
 
 @airflow_api.route('/dags', methods=['GET'])
@@ -38,13 +35,38 @@ def dags():
     else:
         return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500
 
-@airflow_api.route('/dags/<id>/execute', methods=['GET'])
+@airflow_api.route('/dags/<id>/execute', methods=['POST'])
 @secure
 def dagsExecuteById(id):
-    file_name = request.args.get('parameter')
-    json_config = {'conf': download_file(file_name)}
+    data = request.json
 
+    if "datapipelineId" not in data:
+        return (
+            jsonify({"error": "Missing datapipelineId in request."}),
+            400,
+        )
+    if "fileId" not in data:
+        return (
+            jsonify({"error": "Missing fileId in request."}),
+            400,
+        )
+    if "fileName" not in data:
+        return (
+            jsonify({"error": "Missing fileName in request."}),
+            400,
+        )
+
+    airflow_config = {
+        'conf': {
+            'download': download_file(data['fileName']),
+            'datapipelineId': id,
+            'fileId': data['fileId'],
+        }
+    }
+
-    response = airflow_post('dags/' + id + '/dagRuns', json_config)
+    response = airflow_post('dags/' + id + '/dagRuns', airflow_config)
     if response.status_code == 200:
         return jsonify(response.json())
     else:
         return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500
@@ -87,15 +109,3 @@ def dagsDetailsByIdByExecutionDateByTaskInstance(id, execution_date, task_instan
     else:
         return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500
-
-@airflow_api.route('/inputData', methods=['POST'])
-def test_input_endpoint():
-
-    data = request.json
-    print(data)
-    print(id)
-    if data:
-        return jsonify({'message': 'successful data transfer for: '}), 200
-    else:
-        return jsonify({'error': 'Entity not found'}), 400
-
diff --git a/src/backend/api/dp_run.py b/src/backend/api/dp_run.py
new file mode 100644
index 0000000..932ac96
--- /dev/null
+++ b/src/backend/api/dp_run.py
@@ -0,0 +1,94 @@
+from flask import request, jsonify, Blueprint
+
+from database.models.dp_run import DatapipelineRun
+from database.mongo_repo import datapipelineRunDB
+from services.auth_service import secure
+from services.dp_run import run
+
+dp_run = Blueprint("dp_run", __name__, template_folder="templates")
+
+
+@dp_run.route("/dp_run", methods=["GET"])
+@secure
+def get_all_dp_runs():
+    dp_runs = datapipelineRunDB.find()
+
+    allData = []
+    for d in dp_runs:
+        allData.append({
+            "executionId": d["executionId"],
+            "datapipelineId": d["datapipelineId"],
+            "fileId": d["fileId"],
+            "result": d["result"],
+            "state": d["state"],
+        })
+    return jsonify(allData), 200
+
+
+@dp_run.route("/dp_run/new", methods=["POST"])
+@secure
+def create_dp_run():
+    data = request.json
+
+    if "datapipelineId" not in data or "fileId" not in data:
+        return (
+            jsonify({"error": "Missing datapipelineId or fileId in request."}),
+            400,
+        )
+
+    created_dp_run = DatapipelineRun(
+        data["datapipelineId"],
+        data["fileId"],
+    )
+
+    datapipelineRunDB.insert_one(created_dp_run.to_json())
+
+    return jsonify({"message": "Datapipeline run is stored successfully",
+                    "object": created_dp_run.to_json()}), 201
+
+
+@dp_run.route("/dp_run/<executionId>/run",
methods=["GET"]) +@secure +def run_by_id(executionId): + run_response = run(executionId) + if run_response: + return jsonify({"message": "Successfully started"}) + else: + return jsonify({"message": "Failed to start"}) + + +@dp_run.route("/dp_run/", methods=["DELETE"]) +@secure +def delete_dp_run(id): + + result = datapipelineRunDB.delete_one({"executionId": id}) + + if result.deleted_count > 0: + return jsonify({'message': 'Sucessfully deleted'}), 201 + else: + return jsonify({'error': 'Entity not found'}), 400 + +@dp_run.route('/inputData', methods=['POST']) +# @public +def input_endpoint(): + + data = request.json + + error_flag = False + if 'error' in data: + error_flag = True + if 'executionId' not in data or 'result' not in data: + return jsonify({'error': 'Missing id or result in request'}), 400 + + d = datapipelineRunDB.find_one({"executionId": data['executionId']}) + if not d: + return jsonify({'error': 'Entity not found'}), 400 + + if error_flag: + datapipelineRunDB.update_one({"executionId": data['executionId']}, {'$set': { 'state': "FAILED" }}) + else: + # TODO add to result not overwrite + datapipelineRunDB.update_one({"executionId": data['executionId']}, {'$set': { 'state': "SUCCESSFULL", 'result': data['result'] }}) + + return jsonify({'executionId': d['executionId'], 'result': d['result'], 'fileId': d['fileId'], 'datapipelineId': d['datapipelineId']}), 201 diff --git a/src/backend/app.py b/src/backend/app.py index c36540c..2afcc64 100644 --- a/src/backend/app.py +++ b/src/backend/app.py @@ -7,9 +7,9 @@ from api.airflow_api import airflow_api from api.datapipeline import datapipeline +from api.dp_run import dp_run from api.fileWP import fileWP from api.upload_api import upload_api -from api.metadata import metadata from services.auth_service import secure from flask_cors import CORS @@ -38,7 +38,7 @@ app.register_blueprint(datapipeline, url_prefix="/") app.register_blueprint(fileWP) app.register_blueprint(airflow_api) -app.register_blueprint(metadata) +app.register_blueprint(dp_run) @app.route("/") diff --git a/src/backend/database/models/dp_run.py b/src/backend/database/models/dp_run.py new file mode 100644 index 0000000..aea7234 --- /dev/null +++ b/src/backend/database/models/dp_run.py @@ -0,0 +1,32 @@ +import uuid +from datetime import datetime + +# Datapipeline State +# PENDING +# QUEUED +# FINISHED +# FAILED + + +class DatapipelineRun: + def __init__( + self, + datapipelineId, + fileId, + ): + self.executionId = str(uuid.uuid4()) + self.datapipelineId = datapipelineId + self.fileId = fileId + self.result = [] + self.create_date = datetime.now() + self.state = "PENDING" + + def to_json(self): + return { + "executionId": self.executionId, + "datapipelineId": self.datapipelineId, + "fileId": self.fileId, + "result": self.result, + "create_date": self.create_date, + "state": self.state, + } diff --git a/src/backend/database/mongo_repo.py b/src/backend/database/mongo_repo.py index ddc1bed..db0c092 100644 --- a/src/backend/database/mongo_repo.py +++ b/src/backend/database/mongo_repo.py @@ -18,3 +18,4 @@ datapipelineDB = db["datapipeline"] metadataDB = db["metadata"] s3filename = db["S3FileNames'"] +datapipelineRunDB = db["dp_run"] diff --git a/src/backend/docker-compose.yml b/src/backend/docker-compose.yml index f80ea0b..d424117 100644 --- a/src/backend/docker-compose.yml +++ b/src/backend/docker-compose.yml @@ -1,21 +1,21 @@ version: '3.8' services: - app: - environment: - - MONGODB_URL=amos_mongodb - - MONGODB_PORT=27017 - - MONGODB_USER=root - - MONGODB_PASSWORD=pass - - 
-      - HOST_URL=0.0.0.0
-      - HOST_PORT=8000
-    build: .
-    volumes:
-      - .:/app
-    ports:
-      - "8000:8000"
-    command: python -u app.py
-    links:
-      - db
+#  app:
+#    environment:
+#      - MONGODB_URL=amos_mongodb
+#      - MONGODB_PORT=27017
+#      - MONGODB_USER=root
+#      - MONGODB_PASSWORD=pass
+#      - HOST_URL=0.0.0.0
+#      - HOST_PORT=8000
+#    build: .
+#    volumes:
+#      - .:/app
+#    ports:
+#      - "8000:8000"
+#    command: python -u app.py
+#    links:
+#      - db
 db:
     image: mongo:latest
     hostname: amos_mongodb
diff --git a/src/backend/services/auth_service.py b/src/backend/services/auth_service.py
index 4bcad84..6380060 100644
--- a/src/backend/services/auth_service.py
+++ b/src/backend/services/auth_service.py
@@ -1,7 +1,7 @@
 import os
 from functools import wraps
 
-from flask import request, redirect, url_for, session
+from flask import redirect, url_for, session
 
 from dotenv import load_dotenv
 
@@ -18,3 +18,15 @@ def decorated_function(*args, **kwargs):
         return f(*args, **kwargs)
 
     return decorated_function
+
+
+def public(f):
+    @wraps(f)
+    def decorated_function(*args, **kwargs):
+        load_dotenv()
+        if (os.getenv('DPMS_PASSWORD') != session.get('password')
+                or os.getenv('DPMS_USERNAME') != session.get('username')):
+            print('Username or password for authentication is wrong')
+
+        return f(*args, **kwargs)
+
+    return decorated_function
diff --git a/src/backend/services/dp_run.py b/src/backend/services/dp_run.py
new file mode 100644
index 0000000..7b8623e
--- /dev/null
+++ b/src/backend/services/dp_run.py
@@ -0,0 +1,33 @@
+from api.airflow_api import airflow_post
+from database.mongo_repo import datapipelineRunDB
+from services.upload_to_s3 import download_file
+
+
+def run(executionId):
+    dp_run = datapipelineRunDB.find_one({"executionId": executionId})
+
+    if not dp_run:
+        return False
+
+    response = run_airflow(executionId, dp_run['datapipelineId'], dp_run['fileId'])
+
+    if response.status_code == 200:
+        datapipelineRunDB.update_one({"executionId": executionId}, {'$set': {'state': "QUEUED"}})
+        return True
+    else:
+        datapipelineRunDB.update_one({"executionId": executionId}, {'$set': {'state': "FAILED"}})
+        return False
+
+
+def run_airflow(executionId, datapipelineId, fileId):
+    airflow_config = {'conf': {'download_url': download_file(fileId).get("download_url"),
+                               'executionId': executionId,
+                               'datapipelineId': datapipelineId,
+                               'fileId': fileId}}
+
+    response = airflow_post('dags/' + datapipelineId + '/dagRuns', airflow_config)
+    return response
diff --git a/src/datapipeline/dags/input_output.py b/src/datapipeline/dags/input_output.py
index ddea2b8..9a9160e 100644
--- a/src/datapipeline/dags/input_output.py
+++ b/src/datapipeline/dags/input_output.py
@@ -1,11 +1,11 @@
 import requests
-from airflow import DAG, HttpOperator
-from datetime import datetime, timedelta
+from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 import pandas as pd
-from airflow.models import Variable
+from datetime import datetime, timedelta
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 import json
+import io
 from airflow.providers.http.operators.http import SimpleHttpOperator
 
 default_args = {
@@ -16,11 +16,20 @@
 
 
 def read_and_count_words(**kwargs):
-    download_url = Variable.get("download_url", default_var="")
-    if not download_url:
-        print("Download URL not provided.")
+    download_url = kwargs['dag_run'].conf.get('download_url')
+    executionId = kwargs['dag_run'].conf.get('executionId')
+
+    if not executionId:
+        kwargs['ti'].xcom_push(key="test-identifier", value={"error": "executionId not provided."})
+        print("Error: executionId not provided")
+        return
+
+    if not download_url:
+        kwargs['ti'].xcom_push(key="test-identifier", value={"error": "Download URL not provided.", "executionId": executionId})
+        print("Error: download URL not provided")
+        return
+
     # Download the file from the provided URL
     response = requests.get(download_url)
     if response.status_code == 200:
@@ -28,29 +37,36 @@
 
         # Now 'file_content' contains the content of the file
         # Proceed with processing the file content as needed
-        file_reader = pd.read_csv(pd.compat.StringIO(file_content))
+        file_reader = pd.read_csv(io.StringIO(file_content))
 
         print("File read successfully")
         print(file_reader.head())
 
         concatenated_text = file_reader.apply(lambda x: ' '.join(x.astype(str)), axis=1)
-        total_word_count = concatenated_text.str.split().str.len().sum()
+        print(concatenated_text)
+        # TODO: use the real count again; hardcoded while testing the pipeline
+        total_word_count = 50  # concatenated_text.str.split().str.len().sum()
 
         print(f"Total word count is {total_word_count}")
+        kwargs['ti'].xcom_push(key="test-identifier", value={"result": {"word_count": total_word_count},
+                                                             "executionId": executionId})
     else:
         print(f"Failed to download file from URL: {download_url}")
+        kwargs['ti'].xcom_push(key="test-identifier", value={"error": "Failed to download file from URL",
+                                                             "executionId": executionId})
+        return
 
 
 dag = DAG(
-    dag_id="output_dag",
+    dag_id="input_output",
     default_args=default_args,
-    description="DAG counting words",
+    description="DAG to test input and output",
     start_date=datetime(2023, 11, 4, 2),
-    schedule_interval='@daily'
+    schedule_interval=None
 )
 
 trigger_task = TriggerDagRunOperator(
     task_id="triggerTask",
     trigger_dag_id="output_dag",
     conf={
-        'download_url': 'https://example.com/your-file.csv'
+        'Error': 'No conf given.'
     },
     dag=dag,
 )
@@ -62,13 +78,15 @@
     dag=dag,
 )
 
+
 send_response = SimpleHttpOperator(
     task_id="sendresponse",
     http_conn_id="test-connection",
     method="POST",
     endpoint="inputData",
-    data=json.dumps({'test': 'hello this is data from http operator'}),
+    data="{{ task_instance.xcom_pull(task_ids='readAndCountWords', key='test-identifier') | tojson }}",
     headers={"Content-Type": "application/json"},
+    response_check=lambda response: response.status_code == 200,
     dag=dag
 )
diff --git a/src/frontend/src/app/pages/datapipeline-run/datapipeline-run.module.ts b/src/frontend/src/app/pages/datapipeline-run/datapipeline-run.module.ts
new file mode 100644
index 0000000..faeaf40
--- /dev/null
+++ b/src/frontend/src/app/pages/datapipeline-run/datapipeline-run.module.ts
@@ -0,0 +1,15 @@
+import { NgModule } from '@angular/core';
+import { CommonModule } from '@angular/common';
+import { ListDatapipelineRunComponent } from './pages/list-datapipeline-run/list-datapipeline-run.component';
+
+
+@NgModule({
+  declarations: [
+    ListDatapipelineRunComponent
+  ],
+  imports: [
+    CommonModule
+  ]
+})
+export class DatapipelineRunModule { }
diff --git a/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.html b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.html
new file mode 100644
index 0000000..c77903a
--- /dev/null
+++ b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.html
@@ -0,0 +1 @@
+<p>list-datapipeline-run works!</p>
diff --git a/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.scss b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.scss
new file mode 100644
index 0000000..e69de29
diff --git a/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.spec.ts b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.spec.ts
new file mode 100644
index 0000000..7d99077
--- /dev/null
+++ b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.spec.ts
@@ -0,0 +1,21 @@
+import { ComponentFixture, TestBed } from '@angular/core/testing';
+
+import { ListDatapipelineRunComponent } from './list-datapipeline-run.component';
+
+describe('ListDatapipelineRunComponent', () => {
+  let component: ListDatapipelineRunComponent;
+  let fixture: ComponentFixture<ListDatapipelineRunComponent>;
+
+  beforeEach(() => {
+    TestBed.configureTestingModule({
+      declarations: [ListDatapipelineRunComponent]
+    });
+    fixture = TestBed.createComponent(ListDatapipelineRunComponent);
+    component = fixture.componentInstance;
+    fixture.detectChanges();
+  });
+
+  it('should create', () => {
+    expect(component).toBeTruthy();
+  });
+});
diff --git a/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.ts b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.ts
new file mode 100644
index 0000000..ff20f95
--- /dev/null
+++ b/src/frontend/src/app/pages/datapipeline-run/pages/list-datapipeline-run/list-datapipeline-run.component.ts
@@ -0,0 +1,10 @@
+import { Component } from '@angular/core';
+
+@Component({
+  selector: 'app-list-datapipeline-run',
+  templateUrl: './list-datapipeline-run.component.html',
+  styleUrls: ['./list-datapipeline-run.component.scss']
+})
+export class ListDatapipelineRunComponent {
+
+}