Skip to content

Latest commit

 

History

History
318 lines (242 loc) · 9.34 KB

2019_11_05_Airflow安装配置记录.md

File metadata and controls

318 lines (242 loc) · 9.34 KB

Airflow 1.10+安装

Airflow是Airbnb开源的任务调度系统,我主要用调度一些爬虫任务和数据清洗任务。

本次安装Airflow版本为1.10+,其需要依赖Python和DB,本次选择的DB为Mysql。 本次安装组件及版本如下:

Airflow == 1.10.0 Python == 3.6.5 MySQL == 5.7

常用命令

# 启动schedule
airflow scheduler -D
# 启动webserver
airflow webserver -D

# 异常状况下,杀掉Airflow相关进程
ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}

整体安装流程

  1. 建表
  2. 安装Airflow
  3. 配置
  4. 运行
  5. 配置任务

建表

# 库名为airflow
create database airflow;

# 建用户用户名为airflow,并且设置所有ip均可以访问。
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';

# 为新建的airflow用户授予airflow库的所有权限
grant all on airflow.* to 'airflow'@'%';
flush privileges

Airflow安装

# 创建虚拟环境,通过virtualenv
mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录
virtualenv --no-site-packages airflow --python=python  # 创建虚拟环境
source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境

# 指定豆瓣作为pip源安装Airflow
pip install apache-airflow -i https://pypi.douban.com/simple

在安装完一堆的依赖后,就需要配置 AIRFLOW_HOME 环境变量,后续的 DAG 和 Plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。

安装时出现报错,可参考以下操作:

ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.

# 执行
pip3 install -U Flask==1.0.4
# 执行
pip3 install -U pika==0.13.1

# 重新执行
pip install apache-airflow -i https://pypi.douban.com/simple

设置环境变量

# 设置Airflow的工作目录,之后的日志、任务文件都需要存在这个目录之下
export AIRFLOW_HOME=/airflow

## 注意:有部分教程将airflow的工作目录设置为tmp/目录下,这是错误的,因为tmp是临时目录,系统会定时清理这个目录下的文件。

查看其版本信息

airflow version
# 如果能够正常显示版本信息,则表明,Airflow已经安装成功

执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。

airflow 配置

修改Airflow DB配置

安装Mysql模块

# 这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。
pip install "apache-airflow[mysql]"

# 修改Airflow DB配置
${AIRFLOW_HOME}/airflow.cfg

# 修改数据的链接,参数的格式为mysql://帐号:密码@ip:port/db
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow

配置完成后需要初始化db,Airflow会自动新建所依赖的表

# 初始化数据库
airflow initdb

# 如报错
Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock'

# 配置文件按如下格式修改
sql_alchemy_conn = mysql+mysqldb://airflow:[email protected]:3306/airflow

2. 用户认证

Airflow拥有可视化的界面,并且需要登录才能查看和管理

# 安装passsword组件
pip install "apache-airflow[password]"

# 修改 airflow.cfg
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
在python环境中执行如下代码以添加账户:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin'  # 用户名
user.email = '[email protected]' # 用户邮箱
user.password = 'password'   # 用户密码
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
配置邮件服务

此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = [email protected]
smtp_password = password
smtp_port = 25
smtp_mail_from = [email protected]
接下来简单把dag的Python代码列出来,以供参考:

default_args = {
  'owner': 'ownerExample',
  'start_date': datetime(2018, 9, 18),
  'email': ['[email protected]'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
  'email_on_failure': ['[email protected]'], # 任务失败且重试次数用完时发送Email。
  'email_on_retry': True, # 任务重试时是否发送Email
  'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
  'retries': 3,
  'retry_delay': timedelta(minutes=3),
}

4、配置Executor

# 设置Executor 修改:airflow.cfg
executor = LocalExecutor

# 我使用的只有有单节点所以使用的是LocalExecutor模式。多节点请参照文档进行配置

5. 修改log地址

# 在配置文件中,找到相应位置,并修改
[core]
base_log_folder = /servers/logs/airflow
[scheduler]
child_process_log_directory = servers/logs/airflow/scheduler

6. 修改webserver地址

# 在配置文件中,找到相应位置,修改webserver地址
[webserver]
base_url = http://host:port
可以通过上面配置的地址访问webserver。

7. 可选配置

(可选)修改Scheduler线程数 如果调度任务不多的话可以把线程数调小,默认为32。参数为:parallelism

(可选)不加载example dag 如果不想加载示例dag可以把load_examples配置改为False,默认为True。这个配置只有在第一次启动airflow之前设置才有效。

如果此方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。

(可选)修改检测新dag间隔 修改min_file_process_interval参数为10,每10s识别一次新的dag。默认为0,没有时间间隔。

运行airflow

# 启动schedule
airflow scheduler
# 启动webserver
airflow webserver

安装问题汇总

Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql

修改Mysql配置文件my.cnf,具体步骤如下:

查找my.cnf文件位置 mysql --help | grep my.cnf

修改文件 explicit_defaults_for_timestamp=true 注意:必须写在【mysqld】下

重启Mysql sudo systemctl restart mysqld.service 查看修改是否生效。执行如下SQL,如果值为1则为生效。

pip install "apache-airflow[mysql]"报错:

mysql_config not found

安装mysql-devel: 首先查看是否有mysql_config文件。 find / -name mysql_config

如果没有安装mysql-devel yum install mysql-devel

配置任务

在 AirFlow 中,每个节点都是一个任务,可以是一条命令行 (BashOperator),可以是一段 Python 脚本 (PythonOperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 DAG 。

默认会到 ${AIRFLOW_HOME}/dags 目录下查找,可以直接在该目录下创建相应的文件。

如下是一个简单的示例。

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, datetime
import pytz

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
	'owner': 'qxy',
	'depends_on_past': False,
	'email_on_failure': False,
	'email_on_retry': False,
	'retries': 1,
	'retry_delay': timedelta(minutes=5),
	}

tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)

dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)


dag = DAG(
'airflow_interval_test',
default_args=default_args,
description='airflow_interval_test',
schedule_interval='35 17 * * *',
start_date=utc_dt
)

t1 = BashOperator(
	task_id='sleep',
	bash_command='sleep 5',
	dag=dag)

t2 = BashOperator(
	task_id='print_date',
	bash_command='date',
	dag=dag)

	t1 >> t2

该文件创建一个简单的 DAG,只有三个运算符,两个 BaseOperator ,也就是执行 Bash 命令分别打印日期以及休眠 5 秒;另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。 文件创建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自动读取该DAG。

测试是否正常,如果无报错那么就说明正常 python /tmp/project/dags/hello_world.py

在安装和使用Airflow过程中,查阅了大量的博客和文档,本文也是在此基础上整理并完善。部分内容来源由于时间久远,未能一一记录,若有引用或侵权之处,请及时联系我。