Skip to content

Commit

Permalink
Merge pull request #115 from amosproj/feature/apache_airflow_input_output
Browse files Browse the repository at this point in the history

Feature/apache airflow input output
  • Loading branch information
Keldami authored Dec 13, 2023
2 parents 1777636 + 39bc387 commit 2be5f4f
Show file tree
Hide file tree
Showing 24 changed files with 697 additions and 47 deletions.
25 changes: 20 additions & 5 deletions src/backend/api/airflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ def airflow_get(url):
auth=basic, headers={'content-type': 'application/json'})
return response

def airflow_post(url, json_object):
    """POST ``json_object`` to the Airflow REST API at ``api/v1/<url>``.

    Always returns the raw ``requests.Response`` so callers can inspect
    ``status_code`` themselves.  (Previously this returned a Flask
    ``(jsonify, 500)`` tuple on non-200, which crashed callers that do
    ``response.status_code`` with an AttributeError.)
    """
    basic = HTTPBasicAuth(os.getenv('AIRFLOW_USERNAME'), os.getenv('AIRFLOW_PASSWORD'))
    response = requests.post(os.getenv('AIRFLOW_SERVER_URL') + 'api/v1/' + url, json=json_object,
                             auth=basic, headers={'content-type': 'application/json'})
    return response


@airflow_api.route('/dags', methods=['GET'])
Expand All @@ -39,9 +42,9 @@ def dags():
@secure
def dagsExecuteById(id):
file_name = request.args.get('parameter')
data = { "conf": download_file(file_name) }
json_config = {'conf': download_file(file_name)}

response = airflow_post('dags/' + id + '/dagRuns', json=data)
response = airflow_post('dags/' + id + '/dagRuns', json_config)
if response.status_code == 200:
return jsonify(response.json())
else:
Expand Down Expand Up @@ -84,3 +87,15 @@ def dagsDetailsByIdByExecutionDateByTaskInstance(id, execution_date, task_instan
else:
return jsonify({'error': 'Failed to trigger Airflow DAG'}), 500


@airflow_api.route('/inputData', methods=['POST'])
def test_input_endpoint():
    """Test endpoint: accept a JSON body and acknowledge receipt.

    NOTE(review): unlike the other airflow routes this one is not
    decorated with @secure — confirm whether that is intentional.
    """
    # Removed leftover debug prints; one of them was ``print(id)``,
    # which printed the *builtin* ``id`` function — clearly unintended.
    data = request.json
    if data:
        return jsonify({'message': 'successful data transfer for: '}), 200
    else:
        return jsonify({'error': 'Entity not found'}), 400

134 changes: 134 additions & 0 deletions src/backend/api/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from flask import request, jsonify, Blueprint

from database.mongo_repo import metadataDB
from database.models.metadata_details import MetadataDetails
from services.auth_service import secure
from services.store_s3metadata import (
insert_all_s3files_metadata,
insert_one_s3file_metadata,
remove_s3metadata,
)


metadata = Blueprint("metadata", __name__, template_folder="templates")


@metadata.route("/datapipeline_metadata", methods=["GET"])
@secure
def get_all_datapipeline_metadatas():
    """Return every stored datapipeline-result metadata document.

    Documents without a ``datapipelineId`` key (e.g. raw S3 listings
    stored in the same collection) are skipped.
    """
    # Renamed from ``metadata`` so it no longer shadows the module-level
    # ``metadata`` blueprint.
    documents = metadataDB.find()
    all_data = [
        {
            "datapipelineId": doc["datapipelineId"],
            "s3bucketfileId": doc["s3bucketfileId"],
            "result": doc["result"],
            "create_date": doc["create_date"],
            "state": doc["state"],
            "file_type": doc["file_type"],
            "file_size": doc["file_size"],
        }
        for doc in documents
        if "datapipelineId" in doc
    ]
    # Bug fix: 200 is the correct status for a successful GET
    # (201 means "Created").
    return jsonify(all_data), 200


@metadata.route("/s3files_metadata", methods=["GET"])
@secure
def get_all_s3files_metadatas():
    """Return the stored S3 object metadata documents.

    Only documents with a ``Key`` field (raw S3 listings) are returned;
    datapipeline-result documents in the same collection are skipped.
    """
    # Renamed from ``metadata`` so it no longer shadows the module-level
    # ``metadata`` blueprint.
    documents = metadataDB.find()
    all_data = [
        {
            "key": doc["Key"],
            "last_modified": doc["LastModified"],
            "size": doc["Size"],
            "etag": doc["ETag"],
            "storage_class": doc["StorageClass"],
        }
        for doc in documents
        if "Key" in doc
    ]
    # Bug fix: 200 is the correct status for a successful GET
    # (201 means "Created").
    return jsonify(all_data), 200


@metadata.route("/metadata/datapipline_result", methods=["POST"])
@secure
def insert_file_metadata():
    """Validate and store one datapipeline-result metadata document.

    Returns 400 when any required field is missing, 201 on success.
    """
    data = request.json

    if "datapipelineId" not in data or "s3bucketfileId" not in data:
        return (
            jsonify({"error": "Missing datapipelineId or s3bucketfileId in request."}),
            400,
        )
    if "create_date" not in data or "state" not in data:
        return jsonify({"error": "Missing create_date or state in request."}), 400
    if "file_type" not in data or "file_size" not in data:
        return jsonify({"error": "Missing file_type or file_size in request."}), 400
    if "result" not in data:
        # Bug fix: this branch previously returned without a status code,
        # so Flask replied 200 OK on a validation failure.
        return jsonify({"error": "Missing result in request."}), 400

    store_metadata = MetadataDetails(
        data["datapipelineId"],
        data["s3bucketfileId"],
        data["result"],
        data["create_date"],
        data["state"],
        data["file_type"],
        data["file_size"],
    )

    # Removed leftover debug ``print`` of the stored document.
    metadataDB.insert_one(store_metadata.to_json())

    return jsonify({"message": "Datapipeline metadata is stored successfully"}), 201


@metadata.route("/metadata/store_all_s3metadata", methods=["POST"])
@secure
def store_all_s3files_metadata():
    """Snapshot the metadata of every object in the S3 bucket into MongoDB."""
    insert_all_s3files_metadata(metadataDB)
    payload = {"message": "The metadatas of files in S3 bucket are stored successfully!"}
    return jsonify(payload)


@metadata.route("/metadata/store_single_s3metadata", methods=["POST"])
@secure
def store_single_s3metadata():
    """Store the S3 metadata of the file named in the JSON body, if it exists."""
    data = request.json
    response = insert_one_s3file_metadata(metadataDB, data["file_name"])
    # Idiom fix: compare against None with ``is not``, never ``!=``.
    if response is not None:
        return jsonify(
            {"message": "The metadatas of uploaded file is stored successfully!"}
        )
    return jsonify({"message": "There is no such a file in the S3 bucket!"})


@metadata.route("/metadata/delete_all_metadata", methods=["DELETE"])
@secure
def delete_all_metadata():
    """Drop every document from the metadata collection."""
    metadataDB.delete_many({})
    payload = {"message": "All metadatas are deleted successfully"}
    return jsonify(payload)


@metadata.route("/metadata/delete_single_s3file_metadata", methods=["DELETE"])
@secure
def delete_single_s3file_metadata():
    """Delete the stored S3 metadata for the file named in the JSON body."""
    data = request.json

    response = remove_s3metadata(metadataDB, data["file_name"])
    # Idiom fix: compare against None with ``is``, never ``==``.
    if response is None:
        return jsonify({"message": "Metadata of file is not exist"})
    return jsonify({"message": "Metadata of file is deleted successfully"})
13 changes: 10 additions & 3 deletions src/backend/api/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
from services.auth_service import secure
from services.upload_to_s3 import upload_to_s3, download_file, list_file, file_name_check, get_upload_rul


upload_api = Blueprint("upload_api", __name__, template_folder="templates")
ALLOWED_EXTENSIONS = {'csv'}


@upload_api.route('/upload_url', methods=['GET'])
@secure
def upload_url():
    """Return a presigned S3 upload URL for the requested file name."""
    file_name = request.args.get('fileName')
    if file_name:
        return jsonify(get_upload_rul(file_name))
    # Bug fix: previously fell through returning None, which makes Flask
    # raise a 500; report the missing query parameter explicitly.
    return jsonify({'error': 'fileName query parameter is required'}), 400


@upload_api.route('/upload', methods=['GET', 'POST'])
@secure
Expand Down Expand Up @@ -40,7 +43,11 @@ def download():
try:
objects = list_file()
if objects:
return jsonify(objects)
files = [obj['Key'] for obj in objects]
# return render_template('list.html', files=files)
return jsonify({"files": files})
else:
return "The bucket is empty."

except Exception as e:
return jsonify({f"Error: {e}"})
Expand Down
46 changes: 27 additions & 19 deletions src/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from api.datapipeline import datapipeline
from api.fileWP import fileWP
from api.upload_api import upload_api
from api.metadata import metadata
from services.auth_service import secure
from flask_cors import CORS

Expand All @@ -17,41 +18,48 @@
# TODO get origin figured out nicely
CORS(app)
load_dotenv()
# Merge residue fix: the config dict appeared twice (old and new diff
# versions); keep a single app.config.update call.
app.config.update(
    {
        "SECRET_KEY": os.getenv("OIDC_SECRET_KEY"),
        # NOTE(review): TESTING/DEBUG are enabled here — confirm these
        # are turned off for production deployments.
        "TESTING": True,
        "DEBUG": True,
        "OIDC_CLIENT_SECRETS": "client_secrets.json",
        "OIDC_ID_TOKEN_COOKIE_SECURE": False,
        "OIDC_USER_INFO_ENABLED": True,
        "OIDC_OPENID_REALM": "master",
        "OIDC_SCOPES": ["openid", "email", "profile"],
        "OIDC_INTROSPECTION_AUTH_METHOD": "client_secret_post",
    }
)
oidc = OpenIDConnect(app)


# Merge residue fix: ``datapipeline`` was registered twice (old and new
# diff lines); Flask raises on duplicate blueprint registration.
app.register_blueprint(upload_api)
app.register_blueprint(datapipeline, url_prefix="/")
app.register_blueprint(fileWP)
app.register_blueprint(airflow_api)
app.register_blueprint(metadata)

@app.route("/")
@secure
def root():
    """Redirect the site root to the index page.

    (Collapsed duplicated old/new diff lines for the route decorator
    and the return statement.)
    """
    return redirect(url_for("index"))


@app.route('/index')
@app.route("/index")
@secure
def index():
return 'Welcome %s' % oidc.user_getfield('email')
return "Welcome %s" % oidc.user_getfield("email")

@app.route('/auth')

@app.route("/auth")
@oidc.require_login
def auth():
return redirect(url_for('index'))
return redirect(url_for("index"))


if __name__ == "__main__":
    load_dotenv()
    # Please do not set debug=True in production.
    # Merge residue fix: app.run appeared twice (old and new diff lines).
    app.run(host=os.getenv("HOST_URL"), port=int(os.getenv("HOST_PORT")), debug=True)
29 changes: 29 additions & 0 deletions src/backend/database/models/metadata_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class MetadataDetails:
    """Metadata record for a single datapipeline run over an S3 file.

    ``state`` is coerced to a string both at construction time and again
    when serializing, matching the storage schema.
    """

    # Attribute names in serialization order.
    _KEYS = (
        "datapipelineId",
        "s3bucketfileId",
        "result",
        "create_date",
        "state",
        "file_type",
        "file_size",
    )

    def __init__(self, datapipelineId, s3bucketfileId, result, create_date,
                 state, file_type, file_size):
        self.datapipelineId = datapipelineId
        self.s3bucketfileId = s3bucketfileId
        self.result = result
        self.create_date = create_date
        self.state = str(state)
        self.file_type = file_type
        self.file_size = file_size

    def to_json(self):
        """Return the record as a plain dict suitable for MongoDB insertion."""
        doc = {key: getattr(self, key) for key in self._KEYS}
        # Re-stringify in case ``state`` was reassigned after construction.
        doc["state"] = str(doc["state"])
        return doc
21 changes: 12 additions & 9 deletions src/backend/database/mongo_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

load_dotenv()

# Merge residue fix: the span contained both the old and the new client
# construction and both sets of collection bindings; keep one of each.
# Single shared MongoDB client; credentials come from the environment
# (.env is loaded above).  authSource="admin" authenticates against the
# admin database.
client = MongoClient(
    host=os.getenv("MONGODB_URL"),
    port=int(os.getenv("MONGODB_PORT")),
    username=os.getenv("MONGODB_USER"),
    password=os.getenv("MONGODB_PASSWORD"),
    authSource="admin",
)

db = client["dpms_db"]
user = db["user"]
fileWPDB = db["fileWP"]
datapipelineDB = db["datapipeline"]
metadataDB = db["metadata"]
53 changes: 53 additions & 0 deletions src/backend/services/store_s3metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import boto3
from dotenv import load_dotenv

# from database.models.s3_detials_entity import S3ObjectDetails


# import upload_to_s3


# Load AWS credentials and bucket configuration from .env.
load_dotenv()

AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
REGION = os.getenv("REGION")
BUCKET_NAME = os.getenv("BUCKET_NAME")
# Module-level S3 client shared by all helpers below.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=REGION,
)


def insert_all_s3files_metadata(collection):
    """Copy the metadata of every object in the bucket into *collection*.

    Returns the pymongo ``InsertManyResult``, or ``None`` when the bucket
    is empty.

    NOTE(review): ``list_objects_v2`` returns at most 1000 keys per call;
    larger buckets would need pagination — confirm bucket-size assumptions.
    """
    listing = s3_client.list_objects_v2(Bucket=BUCKET_NAME)
    contents = listing.get("Contents", [])
    if not contents:
        # Bug fix: insert_many raises InvalidOperation on an empty list.
        return None
    return collection.insert_many(contents)


def insert_one_s3file_metadata(collection, s3_key):
    """Store the metadata of the bucket object whose key is *s3_key*.

    Returns the pymongo ``InsertOneResult``, or ``None`` when no object
    with that key exists in the bucket.
    """
    listing = s3_client.list_objects_v2(Bucket=BUCKET_NAME)
    # Find the first (and only) object whose Key matches; ``next`` stops
    # at the first hit.  Replaces the loop-with-break and the non-idiomatic
    # ``== None`` comparison.
    target = next(
        (obj for obj in listing.get("Contents", []) if obj["Key"] == s3_key),
        None,
    )
    if target is None:
        return None
    return collection.insert_one(target)


def remove_s3metadata(collection, key):
    """Delete the stored metadata document whose ``Key`` equals *key*.

    Returns the pymongo ``DeleteResult``, or ``None`` when no document
    matched — same contract as before, but with a single round trip
    instead of find_one followed by delete_one (also fixes ``== None``).
    """
    result = collection.delete_one({"Key": key})
    if result.deleted_count == 0:
        return None
    return result
Loading

0 comments on commit 2be5f4f

Please sign in to comment.