-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AGENT-5360] Create Airflow Operator for Custom Job #86
Draft
oleksandr-saienko
wants to merge
9
commits into
main
Choose a base branch
from
oleksandr.saienko/AGENT-5360-Create-Airflow-Operator-for-Custom-Job
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 8 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
e54aa05
implemented operators for custom job
oleksandr-saienko 8c1cc99
fixes for operatos
oleksandr-saienko 2697525
files path fix
oleksandr-saienko 8a613a6
added sample dag and custom job
oleksandr-saienko cfda3ee
fixes
oleksandr-saienko 9abcb84
black fixes
oleksandr-saienko bfe39c9
lint fix
oleksandr-saienko 23dc53b
lint fix2
oleksandr-saienko 30ac340
updated version
oleksandr-saienko File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import os | ||
from datarobot import Deployment | ||
|
||
|
||
def main(): | ||
print(f"Running python code: {__file__}") | ||
|
||
# Using this job runtime parameters | ||
print() | ||
print("Runtime parameters:") | ||
print("-------------------") | ||
string_param = os.environ.get("STRING_PARAMETER", None) | ||
print(f"string param: {string_param}") | ||
|
||
deployment_param = os.environ.get("DEPLOYMENT", None) | ||
print(f"deployment_param: {deployment_param}") | ||
|
||
model_package_param = os.environ.get("MODEL_PACKAGE", None) | ||
print(f"model_package_param: {model_package_param}") | ||
|
||
# An example of using the python client to list deployments | ||
deployments = Deployment.list() | ||
print() | ||
print("List of all deployments") | ||
print("-----------------------") | ||
for deployment in deployments: | ||
print(deployment) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
name: runtime-params | ||
|
||
runtimeParameterDefinitions: | ||
- fieldName: MODEL_PACKAGE | ||
type: modelPackage | ||
description: Model package that will be used to store key values | ||
- fieldName: DEPLOYMENT | ||
type: deployment | ||
description: Deployment that will be used to make predictions | ||
- fieldName: STRING_PARAMETER | ||
type: string | ||
description: An example of a string parameter |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
#!/bin/bash | ||
|
||
echo "Job Starting: ($0)" | ||
|
||
echo "===== Runtime Parameters ======" | ||
echo "Model Package: $MODEL_PACKAGE" | ||
echo "Deployment: $DEPLOYMENT" | ||
echo "STRING_PARAMETER: $STRING_PARAMETER" | ||
echo | ||
echo | ||
echo "===== Generic Variables ===========================" | ||
echo "CURRENT_CUSTOM_JOB_RUN_ID: $CURRENT_CUSTOM_JOB_RUN_ID" | ||
echo "CURRENT_CUSTOM_JOB_ID: $CURRENT_CUSTOM_JOB_ID" | ||
echo "DATAROBOT_ENDPOINT: $DATAROBOT_ENDPOINT" | ||
echo "DATAROBOT_API_TOKEN: Use the environment variable $DATAROBOT_API_TOKEN" | ||
echo "===================================================" | ||
|
||
echo | ||
echo "How to check how much memory your job has" | ||
memory_limit_bytes=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) | ||
memory_limit_megabytes=$((memory_limit_bytes / 1024 / 1024)) | ||
echo "Memory Limit (in Megabytes): $memory_limit_megabytes" | ||
echo | ||
|
||
# Uncomment the following if you want to check if the job has network access | ||
## Define the IP address of an external server to ping (e.g., Google's DNS) | ||
#external_server="8.8.8.8" | ||
#echo "Checking internet connection" | ||
## Try to ping the external server | ||
#ping -c 1 $external_server > /dev/null 2>&1 | ||
# | ||
## Check the exit status of the ping command | ||
#if [ $? -eq 0 ]; then | ||
# echo "Internet connection is available." | ||
#else | ||
# echo "No internet connection." | ||
#fi | ||
#echo | ||
#echo | ||
|
||
# Run the code in job.py | ||
dir_path=$(dirname $0) | ||
echo "Entrypoint is at $dir_path - cd into it" | ||
cd $dir_path | ||
|
||
if command -v python3 &>/dev/null; then | ||
echo "python3 is installed and available." | ||
else | ||
echo "Error: python3 is not installed or not available." | ||
exit 1 | ||
fi | ||
|
||
python_file="job.py" | ||
if [ -f "$python_file" ]; then | ||
echo "Found $python_file .. running it" | ||
python3 ./job.py | ||
else | ||
echo "File $python_file does not exist" | ||
exit 1 | ||
fi |
92 changes: 92 additions & 0 deletions
92
datarobot_provider/example_dags/datarobot_custom_job_dag.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# Copyright 2024 DataRobot, Inc. and its affiliates. | ||
# | ||
# All rights reserved. | ||
# | ||
# This is proprietary source code of DataRobot, Inc. and its affiliates. | ||
# | ||
# Released under the terms of DataRobot Tool and Utility Agreement. | ||
|
||
from datetime import datetime | ||
|
||
from airflow.decorators import dag | ||
|
||
from datarobot_provider.operators.custom_job import CreateCustomJobOperator | ||
from datarobot_provider.operators.custom_job import AddFilesToCustomJobOperator | ||
from datarobot_provider.operators.custom_job import SetCustomJobExecutionEnvironmentOperator | ||
from datarobot_provider.operators.custom_job import SetCustomJobRuntimeParametersOperator | ||
from datarobot_provider.operators.custom_job import RunCustomJobOperator | ||
from datarobot_provider.sensors.client import BaseAsyncResolutionSensor | ||
|
||
|
||
@dag( | ||
schedule=None, | ||
start_date=datetime(2023, 1, 1), | ||
tags=['example', 'custom job'], | ||
params={}, | ||
) | ||
def create_custom_custom_job(): | ||
create_custom_job_op = CreateCustomJobOperator( | ||
task_id='create_custom_job', | ||
name="airflow-test-create-custom-job-v556", | ||
description="demo-test-demonstration", | ||
) | ||
|
||
add_files_to_custom_job_op = AddFilesToCustomJobOperator( | ||
task_id='add_files_to_custom_job', | ||
custom_job_id=create_custom_job_op.output, | ||
files_path="custom_job/", | ||
) | ||
|
||
# list_execution_env_op = ListExecutionEnvironmentOperator( | ||
# task_id='list_execution_env', | ||
# search_for="Python 3.9 PyTorch Drop-In" | ||
# ) | ||
|
||
set_env_to_custom_job_op = SetCustomJobExecutionEnvironmentOperator( | ||
task_id='set_env_to_custom_job', | ||
custom_job_id=create_custom_job_op.output, | ||
environment_id='5e8c888007389fe0f466c72b', | ||
environment_version_id='65c1db901800cd9782d7ac07', | ||
) | ||
|
||
set_runtime_parameters_op = SetCustomJobRuntimeParametersOperator( | ||
task_id='set_runtime_parameters', | ||
custom_job_id=create_custom_job_op.output, | ||
runtime_parameter_values=[ | ||
{"fieldName": "DEPLOYMENT", "type": "deployment", "value": "650ef15944f21ea1a3c91a25"}, | ||
{ | ||
"fieldName": "MODEL_PACKAGE", | ||
"type": "modelPackage", | ||
"value": "654b9b228404a39b5c8da5b2", | ||
}, | ||
{"fieldName": "STRING_PARAMETER", "type": "string", "value": 'my test string'}, | ||
], | ||
) | ||
|
||
run_custom_job_op = RunCustomJobOperator( | ||
task_id='run_custom_job', | ||
custom_job_id=create_custom_job_op.output, | ||
) | ||
|
||
custom_job_complete_sensor = BaseAsyncResolutionSensor( | ||
task_id="check_custom_job_complete", | ||
job_id=run_custom_job_op.output, | ||
poke_interval=5, | ||
mode="reschedule", | ||
timeout=3600, | ||
) | ||
|
||
( | ||
create_custom_job_op | ||
>> add_files_to_custom_job_op | ||
>> set_env_to_custom_job_op | ||
>> set_runtime_parameters_op | ||
>> run_custom_job_op | ||
>> custom_job_complete_sensor | ||
) | ||
|
||
|
||
create_custom_job_dag = create_custom_custom_job() | ||
|
||
if __name__ == "__main__": | ||
create_custom_job_dag.test() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to create another kind of create operator which includes files, environment and runtime parameters all in one? This way the DAG could be simplified. Having these more specific operators makes sense, but I think a super-creator operator could be useful.