diff --git a/src/backend/api/airflow_api.py b/src/backend/api/airflow_api.py index 88b3d24..b357f86 100644 --- a/src/backend/api/airflow_api.py +++ b/src/backend/api/airflow_api.py @@ -19,11 +19,14 @@ def airflow_get(url): auth=basic, headers={'content-type': 'application/json'}) return response -def airflow_post(url, data): +def airflow_post(url, json_object): basic = HTTPBasicAuth(os.getenv('AIRFLOW_USERNAME'), os.getenv('AIRFLOW_PASSWORD')) - response = requests.post(os.getenv('AIRFLOW_SERVER_URL') + 'api/v1/' + url, data, + response = requests.post(os.getenv('AIRFLOW_SERVER_URL') + 'api/v1/' + url, json=json_object, auth=basic, headers={'content-type': 'application/json'}) - return response + if response.status_code == 200: + return response + else: + return jsonify({'error': 'Failed to post to apache airflow'}), 500 @airflow_api.route('/dags', methods=['GET']) @@ -39,9 +42,9 @@ def dags(): @secure def dagsExecuteById(id): file_name = request.args.get('parameter') - data = { "conf": download_file(file_name) } + json_config = {'conf': download_file(file_name)} - response = airflow_post('dags/' + id + '/dagRuns', json=data) + response = airflow_post('dags/' + id + '/dagRuns', json_config) if response.status_code == 200: return jsonify(response.json()) else: @@ -84,3 +87,15 @@ def dagsDetailsByIdByExecutionDateByTaskInstance(id, execution_date, task_instan else: return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500 + +@airflow_api.route('/inputData', methods=['POST']) +def test_input_endpoint(): + + data = request.json + print(data) + print(id) + if data: + return jsonify({'message': 'successful data transfer for: '}), 200 + else: + return jsonify({'error': 'Entity not found'}), 400 + diff --git a/src/backend/api/metadata.py b/src/backend/api/metadata.py new file mode 100644 index 0000000..011a787 --- /dev/null +++ b/src/backend/api/metadata.py @@ -0,0 +1,134 @@ +from flask import request, jsonify, Blueprint + +from database.mongo_repo import metadataDB 
+from database.models.metadata_details import MetadataDetails +from services.auth_service import secure +from services.store_s3metadata import ( + insert_all_s3files_metadata, + insert_one_s3file_metadata, + remove_s3metadata, +) + + +metadata = Blueprint("metadata", __name__, template_folder="templates") + + +@metadata.route("/datapipeline_metadata", methods=["GET"]) +@secure +def get_all_datapipeline_metadatas(): + metadata = metadataDB.find() + + allData = [] + for data in metadata: + if "datapipelineId" in data: + allData.append( + { + "datapipelineId": data["datapipelineId"], + "s3bucketfileId": data["s3bucketfileId"], + "result": data["result"], + "create_date": data["create_date"], + "state": data["state"], + "file_type": data["file_type"], + "file_size": data["file_size"], + } + ) + else: + continue + return jsonify(allData), 201 + + +@metadata.route("/s3files_metadata", methods=["GET"]) +@secure +def get_all_s3files_metadatas(): + metadata = metadataDB.find() + allData = [] + + for data in metadata: + if "Key" in data: + allData.append( + { + "key": data["Key"], + "last_modified": data["LastModified"], + "size": data["Size"], + "etag": data["ETag"], + "storage_class": data["StorageClass"], + } + ) + else: + continue + return jsonify(allData), 201 + + +@metadata.route("/metadata/datapipline_result", methods=["POST"]) +@secure +def insert_file_metadata(): + data = request.json + + if "datapipelineId" not in data or "s3bucketfileId" not in data: + return ( + jsonify({"error": "Missing datapipelineId or s3bucketfileId in request."}), + 400, + ) + if "create_date" not in data or "state" not in data: + return jsonify({"error": "Missing create_date or state in request."}), 400 + if "file_type" not in data or "file_size" not in data: + return jsonify({"error": "Missing file_type or file_size in request."}), 400 + if "result" not in data: + return jsonify({"error": "Missing result in request."}) + + store_metadata = MetadataDetails( + data["datapipelineId"], + 
data["s3bucketfileId"], + data["result"], + data["create_date"], + data["state"], + data["file_type"], + data["file_size"], + ) + + metadataDB.insert_one(store_metadata.to_json()) + print(store_metadata.to_json()) + + return jsonify({"message": "Datapipeline metadata is stored successfully"}), 201 + + +@metadata.route("/metadata/store_all_s3metadata", methods=["POST"]) +@secure +def store_all_s3files_metadata(): + insert_all_s3files_metadata(metadataDB) + return jsonify( + {"message": "The metadatas of files in S3 bucket are stored successfully!"} + ) + + +@metadata.route("/metadata/store_single_s3metadata", methods=["POST"]) +@secure +def store_single_s3metadata(): + data = request.json + response = insert_one_s3file_metadata(metadataDB, data["file_name"]) + if response != None: + return jsonify( + {"message": "The metadatas of uploaded file is stored successfully!"} + ) + else: + return jsonify({"message": "There is no such a file in the S3 bucket!"}) + + +@metadata.route("/metadata/delete_all_metadata", methods=["DELETE"]) +@secure +def delete_all_metadata(): + metadataDB.delete_many({}) + + return jsonify({"message": "All metadatas are deleted successfully"}) + + +@metadata.route("/metadata/delete_single_s3file_metadata", methods=["DELETE"]) +@secure +def delete_single_s3file_metadata(): + data = request.json + + response = remove_s3metadata(metadataDB, data["file_name"]) + if response == None: + return jsonify({"message": "Metadata of file is not exist"}) + else: + return jsonify({"message": "Metadata of file is deleted successfully"}) diff --git a/src/backend/api/upload_api.py b/src/backend/api/upload_api.py index de0caa3..b0c679c 100644 --- a/src/backend/api/upload_api.py +++ b/src/backend/api/upload_api.py @@ -4,14 +4,17 @@ from services.auth_service import secure from services.upload_to_s3 import upload_to_s3, download_file, list_file, file_name_check, get_upload_rul - upload_api = Blueprint("upload_api", __name__, template_folder="templates") 
ALLOWED_EXTENSIONS = {'csv'} + @upload_api.route('/upload_url', methods=['GET']) @secure def upload_url(): - return jsonify(get_upload_rul()) + file_name = request.args.get('fileName') + if file_name: + return jsonify(get_upload_rul(file_name)) + @upload_api.route('/upload', methods=['GET', 'POST']) @secure @@ -40,7 +43,11 @@ def download(): try: objects = list_file() if objects: - return jsonify(objects) + files = [obj['Key'] for obj in objects] + # return render_template('list.html', files=files) + return jsonify({"files": files}) + else: + return "The bucket is empty." except Exception as e: return jsonify({f"Error: {e}"}) diff --git a/src/backend/app.py b/src/backend/app.py index 408b620..c36540c 100644 --- a/src/backend/app.py +++ b/src/backend/app.py @@ -9,6 +9,7 @@ from api.datapipeline import datapipeline from api.fileWP import fileWP from api.upload_api import upload_api +from api.metadata import metadata from services.auth_service import secure from flask_cors import CORS @@ -17,41 +18,48 @@ # TODO get origin figured out nicely CORS(app) load_dotenv() -app.config.update({ - 'SECRET_KEY': os.getenv('OIDC_SECRET_KEY'), - 'TESTING': True, - 'DEBUG': True, - 'OIDC_CLIENT_SECRETS': 'client_secrets.json', - 'OIDC_ID_TOKEN_COOKIE_SECURE': False, - 'OIDC_USER_INFO_ENABLED': True, - 'OIDC_OPENID_REALM': 'master', - 'OIDC_SCOPES': ['openid', 'email', 'profile'], - 'OIDC_INTROSPECTION_AUTH_METHOD': 'client_secret_post', -}) +app.config.update( + { + "SECRET_KEY": os.getenv("OIDC_SECRET_KEY"), + "TESTING": True, + "DEBUG": True, + "OIDC_CLIENT_SECRETS": "client_secrets.json", + "OIDC_ID_TOKEN_COOKIE_SECURE": False, + "OIDC_USER_INFO_ENABLED": True, + "OIDC_OPENID_REALM": "master", + "OIDC_SCOPES": ["openid", "email", "profile"], + "OIDC_INTROSPECTION_AUTH_METHOD": "client_secret_post", + } +) oidc = OpenIDConnect(app) app.register_blueprint(upload_api) -app.register_blueprint(datapipeline, url_prefix='/') +app.register_blueprint(datapipeline, url_prefix="/") 
app.register_blueprint(fileWP) app.register_blueprint(airflow_api) +app.register_blueprint(metadata) -@app.route('/') + +@app.route("/") @secure def root(): - return redirect(url_for('index')) + return redirect(url_for("index")) + -@app.route('/index') +@app.route("/index") @secure def index(): - return 'Welcome %s' % oidc.user_getfield('email') + return "Welcome %s" % oidc.user_getfield("email") -@app.route('/auth') + +@app.route("/auth") @oidc.require_login def auth(): - return redirect(url_for('index')) + return redirect(url_for("index")) + if __name__ == "__main__": load_dotenv() # Please do not set debug=True in production - app.run(host=os.getenv('HOST_URL'), port=int(os.getenv('HOST_PORT')), debug=True) + app.run(host=os.getenv("HOST_URL"), port=int(os.getenv("HOST_PORT")), debug=True) diff --git a/src/backend/database/models/metadata_details.py b/src/backend/database/models/metadata_details.py new file mode 100644 index 0000000..f48bcd2 --- /dev/null +++ b/src/backend/database/models/metadata_details.py @@ -0,0 +1,29 @@ +class MetadataDetails: + def __init__( + self, + datapipelineId, + s3bucketfileId, + result, + create_date, + state, + file_type, + file_size, + ): + self.datapipelineId = datapipelineId + self.s3bucketfileId = s3bucketfileId + self.result = result + self.create_date = create_date + self.state = str(state) + self.file_type = file_type + self.file_size = file_size + + def to_json(self): + return { + "datapipelineId": self.datapipelineId, + "s3bucketfileId": self.s3bucketfileId, + "result": self.result, + "create_date": self.create_date, + "state": str(self.state), + "file_type": self.file_type, + "file_size": self.file_size, + } diff --git a/src/backend/database/mongo_repo.py b/src/backend/database/mongo_repo.py index 27dc55c..8165778 100644 --- a/src/backend/database/mongo_repo.py +++ b/src/backend/database/mongo_repo.py @@ -4,13 +4,16 @@ load_dotenv() -client = MongoClient(host=os.getenv('MONGODB_URL'), - 
port=int(os.getenv('MONGODB_PORT')), - username=os.getenv('MONGODB_USER'), - password=os.getenv('MONGODB_PASSWORD'), - authSource='admin') +client = MongoClient( + host=os.getenv("MONGODB_URL"), + port=int(os.getenv("MONGODB_PORT")), + username=os.getenv("MONGODB_USER"), + password=os.getenv("MONGODB_PASSWORD"), + authSource="admin", +) -db = client['dpms_db'] -user = db['user'] -fileWPDB = db['fileWP'] -datapipelineDB = db['datapipeline'] +db = client["dpms_db"] +user = db["user"] +fileWPDB = db["fileWP"] +datapipelineDB = db["datapipeline"] +metadataDB = db["metadata"] diff --git a/src/backend/services/store_s3metadata.py b/src/backend/services/store_s3metadata.py new file mode 100644 index 0000000..cc8da26 --- /dev/null +++ b/src/backend/services/store_s3metadata.py @@ -0,0 +1,53 @@ +import os +import boto3 +from dotenv import load_dotenv + +# from database.models.s3_detials_entity import S3ObjectDetails + + +# import upload_to_s3 + + +load_dotenv() + +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY") +AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY") +REGION = os.getenv("REGION") +BUCKET_NAME = os.getenv("BUCKET_NAME") +s3_client = boto3.client( + "s3", + aws_access_key_id=AWS_ACCESS_KEY, + aws_secret_access_key=AWS_SECRET_KEY, + region_name=REGION, +) + + +def insert_all_s3files_metadata(collection): + s3file_metadata = s3_client.list_objects_v2(Bucket=BUCKET_NAME) + s3file_metadata_contents = s3file_metadata.get("Contents", []) + response = collection.insert_many(s3file_metadata_contents) + return response + + +def insert_one_s3file_metadata(collection, s3_key): + s3file_metadata = s3_client.list_objects_v2(Bucket=BUCKET_NAME) + s3file_metadata_contents = s3file_metadata.get("Contents", []) + target_s3file_metadata = None + for d in s3file_metadata_contents: + if d["Key"] == s3_key: + target_s3file_metadata = d + response = collection.insert_one(target_s3file_metadata) + break + if target_s3file_metadata == None: + return None + + return response + + +def 
remove_s3metadata(collection, key): + metadata = collection.find_one({"Key": key}) + if metadata == None: + return None + else: + result = collection.delete_one({"Key": key}) + return result diff --git a/src/backend/services/upload_to_s3.py b/src/backend/services/upload_to_s3.py index 9dbdbf4..2e6549d 100644 --- a/src/backend/services/upload_to_s3.py +++ b/src/backend/services/upload_to_s3.py @@ -32,18 +32,17 @@ def upload_to_s3(path, s3_key): return False -def get_upload_rul(): +def get_upload_rul(file_name): try: key = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(10)) s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION) - url = s3.generate_presigned_post(Bucket=BUCKET_NAME, Key=key, ExpiresIn=3600) + url = s3.generate_presigned_url('put_object', Params={'Bucket': BUCKET_NAME, 'Key': file_name}, ExpiresIn=3600) return url except Exception as e: print(f"Error: {e}") - def get_file_details(path, bucket_name, s3_key): # Get details of a specific file try: diff --git a/src/datapipeline/dags/input_output.py b/src/datapipeline/dags/input_output.py new file mode 100644 index 0000000..ddea2b8 --- /dev/null +++ b/src/datapipeline/dags/input_output.py @@ -0,0 +1,75 @@ +import requests +from airflow import DAG, HttpOperator +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +import pandas as pd +from airflow.models import Variable +import json +from airflow.providers.http.operators.http import SimpleHttpOperator + +default_args = { + 'owner': 'IAV', + 'retries': 5, + 'retry_delay': timedelta(minutes=2) +} + + +def read_and_count_words(**kwargs): + download_url = Variable.get("download_url", default_var="") + if not download_url: + print("Download URL not provided.") + return + + # Download the file from the provided URL + response = 
requests.get(download_url) + if response.status_code == 200: + file_content = response.text + + # Now 'file_content' contains the content of the file + # Proceed with processing the file content as needed + file_reader = pd.read_csv(pd.compat.StringIO(file_content)) + print("File read successfully") + print(file_reader.head()) + concatenated_text = file_reader.apply(lambda x: ' '.join(x.astype(str)), axis=1) + total_word_count = concatenated_text.str.split().str.len().sum() + print(f"Total word count is {total_word_count}") + else: + print(f"Failed to download file from URL: {download_url}") + + +dag = DAG( + dag_id="output_dag", + default_args=default_args, + description="DAG counting words", + start_date=datetime(2023, 11, 4, 2), + schedule_interval='@daily' +) + +trigger_task = TriggerDagRunOperator( + task_id="triggerTask", + trigger_dag_id="output_dag", + conf={ + 'download_url': 'https://example.com/your-file.csv' + }, + dag=dag, +) + +task_read_and_count_words = PythonOperator( + task_id="readAndCountWords", + python_callable=read_and_count_words, + provide_context=True, + dag=dag, +) + +send_response = SimpleHttpOperator( + task_id="sendresponse", + http_conn_id="test-connection", + method="POST", + endpoint="inputData", + data=json.dumps({'test': 'hello this is data from http operator'}), + headers={"Content-Type": "application/json"}, + dag=dag +) + +trigger_task >> task_read_and_count_words >> send_response \ No newline at end of file diff --git a/src/datapipeline/dags/output_dag.py b/src/datapipeline/dags/output_dag.py new file mode 100644 index 0000000..da6bb61 --- /dev/null +++ b/src/datapipeline/dags/output_dag.py @@ -0,0 +1,39 @@ +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +import pandas as pd +from airflow.models import Variable +import json +from airflow.providers.http.operators.http import SimpleHttpOperator + +default_args={ + 'owner': 'IAV', + 'retries':5, + 'retry_delay':timedelta(minutes=2) +} + +dag = DAG( + 
dag_id="output_dag", + default_args=default_args, + description="DAG counting words", + start_date=datetime(2023, 11, 4, 2), + schedule_interval='@daily' +) + +trigger_task = TriggerDagRunOperator( + task_id="triggerTask", + trigger_dag_id="output_dag", + conf={ + 'download_url': 'https://example.com/your-file.csv' + }, + dag=dag, +) +send_response = SimpleHttpOperator( + task_id="sendresponse", + http_conn_id="test-connection", + method="POST", + endpoint="inputData", + data=json.dumps({'test': 'hello this is data from http operator'}), + headers={"Content-Type": "application/json"}, + dag=dag +) + +trigger_task >> send_response \ No newline at end of file diff --git a/src/frontend/src/app/app-routing.module.ts b/src/frontend/src/app/app-routing.module.ts index 2e99f03..b0b211c 100644 --- a/src/frontend/src/app/app-routing.module.ts +++ b/src/frontend/src/app/app-routing.module.ts @@ -5,8 +5,8 @@ import { UploadFileComponent } from './modules/upload-file/upload-file.component import { DownloadComponent } from './modules/download/download.component'; import {ListDatapipelineComponent} from "./pages/datapipeline/pages/list-datapipeline/list-datapipeline.component"; import {EditDatapipelineComponent} from "./pages/datapipeline/pages/edit-datapipeline/edit-datapipeline.component"; -import { ListS3bucketfilesComponent } from './pages/s3bucketfiles/list-s3bucketfiles/list-s3bucketfiles.component'; -import { CreateDatapipelineComponent } from './pages/datapipeline/pages/create-datapipeline/create-datapipeline.component'; +import {S3UploadFilesComponent} from './pages/s3-upload-files/s3-upload-files.component'; +import { startDataPipelineComponent } from './pages/start-data-pipeline/start-data-pipeline.component'; const routes: Routes = [ { path: '', redirectTo: '/home', pathMatch: 'full' }, @@ -14,10 +14,10 @@ const routes: Routes = [ { path: 'upload', component: UploadFileComponent }, { path: 'download', component: DownloadComponent }, { path:'datapipeline',component: 
ListDatapipelineComponent}, - { path:'datapipeline/new',component: CreateDatapipelineComponent}, + { path:'datapipeline/new',component: EditDatapipelineComponent}, { path:'datapipeline/:id',component: EditDatapipelineComponent}, - { path: 's3list', component: ListS3bucketfilesComponent }, - {path:'newdatapipeline',component:CreateDatapipelineComponent}, + { path:'s3upload',component: S3UploadFilesComponent}, + { path:'startpipeline',component: startDataPipelineComponent}, // TODO // { path: '**', component: PageNotFoundComponent } ]; diff --git a/src/frontend/src/app/app.module.ts b/src/frontend/src/app/app.module.ts index b5dd5d1..a771297 100644 --- a/src/frontend/src/app/app.module.ts +++ b/src/frontend/src/app/app.module.ts @@ -12,7 +12,10 @@ import { DownloadComponent } from './modules/download/download.component'; import { DataTablesModule } from "angular-datatables"; import {DatapipelineModule} from "./pages/datapipeline/datapipeline.module"; import { NgbModule } from '@ng-bootstrap/ng-bootstrap'; -import { ListS3bucketfilesComponent } from './pages/s3bucketfiles/list-s3bucketfiles/list-s3bucketfiles.component'; +import { FormsModule } from '@angular/forms'; +import { S3UploadFilesComponent } from './pages/s3-upload-files/s3-upload-files.component'; +import { startDataPipelineComponent } from './pages/start-data-pipeline/start-data-pipeline.component'; + @NgModule({ @@ -23,7 +26,8 @@ import { ListS3bucketfilesComponent } from './pages/s3bucketfiles/list-s3bucketf SideBarComponent, UploadFileComponent, DownloadComponent, - ListS3bucketfilesComponent, + S3UploadFilesComponent, + startDataPipelineComponent, ], imports: [ BrowserModule, @@ -32,7 +36,8 @@ import { ListS3bucketfilesComponent } from './pages/s3bucketfiles/list-s3bucketf HttpClientModule, DataTablesModule, DatapipelineModule, - NgbModule + NgbModule, + FormsModule ], providers: [], bootstrap: [AppComponent], diff --git a/src/frontend/src/app/core/services/lists3bucket/lists3bucket.service.specs.ts 
b/src/frontend/src/app/core/services/lists3bucket/lists3bucket.service.specs.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/frontend/src/app/core/services/lists3bucket/lists3bucket.service.ts b/src/frontend/src/app/core/services/lists3bucket/lists3bucket.service.ts new file mode 100644 index 0000000..1ddc68a --- /dev/null +++ b/src/frontend/src/app/core/services/lists3bucket/lists3bucket.service.ts @@ -0,0 +1,29 @@ +import { Injectable } from '@angular/core'; +import {HttpClient} from "@angular/common/http"; +import { Observable } from 'rxjs'; + +@Injectable({ + providedIn: 'root', +}) +export class BackendService { + private baseUrl = 'http://127.0.0.1:5000'; + + constructor(private http: HttpClient) {} + + getS3Files(): Observable<{ files: string[] }> { + const url = `${this.baseUrl}/download`; + return this.http.get<{ files: string[] }>(url); + } + + getAvailablePipelines(): Observable { + const url = `${this.baseUrl}/datapipelines`; + return this.http.get(url); + } + + startPipeline(s3File: string, pipelineId: string): Observable { + const url = `${this.baseUrl}/startPipeline`; + const body = { s3File, pipelineId }; + return this.http.post(url, body); + } +} + diff --git a/src/frontend/src/app/core/services/s3-file-upload.service.spec.ts b/src/frontend/src/app/core/services/s3-file-upload.service.spec.ts new file mode 100644 index 0000000..5fb01da --- /dev/null +++ b/src/frontend/src/app/core/services/s3-file-upload.service.spec.ts @@ -0,0 +1,16 @@ +import { TestBed } from '@angular/core/testing'; + +import { S3FileUploadService } from './s3-file-upload.service'; + +describe('S3FileUploadService', () => { + let service: S3FileUploadService; + + beforeEach(() => { + TestBed.configureTestingModule({}); + service = TestBed.inject(S3FileUploadService); + }); + + it('should be created', () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/src/frontend/src/app/core/services/s3-file-upload.service.ts 
b/src/frontend/src/app/core/services/s3-file-upload.service.ts new file mode 100644 index 0000000..ba3d7dd --- /dev/null +++ b/src/frontend/src/app/core/services/s3-file-upload.service.ts @@ -0,0 +1,34 @@ +import { HttpClient } from '@angular/common/http'; +import { Injectable } from '@angular/core'; +import { Observable } from 'rxjs/internal/Observable'; + +@Injectable({ + providedIn: 'root' +}) +export class S3FileUploadService { + + private backendUrl = "http://192.168.1.41:5000/"; // actual backend URL + + constructor(private http: HttpClient) { } + + uploadCsv(file: File): FormData { + const formData = new FormData(); + formData.append('file', file); + return formData; + } + + uploadFileToS3(formData: FormData): Observable { + return this.http.post(`${this.backendUrl}/upload`, formData); + } + + getPresignedUrl(fileName: string): Observable { + return this.http.get(`${this.backendUrl}upload_url?fileName=${fileName}`); + } + + uploadFileToS3Presigned(presignedUrl: string, formData: FormData): Observable { + return this.http.put(presignedUrl, formData); + + } +} + + diff --git a/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.html b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.html new file mode 100644 index 0000000..6e0162a --- /dev/null +++ b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.html @@ -0,0 +1,6 @@ +

<h1>s3-upload-files!</h1>

+
<div *ngIf="successMessage">{{ successMessage }}</div>
+ + + + diff --git a/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.scss b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.scss new file mode 100644 index 0000000..e69de29 diff --git a/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.spec.ts b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.spec.ts new file mode 100644 index 0000000..57431b5 --- /dev/null +++ b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.spec.ts @@ -0,0 +1,21 @@ +import { ComponentFixture, TestBed } from '@angular/core/testing'; + +import { S3UploadFilesComponent } from './s3-upload-files.component'; + +describe('S3UploadFilesComponent', () => { + let component: S3UploadFilesComponent; + let fixture: ComponentFixture; + + beforeEach(() => { + TestBed.configureTestingModule({ + declarations: [S3UploadFilesComponent] + }); + fixture = TestBed.createComponent(S3UploadFilesComponent); + component = fixture.componentInstance; + fixture.detectChanges(); + }); + + it('should create', () => { + expect(component).toBeTruthy(); + }); +}); diff --git a/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.ts b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.ts new file mode 100644 index 0000000..006be38 --- /dev/null +++ b/src/frontend/src/app/pages/s3-upload-files/s3-upload-files.component.ts @@ -0,0 +1,81 @@ +import { Component } from '@angular/core'; +import { S3FileUploadService } from 'src/app/core/services/s3-file-upload.service'; + +@Component({ + selector: 'app-s3-upload-files', + templateUrl: './s3-upload-files.component.html', + styleUrls: ['./s3-upload-files.component.scss'] +}) +export class S3UploadFilesComponent { + private selectedFile!: File; + public successMessage!: string; + + constructor(private fileUploadService: S3FileUploadService) { } + + selectFile() { + const fileInput: HTMLInputElement | null = document.querySelector('input[type="file"]'); 
+ + if (fileInput !== null) { + fileInput.click(); + + fileInput.addEventListener('change', (event) => { + const selectedFile: File | undefined = fileInput.files?.[0]; + + if (selectedFile !== undefined) { + this.selectedFile = selectedFile; + } + }); + } + } + + + uploadFile() { + if (!this.selectedFile) { + return; + } + + const formData = this.fileUploadService.uploadCsv(this.selectedFile); + + this.fileUploadService.uploadFileToS3(formData).subscribe( + response => { + this.successMessage = 'File uploaded successfully!'; + console.log(response); + }, + error => { + console.error('Error uploading file:', error); + } + ); + } + + + + + uploadFileWithUrl() { + if (!this.selectedFile) { + return; + } + + const formData = this.fileUploadService.uploadCsv(this.selectedFile); + + this.fileUploadService.getPresignedUrl(this.selectedFile.name).subscribe( + (presignedUrl) => { + this.uploadToPresignedUrl(presignedUrl, formData); + }, + (error) => { + console.error('Error getting presigned URL:', error); + } + ); + } + + private uploadToPresignedUrl(presignedUrl: string, formData: FormData): void { + this.fileUploadService.uploadFileToS3Presigned(presignedUrl, formData).subscribe( + (response) => { + this.successMessage = 'File uploaded successfully!'; + console.log(response); + }, + (error) => { + console.error('Error uploading file:', error); + } + ); + } +} \ No newline at end of file diff --git a/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.html b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.html new file mode 100644 index 0000000..654ffc1 --- /dev/null +++ b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.html @@ -0,0 +1,25 @@ +
+

<h1>Start Pipeline</h1>

+ + + + + +

<h2>Select S3 Files</h2>

+ + +
+ + +
+ +
+

<h3>Available Pipelines:</h3>

+
    +
+  <li *ngFor="let pipeline of pipelines">{{ pipeline.name }}</li>
  • +
+
+
+ diff --git a/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.scss b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.scss new file mode 100644 index 0000000..e69de29 diff --git a/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.specs.ts b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.specs.ts new file mode 100644 index 0000000..8270411 --- /dev/null +++ b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.specs.ts @@ -0,0 +1,21 @@ +import { ComponentFixture, TestBed } from '@angular/core/testing'; + +import { PipelineComponent } from './start-data-pipeline.component'; + +describe('PipelineComponent', () => { + let component: PipelineComponent; + let fixture: ComponentFixture; + + beforeEach(() => { + TestBed.configureTestingModule({ + declarations: [PipelineComponent] + }); + fixture = TestBed.createComponent(PipelineComponent); + component = fixture.componentInstance; + fixture.detectChanges(); + }); + + it('should create', () => { + expect(component).toBeTruthy(); + }); +}); diff --git a/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.ts b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.ts new file mode 100644 index 0000000..6fa83fd --- /dev/null +++ b/src/frontend/src/app/pages/start-data-pipeline/start-data-pipeline.component.ts @@ -0,0 +1,50 @@ +import { Component } from '@angular/core'; +import { Observable } from 'rxjs/internal/Observable'; +import { HttpClient } from '@angular/common/http'; +import { BackendService } from 'src/app/core/services/lists3bucket/lists3bucket.service'; + + +@Component({ + selector: 'app-start-data-pipeline', + templateUrl: './start-data-pipeline.component.html', + styleUrls: ['./start-data-pipeline.component.scss'] +}) + +export class startDataPipelineComponent { + s3Files: string[] = []; + pipelines: any[] | undefined; + 
selectedS3File: string | undefined; + selectedPipeline: any | undefined; + + constructor(private backendService: BackendService) {} + + getS3Files() { + this.backendService.getS3Files().subscribe( + (response: { files: string[] }) => { + console.log('API Response:', response); + // ... rest of the code + this.s3Files = response.files || []; + }, + (error) => { + console.error('Error fetching S3 files:', error); + } + ); + } + + + + getAvailablePipelines() { + this.backendService.getAvailablePipelines().subscribe((pipelines) => { + this.pipelines = pipelines; + }); + } + + startPipeline() { + if (this.selectedS3File && this.selectedPipeline) { + const pipelineId = this.selectedPipeline.id; // Adjust this based on your data structure + this.backendService.startPipeline(this.selectedS3File, pipelineId).subscribe(() => { + console.log('Pipeline started successfully!'); + }); + } + } +}