forked from knowsuchagency/docker-compose-airflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
82 lines (56 loc) · 1.88 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from pathlib import Path
import logging
import time
from invoke import task
@task
def wait(c, seconds=10):
    """Log the requested delay, then block for ``seconds`` seconds."""
    message = f"waiting {seconds} seconds"
    logging.info(message)
    time.sleep(seconds)
@task
def initdb(c):
    """Run ``airflow initdb`` through the invoke context.

    ``warn=True`` is passed to ``c.run`` so a failing command is reported
    instead of aborting the task.
    """
    command = "airflow initdb"
    c.run(command, warn=True)
@task
def add_user(c):
    """Create the password-auth airflow user described by ``c.config.auth``.

    Reads ``username``, ``email`` and ``password`` from the task
    configuration, adds the resulting ``PasswordUser`` to an airflow session
    and commits it. An ``IntegrityError`` raised by the commit (presumably a
    user that already exists -- confirm against the schema) is logged rather
    than propagated; the session is closed in every case.
    """
    # Function-scope imports: airflow is only imported when this task runs.
    from airflow import models, settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser
    from sqlalchemy.exc import IntegrityError

    new_user = PasswordUser(models.User())
    auth = c.config.auth
    new_user.username = auth.username
    new_user.email = auth.email
    new_user.password = auth.password

    db_session = settings.Session()
    db_session.add(new_user)
    try:
        db_session.commit()
    except IntegrityError as error:
        logging.exception(error)
    finally:
        db_session.close()
@task
def set_airflow_variables(c):
    """Configure airflow variables from configuration.

    Runs ``airflow variables --set KEY VALUE`` once for every entry in
    ``c.config.airflow.variables``.

    Keys and values are shell-quoted before being interpolated into the
    command line, so values that contain spaces, are empty, or include shell
    metacharacters are passed to airflow as a single literal argument.
    Plain tokens (letters, digits, ``_ - . / : , + = @ %``) are left
    unchanged by the quoting.
    """
    # Imported locally so this task does not depend on the module's import block.
    import shlex

    for key, value in c.config.airflow.variables.items():
        # str() first: configuration values may be numbers or booleans.
        quoted_key = shlex.quote(str(key))
        quoted_value = shlex.quote(str(value))
        c.run(f"airflow variables --set {quoted_key} {quoted_value}")
@task
def configure_aws(c):
    """Copy aws secrets file to default aws credentials location.

    The secret is read from ``/run/secrets/aws-credentials`` and written to
    ``~/.aws/credentials``. Nothing is written when the secret file is
    missing or when a credentials file already exists; each outcome logs its
    own message so the log reflects what actually happened.
    """
    secret_path = Path("/run/secrets/aws-credentials")
    credentials_root = Path.home() / ".aws"
    credentials_path = credentials_root / "credentials"

    if not secret_path.exists():
        logging.info("aws secrets file not found. skipping configuration")
        return

    if credentials_path.exists():
        # Never overwrite credentials that are already in place.
        logging.info("aws credentials file already exists. skipping configuration")
        return

    logging.info("aws secrets file found; writing to default path")
    credentials_root.mkdir(parents=True, exist_ok=True)
    credentials_path.write_text(secret_path.read_text())
@task(initdb, add_user, set_airflow_variables, configure_aws)
def initialize(c):
    """Run the setup needed before the webserver, scheduler or workers start.

    The body is intentionally empty: all work is done by the tasks listed in
    the decorator (``initdb``, ``add_user``, ``set_airflow_variables`` and
    ``configure_aws``).
    """
@task(initialize)
def webserver(c):
    """Start the airflow webserver; ``initialize`` is listed in the decorator."""
    command = "airflow webserver"
    c.run(command)
@task(wait)
def scheduler(c):
    """Start the airflow scheduler; ``wait`` is listed in the decorator."""
    command = "airflow scheduler"
    c.run(command)
@task(wait)
def worker(c):
    """Start an airflow worker; ``wait`` is listed in the decorator."""
    command = "airflow worker"
    c.run(command)