# dispatch job — workflow file as used by run #58
name: dispatch job

# This workflow only runs when triggered by hand (workflow_dispatch).
# Every input below is required and is delivered to the jobs as a string
# through `github.event.inputs.<name>`.
on:
  workflow_dispatch:
    inputs:
      repo:
        description: 'The https github url for the recipe feedstock'
        required: true
      ref:
        description: 'The tag or branch to target in your recipe repo'
        required: true
        default: 'main'
      feedstock_subdir:
        description: 'The subdir of the feedstock directory in the repo'
        required: true
        default: 'feedstock'
      bucket:
        description: 'This job runner leverages s3fs.S3FileSystem for your recipe cache and output. Choices currently are: "default"'
        required: true
        default: 'default'
      # NOTE(review): presumably '0' disables pruning and a non-zero value
      # enables it; the consumer of this value is not in this file — confirm.
      prune:
        description: 'Only run the first two time steps'
        required: true
        default: '0'
      parallelism:
        description: 'Number of task managers to spin up'
        required: true
        default: '1'
jobs:
  # Derives a short repository name from the `repo` input URL and exposes it
  # as the `repo_name` job output, which later jobs use in their display names.
  name-job:
    runs-on: ubuntu-latest
    outputs:
      repo_name: ${{ steps.string_manipulation.outputs.result }}
    steps:
      - name: manipulate strings
        id: string_manipulation
        env:
          # The user-supplied URL is handed to the script through the
          # environment instead of being expanded into the script text, so
          # shell metacharacters in the input cannot be executed.
          REPO_URL: ${{ github.event.inputs.repo }}
        run: |
          # strip any leading path and a trailing ".git" from the URL;
          # `--` keeps a value that starts with "-" from being read as an option
          repo_name=$(basename -s .git -- "$REPO_URL")
          echo "result=$repo_name" >> "$GITHUB_OUTPUT"
# run-job: installs the runner tooling, submits the recipe with `pangeo-forge-runner bake`, and publishes the job name / id.
  run-job:
    name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: name-job
    outputs:
      job_name: ${{ steps.report_ids.outputs.job_name }}
      job_id: ${{ steps.report_ids.outputs.job_id }}
    runs-on: ubuntu-latest
    steps:
      - name: checkout repository
        uses: actions/checkout@v3
      - name: set up python 3.10
        uses: actions/setup-python@v3
        with:
          python-version: '3.10'
      - name: echo server
        env:
          # Inputs are passed through the environment rather than expanded
          # into the script text, so their content is never parsed as shell.
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          BUCKET: ${{ github.event.inputs.bucket }}
          PARALLELISM: ${{ github.event.inputs.parallelism }}
          PRUNE: ${{ github.event.inputs.prune }}
        run: |
          echo "Manually triggered workflow: $REPO $REF $BUCKET $PARALLELISM $PRUNE"
      - name: install deps
        run: |
          # TODO: move to requirements file
          python -m pip install --upgrade pip
          # Requirement specifiers are quoted on purpose: an unquoted `>=` is
          # read by the shell as an output redirection, which drops the version
          # constraint and sends pip's stdout to a file named "=<version>".
          pip install \
            fsspec \
            s3fs \
            boto3 \
            'apache-beam==2.52.0' \
            'pangeo-forge-recipes>=0.10.0' \
            'pangeo-forge-runner>=0.9.1' \
            'python-cmr==0.9.0'
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: execute recipe on k8s cluster
        id: executejob
        # A failure here must not stop the job: the cleanup step below keys
        # off this step's outcome.
        continue-on-error: true
        run: |
          # NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever,
          # so stdout is redirected to a file. The exit status is captured
          # instead of letting the shell abort, so the job identifiers below
          # are still exported for the cleanup step when the command fails.
          bake_status=0
          pangeo-forge-runner \
            bake \
            --repo="$REPO" \
            --ref="$REF" \
            -f .github/workflows/config.py > execute.log || bake_status=$?
          if [[ "$bake_status" -ne 0 ]]; then
            echo "pangeo-forge-runner bake exited with status $bake_status; captured output follows:"
            cat execute.log
          fi
          # export all the valuable information from the logs
          # (`|| true`: a pattern that is absent must not abort before the
          # remaining values are written; emptiness is checked afterwards)
          JOB_NAME=$(grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' execute.log | head -n1 || true)
          echo "JOB_NAME=$JOB_NAME" >> "$GITHUB_ENV"
          JOB_ID=$(grep -oP 'Started Flink job as \K[^ ]+' execute.log | head -n1 || true)
          echo "JOB_ID=$JOB_ID" >> "$GITHUB_ENV"
          FLINK_DASH=$(grep -oP "You can run '\K[^']+(?=')" execute.log | head -n1 || true)
          echo "FLINK_DASH=$FLINK_DASH" >> "$GITHUB_ENV"
          # now surface the failure so the cleanup step runs
          if [[ "$bake_status" -ne 0 ]]; then
            exit "$bake_status"
          fi
          if [[ -z "$JOB_NAME" || -z "$JOB_ID" || -z "$FLINK_DASH" ]]; then
            echo "could not find the job name, job id and dashboard command in execute.log"
            exit 1
          fi
        env:
          # REPO and REF feed the command line above. The remaining variables
          # are not referenced in this script.
          # NOTE(review): presumably they are read by
          # .github/workflows/config.py — confirm.
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
          PRUNE_OPTION: ${{ github.event.inputs.prune }}
          PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
          S3_BUCKET: ${{ github.event.inputs.bucket }}
          S3_DEFAULT_AWS_ACCESS_KEY_ID: ${{ secrets.S3_DEFAULT_AWS_ACCESS_KEY_ID }}
          S3_DEFAULT_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DEFAULT_AWS_SECRET_ACCESS_KEY }}
      - name: cleanup if "pangeo-forge-runner bake" failed
        if: steps.executejob.outcome == 'failure'
        run: |
          echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          echo "##################### JOB MANAGER ######################"
          # Only touch job-specific resources when a job name is known: an
          # empty pattern would match every pod and every flinkdeployment.
          if [[ -n "${JOB_NAME:-}" ]]; then
            kubectl get pod | grep -v manager | grep -F -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
            # delete the flinkdeployment so we don't have old failures hanging around
            kubectl get flinkdeployment --no-headers | grep -F -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          else
            echo "JOB_NAME is empty; skipping job manager logs and flinkdeployment deletion"
          fi
          # force GH action to show failed result
          exit 128
      - name: report running job id for user
        id: report_ids
        run: |
          # TODO: we also need to report historyserver URL and flink dashboard URL
          # but this also requires us to think how we're going to have a thin
          # layer of authentication around these services so they aren't totally public
          echo '############ JOB NAME ################'
          echo "$JOB_NAME"
          echo "job_name=$JOB_NAME" >> "$GITHUB_OUTPUT"
          echo '############ JOB ID ################'
          echo "$JOB_ID"
          echo "job_id=$JOB_ID" >> "$GITHUB_OUTPUT"
          echo '############ FLINK DASHBOARD ################'
          echo "$FLINK_DASH"
          echo "flink_dash=$FLINK_DASH" >> "$GITHUB_OUTPUT"
# monitor-job: polls the job manager logs until the Flink job leaves RUNNING; on failure it dumps logs and deletes the deployment.
  monitor-job:
    runs-on: ubuntu-latest
    name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: [name-job, run-job]
    steps:
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: monitor logs of job manager
        id: monitorjob
        timeout-minutes: 120
        # A failure (or timeout) here must not stop the job: the cleanup step
        # below keys off this step's outcome.
        continue-on-error: true
        env:
          # Passed through the environment so the value is quoted data in the
          # script rather than text spliced into it.
          JOB_NAME: ${{ needs.run-job.outputs.job_name }}
        run: |
          # TODO: this needs to not check the logs but the historyserver status
          # but first we need think about authentication and a reverse proxy
          if [[ -z "$JOB_NAME" ]]; then
            # without a name the wait loop below could never match and would
            # spin until the step timeout
            echo "run-job did not report a job name, nothing to monitor"
            exit 128
          fi

          # Prints the first job manager log line that records the job
          # leaving the RUNNING state, or nothing if there is none yet.
          job_manager_status_line() {
            kubectl get pod --no-headers \
              | grep -v manager \
              | grep -F -- "$JOB_NAME" \
              | cut -d' ' -f1 \
              | xargs -I{} kubectl logs pod/{} \
              | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' \
              | head -n 1
          }

          echo "find job status on the job manager logs..."
          input_status=$(job_manager_status_line)
          while [[ -z "$input_status" ]]; do
            echo "still waiting for a status on the job manager logs..."
            sleep 1
            input_status=$(job_manager_status_line)
          done
          echo "##### INPUT STATUS #####"
          echo "$input_status"
          # the status is the last word of the line, right before the final "."
          status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)')
          echo "##### STATUS #####"
          echo "$status"
          if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
            echo "job failed with '$status', will dump the logs now..."
            # force exit so we can move to next step
            exit 128
          fi
      - name: cleanup if monitor job fails
        if: steps.monitorjob.outcome == 'failure'
        env:
          JOB_NAME: ${{ needs.run-job.outputs.job_name }}
        run: |
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          # start from an empty file so the OOM check below always has input
          : > /tmp/taskmanager.log
          # Only touch job-specific resources when a job name is known: an
          # empty pattern would match every pod and every flinkdeployment.
          if [[ -n "$JOB_NAME" ]]; then
            echo "##################### JOB MANAGER ######################"
            kubectl get pod | grep -v taskmanager | grep -F -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
            echo "##################### TASK MANAGER ######################"
            # depending on the `inputs.parallelism` we can have more than one taskmanager so grab the first one
            # TODO: loop through them all and dump the logs
            kubectl get pod | grep taskmanager | grep -F -- "$JOB_NAME" | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container > /tmp/taskmanager.log
            cat /tmp/taskmanager.log
            # delete the flinkdeployment so we don't have old failures hanging around
            kubectl get flinkdeployment --no-headers | grep -F -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          else
            echo "job name is empty; skipping job/task manager logs and flinkdeployment deletion"
          fi
          # grok if this is a JVM OOM error
          # (tested with `if` so that "no match" does not abort the script
          # before the explicit exit at the bottom)
          if grep -q "java.lang.OutOfMemoryError" /tmp/taskmanager.log; then
            RED='\033[0;31m'
            GREEN='\033[0;32m'
            NOCOLOR='\033[0m' # To reset the color
            echo "###################################################"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the task manager logs...${NOCOLOR}"
            # NOTE(review): `-a20` appears to be read by GNU grep as `-a -20`
            # (text mode plus 20 lines of context on both sides); confirm
            # whether `-A20` was intended.
            grep -a20 "java.lang.OutOfMemoryError" /tmp/taskmanager.log
          fi
          # force GH action to show failed result
          exit 128