-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
3 changed files
with
390 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,318 @@ | ||
# Airflow 1.10+安装 | ||
|
||
Airflow是Airbnb开源的任务调度系统,我主要用调度一些爬虫任务和数据清洗任务。 | ||
|
||
本次安装Airflow版本为1.10+,其需要依赖Python和DB,本次选择的DB为Mysql。 | ||
本次安装组件及版本如下: | ||
|
||
Airflow == 1.10.0 | ||
Python == 3.6.5 | ||
MySQL == 5.7 | ||
|
||
### 常用命令 | ||
|
||
``` | ||
# 启动schedule | ||
airflow scheduler -D | ||
# 启动webserver | ||
airflow webserver -D | ||
# 异常状况下,杀掉Airflow相关进程 | ||
ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {} | ||
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {} | ||
``` | ||
|
||
## 整体安装流程 | ||
1. 建表 | ||
2. 安装Airflow | ||
3. 配置 | ||
4. 运行 | ||
5. 配置任务 | ||
|
||
|
||
### 建表 | ||
``` | ||
# 库名为airflow | ||
create database airflow; | ||
# 建用户用户名为airflow,并且设置所有ip均可以访问。 | ||
create user 'airflow'@'%' identified by 'airflow'; | ||
create user 'airflow'@'localhost' identified by 'airflow'; | ||
# 为新建的airflow用户授予airflow库的所有权限 | ||
grant all on airflow.* to 'airflow'@'%'; | ||
flush privileges | ||
``` | ||
|
||
### Airflow安装 | ||
|
||
``` | ||
# 创建虚拟环境,通过virtualenv | ||
mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录 | ||
virtualenv --no-site-packages airflow --python=python # 创建虚拟环境 | ||
source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境 | ||
# 指定豆瓣作为pip源安装Airflow | ||
pip install apache-airflow -i https://pypi.douban.com/simple | ||
``` | ||
在安装完一堆的依赖后,就需要配置 AIRFLOW_HOME 环境变量,后续的 DAG 和 Plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。 | ||
|
||
|
||
安装时出现报错,可参考以下操作: | ||
``` | ||
ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible. | ||
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible. | ||
# 执行 | ||
pip3 install -U Flask==1.0.4 | ||
# 执行 | ||
pip3 install -U pika==0.13.1 | ||
# 重新执行 | ||
pip install apache-airflow -i https://pypi.douban.com/simple | ||
``` | ||
|
||
设置环境变量 | ||
``` | ||
# 设置Airflow的工作目录,之后的日志、任务文件都需要存在这个目录之下 | ||
export AIRFLOW_HOME=/airflow | ||
## 注意:有部分教程将airflow的工作目录设置为tmp/目录下,这是错误的,因为tmp是临时目录,系统会定时清理这个目录下的文件。 | ||
``` | ||
|
||
查看其版本信息 | ||
``` | ||
airflow version | ||
# 如果能够正常显示版本信息,则表明,Airflow已经安装成功 | ||
``` | ||
|
||
执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。 | ||
|
||
### airflow 配置 | ||
修改Airflow DB配置 | ||
|
||
#### 安装Mysql模块 | ||
``` | ||
# 这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。 | ||
pip install "apache-airflow[mysql]" | ||
# 修改Airflow DB配置 | ||
${AIRFLOW_HOME}/airflow.cfg | ||
# 修改数据的链接,参数的格式为mysql://帐号:密码@ip:port/db | ||
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow | ||
``` | ||
|
||
配置完成后需要初始化db,Airflow会自动新建所依赖的表 | ||
``` | ||
# 初始化数据库 | ||
airflow initdb | ||
# 如报错 | ||
Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' | ||
# 配置文件按如下格式修改 | ||
sql_alchemy_conn = mysql+mysqldb://airflow:[email protected]:3306/airflow | ||
``` | ||
|
||
### 2. 用户认证 | ||
Airflow拥有可视化的界面,并且需要登录才能查看和管理 | ||
|
||
``` | ||
# 安装passsword组件 | ||
pip install "apache-airflow[password]" | ||
# 修改 airflow.cfg | ||
[webserver] | ||
authenticate = True | ||
auth_backend = airflow.contrib.auth.backends.password_auth | ||
``` | ||
##### 在python环境中执行如下代码以添加账户: | ||
|
||
``` | ||
import airflow | ||
from airflow import models, settings | ||
from airflow.contrib.auth.backends.password_auth import PasswordUser | ||
user = PasswordUser(models.User()) | ||
user.username = 'admin' # 用户名 | ||
user.email = '[email protected]' # 用户邮箱 | ||
user.password = 'password' # 用户密码 | ||
session = settings.Session() | ||
session.add(user) | ||
session.commit() | ||
session.close() | ||
exit() | ||
``` | ||
|
||
##### 配置邮件服务 | ||
此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下: | ||
|
||
``` | ||
[smtp] | ||
# If you want airflow to send emails on retries, failure, and you want to use | ||
# the airflow.utils.email.send_email_smtp function, you have to configure an | ||
smtp_host = smtp.163.com | ||
smtp_starttls = True | ||
smtp_ssl = False | ||
# Uncomment and set the user/pass settings if you want to use SMTP AUTH | ||
smtp_user = [email protected] | ||
smtp_password = password | ||
smtp_port = 25 | ||
smtp_mail_from = [email protected] | ||
接下来简单把dag的Python代码列出来,以供参考: | ||
default_args = { | ||
'owner': 'ownerExample', | ||
'start_date': datetime(2018, 9, 18), | ||
'email': ['[email protected]'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。 | ||
'email_on_failure': ['[email protected]'], # 任务失败且重试次数用完时发送Email。 | ||
'email_on_retry': True, # 任务重试时是否发送Email | ||
'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。 | ||
'retries': 3, | ||
'retry_delay': timedelta(minutes=3), | ||
} | ||
``` | ||
|
||
### 4、配置Executor | ||
``` | ||
# 设置Executor 修改:airflow.cfg | ||
executor = LocalExecutor | ||
# 我使用的只有有单节点所以使用的是LocalExecutor模式。多节点请参照文档进行配置 | ||
``` | ||
|
||
|
||
### 5. 修改log地址 | ||
``` | ||
# 在配置文件中,找到相应位置,并修改 | ||
[core] | ||
base_log_folder = /servers/logs/airflow | ||
[scheduler] | ||
child_process_log_directory = servers/logs/airflow/scheduler | ||
``` | ||
|
||
### 6. 修改webserver地址 | ||
``` | ||
# 在配置文件中,找到相应位置,修改webserver地址 | ||
[webserver] | ||
base_url = http://host:port | ||
可以通过上面配置的地址访问webserver。 | ||
``` | ||
|
||
### 7. 可选配置 | ||
|
||
(可选)修改Scheduler线程数 | ||
如果调度任务不多的话可以把线程数调小,默认为32。参数为:parallelism | ||
|
||
(可选)不加载example dag | ||
如果不想加载示例dag可以把load_examples配置改为False,默认为True。这个配置只有在第一次启动airflow之前设置才有效。 | ||
|
||
如果此方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。 | ||
|
||
(可选)修改检测新dag间隔 | ||
修改min_file_process_interval参数为10,每10s识别一次新的dag。默认为0,没有时间间隔。 | ||
|
||
## 运行airflow | ||
|
||
``` | ||
# 启动schedule | ||
airflow scheduler | ||
# 启动webserver | ||
airflow webserver | ||
``` | ||
|
||
|
||
## 安装问题汇总 | ||
|
||
#### Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql | ||
|
||
修改Mysql配置文件my.cnf,具体步骤如下: | ||
|
||
查找my.cnf文件位置 | ||
mysql --help | grep my.cnf | ||
|
||
修改文件 | ||
explicit_defaults_for_timestamp=true | ||
注意:必须写在【mysqld】下 | ||
|
||
重启Mysql | ||
sudo systemctl restart mysqld.service | ||
查看修改是否生效。执行如下SQL,如果值为1则为生效。 | ||
|
||
#### pip install "apache-airflow[mysql]"报错: | ||
mysql_config not found | ||
|
||
安装mysql-devel: | ||
首先查看是否有mysql_config文件。 | ||
find / -name mysql_config | ||
|
||
如果没有安装mysql-devel | ||
yum install mysql-devel | ||
|
||
## 配置任务 | ||
|
||
在 AirFlow 中,每个节点都是一个任务,可以是一条命令行 (BashOperator),可以是一段 Python 脚本 (PythonOperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 DAG 。 | ||
|
||
默认会到 ${AIRFLOW_HOME}/dags 目录下查找,可以直接在该目录下创建相应的文件。 | ||
|
||
如下是一个简单的示例。 | ||
|
||
``` | ||
import airflow | ||
from airflow import DAG | ||
from airflow.operators.bash_operator import BashOperator | ||
from airflow.operators.python_operator import PythonOperator | ||
from datetime import timedelta, datetime | ||
import pytz | ||
# ------------------------------------------------------------------------------- | ||
# these args will get passed on to each operator | ||
# you can override them on a per-task basis during operator initialization | ||
default_args = { | ||
'owner': 'qxy', | ||
'depends_on_past': False, | ||
'email_on_failure': False, | ||
'email_on_retry': False, | ||
'retries': 1, | ||
'retry_delay': timedelta(minutes=5), | ||
} | ||
tz = pytz.timezone('Asia/Shanghai') | ||
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S") | ||
# local_dt = tz.localize(naive, is_dst=None) | ||
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None) | ||
dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz) | ||
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None) | ||
dag = DAG( | ||
'airflow_interval_test', | ||
default_args=default_args, | ||
description='airflow_interval_test', | ||
schedule_interval='35 17 * * *', | ||
start_date=utc_dt | ||
) | ||
t1 = BashOperator( | ||
task_id='sleep', | ||
bash_command='sleep 5', | ||
dag=dag) | ||
t2 = BashOperator( | ||
task_id='print_date', | ||
bash_command='date', | ||
dag=dag) | ||
t1 >> t2 | ||
``` | ||
|
||
该文件创建一个简单的 DAG,只有三个运算符,两个 BaseOperator ,也就是执行 Bash 命令分别打印日期以及休眠 5 秒;另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。 | ||
文件创建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自动读取该DAG。 | ||
|
||
测试是否正常,如果无报错那么就说明正常 | ||
python /tmp/project/dags/hello_world.py | ||
|
||
在安装和使用Airflow过程中,查阅了大量的博客和文档,本文也是在此基础上整理并完善。部分内容来源由于时间久远,未能一一记录,若有引用或侵权之处,请及时联系我。 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
Superset是Airbnb开源的BI工具,我们在线上环境用它进行数据的展示和分析。 | ||
本文记录了安装和使用Superset的一些过程 | ||
|
||
## 安装Superset | ||
``` | ||
# 通过虚拟环境,与其他环境隔离 | ||
# 创建目录 | ||
cd /usr/local/virtual_env/superset | ||
# 创建虚拟环境 | ||
virtualenv --no-site-packages superset --python=python | ||
# 激活虚拟环境 | ||
source /usr/local/virtual_env/superset/bin/activate | ||
# 安装Seperset 选择了清华的pip源 | ||
pip install superset -i https://pypi.tuna.tsinghua.edu.cn/simple | ||
``` | ||
|
||
## 创建管理员账号 | ||
``` | ||
fabmanager create-admin --app superset | ||
``` | ||
若报错和pandas相关,执行 pip install pandas==0.23.4 -i https://pypi.tuna.tsinghua.edu.cn/simple | ||
安装完成后,重新执行 | ||
``` | ||
fabmanager create-admin --app superset | ||
``` | ||
|
||
## 初始化数据库 | ||
``` | ||
superset db upgrade | ||
``` | ||
若报错和sqlalchemy相关,执行 pip install SQLAlchemy==1.2.18 | ||
安装完成后,重新执行 | ||
``` | ||
superset db upgrade | ||
``` | ||
|
||
## 载入案例数据 | ||
通过案例数据可以了解Superset可以展示什么类型的工具,以及有什么功能,第一次使用,建议载入。熟悉之后,可以不载入。 | ||
``` | ||
superset load_examples | ||
``` | ||
|
||
## 初始化角色和权限 | ||
``` | ||
superset init | ||
``` | ||
|
||
## 启动服务,端口号 8088,使用 -p 更改端口号 | ||
``` | ||
superset runserver -p 8089 | ||
``` | ||
|
||
Superset的安装还是比较简单的,这里吐槽一下Airflow,真的太麻烦了,坑太多了!!! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
## 开发这个博客的原动力 | ||
|
||
### 彻底转型开发 | ||
之前一直都是做数据分析、爬虫相关的工作,时间久了发现写代码确实是自己喜欢做的事情。 | ||
|
||
首先,写代码这件事能够给我带来成就感,写漂亮,舒服的代码让自己很心安。 | ||
|
||
其次,学习写代码的过程让我感觉非常有确定性,自己掌握了没有,掌握到什么程度,都是有明显感受的。 | ||
|
||
所以,既然喜欢做这件事,索性就向纯粹的开发转型吧! | ||
|
||
### 真实的阅读和学习需要 | ||
很多人都会觉得看看博客很低效,可能是有一定道理。但对于现阶段的我来讲,博客依然是一个重要的学习来源。定期看博客是我的一个强需求。 | ||
|
||
所以在这两个目标驱使下,我逐渐有了做一个博客兼内容聚合这样一个应用的想法。 | ||
|
||
从有这个想法到这个博客成型,中间一共花费了大概90个小时,收获很多。 | ||
|
||
这个博客我也会一直维护下去,学习编程时间太晚了,唯有持续努力,才能迎头赶上。 |