diff --git "a/2019.11/2019_11_05_Airflow\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" "b/2019.11/2019_11_05_Airflow\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" new file mode 100644 index 0000000..571fb2e --- /dev/null +++ "b/2019.11/2019_11_05_Airflow\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" @@ -0,0 +1,318 @@ +# Airflow 1.10+安装 + +Airflow是Airbnb开源的任务调度系统,我主要用调度一些爬虫任务和数据清洗任务。 + +本次安装Airflow版本为1.10+,其需要依赖Python和DB,本次选择的DB为Mysql。 +本次安装组件及版本如下: + +Airflow == 1.10.0 +Python == 3.6.5 +MySQL == 5.7 + +### 常用命令 + +``` +# 启动schedule +airflow scheduler -D +# 启动webserver +airflow webserver -D + +# 异常状况下,杀掉Airflow相关进程 +ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {} +ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {} +``` + +## 整体安装流程 +1. 建表 +2. 安装Airflow +3. 配置 +4. 运行 +5. 配置任务 + + +### 建表 +``` +# 库名为airflow +create database airflow; + +# 建用户用户名为airflow,并且设置所有ip均可以访问。 +create user 'airflow'@'%' identified by 'airflow'; +create user 'airflow'@'localhost' identified by 'airflow'; + +# 为新建的airflow用户授予airflow库的所有权限 +grant all on airflow.* to 'airflow'@'%'; +flush privileges +``` + +### Airflow安装 + +``` +# 创建虚拟环境,通过virtualenv +mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录 +virtualenv --no-site-packages airflow --python=python # 创建虚拟环境 +source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境 + +# 指定豆瓣作为pip源安装Airflow +pip install apache-airflow -i https://pypi.douban.com/simple +``` +在安装完一堆的依赖后,就需要配置 AIRFLOW_HOME 环境变量,后续的 DAG 和 Plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。 + + +安装时出现报错,可参考以下操作: +``` +ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible. +ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible. + +# 执行 +pip3 install -U Flask==1.0.4 +# 执行 +pip3 install -U pika==0.13.1 + +# 重新执行 +pip install apache-airflow -i https://pypi.douban.com/simple +``` + +设置环境变量 +``` +# 设置Airflow的工作目录,之后的日志、任务文件都需要存在这个目录之下 +export AIRFLOW_HOME=/airflow + +## 注意:有部分教程将airflow的工作目录设置为tmp/目录下,这是错误的,因为tmp是临时目录,系统会定时清理这个目录下的文件。 +``` + +查看其版本信息 +``` +airflow version +# 如果能够正常显示版本信息,则表明,Airflow已经安装成功 +``` + +执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。 + +### airflow 配置 +修改Airflow DB配置 + +#### 安装Mysql模块 +``` +# 这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。 +pip install "apache-airflow[mysql]" + +# 修改Airflow DB配置 +${AIRFLOW_HOME}/airflow.cfg + +# 修改数据的链接,参数的格式为mysql://帐号:密码@ip:port/db +sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow +``` + +配置完成后需要初始化db,Airflow会自动新建所依赖的表 +``` +# 初始化数据库 +airflow initdb + +# 如报错 +Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' + +# 配置文件按如下格式修改 +sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow +``` + +### 2. 用户认证 +Airflow拥有可视化的界面,并且需要登录才能查看和管理 + +``` +# 安装passsword组件 +pip install "apache-airflow[password]" + +# 修改 airflow.cfg +[webserver] +authenticate = True +auth_backend = airflow.contrib.auth.backends.password_auth +``` +##### 在python环境中执行如下代码以添加账户: + +``` +import airflow +from airflow import models, settings +from airflow.contrib.auth.backends.password_auth import PasswordUser +user = PasswordUser(models.User()) +user.username = 'admin' # 用户名 +user.email = 'emailExample@163.com' # 用户邮箱 +user.password = 'password' # 用户密码 +session = settings.Session() +session.add(user) +session.commit() +session.close() +exit() +``` + +##### 配置邮件服务 +此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下: + +``` +[smtp] +# If you want airflow to send emails on retries, failure, and you want to use +# the airflow.utils.email.send_email_smtp function, you have to configure an +smtp_host = smtp.163.com +smtp_starttls = True +smtp_ssl = False +# Uncomment and set the user/pass settings if you want to use SMTP AUTH +smtp_user = mailExample@163.com +smtp_password = password +smtp_port = 25 +smtp_mail_from = mailExample@163.com +接下来简单把dag的Python代码列出来,以供参考: + +default_args = { + 'owner': 'ownerExample', + 'start_date': datetime(2018, 9, 18), + 'email': ['mailReceiver@163.com'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。 + 'email_on_failure': ['mailReceiver@163.com'], # 任务失败且重试次数用完时发送Email。 + 'email_on_retry': True, # 任务重试时是否发送Email + 'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。 + 'retries': 3, + 'retry_delay': timedelta(minutes=3), +} +``` + +### 4、配置Executor +``` +# 设置Executor 修改:airflow.cfg +executor = LocalExecutor + +# 我使用的只有有单节点所以使用的是LocalExecutor模式。多节点请参照文档进行配置 +``` + + +### 5. 修改log地址 +``` +# 在配置文件中,找到相应位置,并修改 +[core] +base_log_folder = /servers/logs/airflow +[scheduler] +child_process_log_directory = servers/logs/airflow/scheduler +``` + +### 6. 修改webserver地址 +``` +# 在配置文件中,找到相应位置,修改webserver地址 +[webserver] +base_url = http://host:port +可以通过上面配置的地址访问webserver。 +``` + +### 7. 可选配置 + +(可选)修改Scheduler线程数 +如果调度任务不多的话可以把线程数调小,默认为32。参数为:parallelism + +(可选)不加载example dag +如果不想加载示例dag可以把load_examples配置改为False,默认为True。这个配置只有在第一次启动airflow之前设置才有效。 + +如果此方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。 + +(可选)修改检测新dag间隔 +修改min_file_process_interval参数为10,每10s识别一次新的dag。默认为0,没有时间间隔。 + +## 运行airflow + +``` +# 启动schedule +airflow scheduler +# 启动webserver +airflow webserver +``` + + +## 安装问题汇总 + +#### Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql + +修改Mysql配置文件my.cnf,具体步骤如下: + +查找my.cnf文件位置 +mysql --help | grep my.cnf + +修改文件 +explicit_defaults_for_timestamp=true +注意:必须写在【mysqld】下 + +重启Mysql +sudo systemctl restart mysqld.service +查看修改是否生效。执行如下SQL,如果值为1则为生效。 + +#### pip install "apache-airflow[mysql]"报错: +mysql_config not found + +安装mysql-devel: +首先查看是否有mysql_config文件。 +find / -name mysql_config + +如果没有安装mysql-devel +yum install mysql-devel + +## 配置任务 + +在 AirFlow 中,每个节点都是一个任务,可以是一条命令行 (BashOperator),可以是一段 Python 脚本 (PythonOperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 DAG 。 + +默认会到 ${AIRFLOW_HOME}/dags 目录下查找,可以直接在该目录下创建相应的文件。 + +如下是一个简单的示例。 + +``` +import airflow +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator +from datetime import timedelta, datetime +import pytz + +# ------------------------------------------------------------------------------- +# these args will get passed on to each operator +# you can override them on a per-task basis during operator initialization + +default_args = { + 'owner': 'qxy', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + } + +tz = pytz.timezone('Asia/Shanghai') +# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S") +# local_dt = tz.localize(naive, is_dst=None) +# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None) + +dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz) +utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None) + + +dag = DAG( +'airflow_interval_test', +default_args=default_args, +description='airflow_interval_test', +schedule_interval='35 17 * * *', +start_date=utc_dt +) + +t1 = BashOperator( + task_id='sleep', + bash_command='sleep 5', + dag=dag) + +t2 = BashOperator( + task_id='print_date', + bash_command='date', + dag=dag) + + t1 >> t2 + +``` + +该文件创建一个简单的 DAG,只有三个运算符,两个 BaseOperator ,也就是执行 Bash 命令分别打印日期以及休眠 5 秒;另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。 +文件创建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自动读取该DAG。 + +测试是否正常,如果无报错那么就说明正常 +python /tmp/project/dags/hello_world.py + +在安装和使用Airflow过程中,查阅了大量的博客和文档,本文也是在此基础上整理并完善。部分内容来源由于时间久远,未能一一记录,若有引用或侵权之处,请及时联系我。 \ No newline at end of file diff --git "a/2019.11/2019_11_05_Superset\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" "b/2019.11/2019_11_05_Superset\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" new file mode 100644 index 0000000..375a10e --- /dev/null +++ "b/2019.11/2019_11_05_Superset\345\256\211\350\243\205\351\205\215\347\275\256\350\256\260\345\275\225.md" @@ -0,0 +1,53 @@ +Superset是Airbnb开源的BI工具,我们在线上环境用它进行数据的展示和分析。 +本文记录了安装和使用Superset的一些过程 + +## 安装Superset +``` +# 通过虚拟环境,与其他环境隔离 +# 创建目录 +cd /usr/local/virtual_env/superset +# 创建虚拟环境 +virtualenv --no-site-packages superset --python=python +# 激活虚拟环境 +source /usr/local/virtual_env/superset/bin/activate +# 安装Seperset 选择了清华的pip源 +pip install superset -i https://pypi.tuna.tsinghua.edu.cn/simple +``` + +## 创建管理员账号 +``` +fabmanager create-admin --app superset +``` +若报错和pandas相关,执行 pip install pandas==0.23.4 -i https://pypi.tuna.tsinghua.edu.cn/simple +安装完成后,重新执行 +``` +fabmanager create-admin --app superset +``` + +## 初始化数据库 +``` +superset db upgrade +``` +若报错和sqlalchemy相关,执行 pip install SQLAlchemy==1.2.18 +安装完成后,重新执行 +``` +superset db upgrade +``` + +## 载入案例数据 +通过案例数据可以了解Superset可以展示什么类型的工具,以及有什么功能,第一次使用,建议载入。熟悉之后,可以不载入。 +``` +superset load_examples +``` + +## 初始化角色和权限 +``` +superset init +``` + +## 启动服务,端口号 8088,使用 -p 更改端口号 +``` +superset runserver -p 8089 +``` + +Superset的安装还是比较简单的,这里吐槽一下Airflow,真的太麻烦了,坑太多了!!! \ No newline at end of file diff --git "a/2019.11/2019_11_17_\344\270\272\344\273\200\344\271\210\344\274\232\346\234\211\350\277\231\344\270\252\345\215\232\345\256\242\357\274\237.md" "b/2019.11/2019_11_17_\344\270\272\344\273\200\344\271\210\344\274\232\346\234\211\350\277\231\344\270\252\345\215\232\345\256\242\357\274\237.md" new file mode 100644 index 0000000..d0dec94 --- /dev/null +++ "b/2019.11/2019_11_17_\344\270\272\344\273\200\344\271\210\344\274\232\346\234\211\350\277\231\344\270\252\345\215\232\345\256\242\357\274\237.md" @@ -0,0 +1,19 @@ +## 开发这个博客的原动力 + +### 彻底转型开发 +之前一直都是做数据分析、爬虫相关的工作,时间久了发现写代码确实是自己喜欢做的事情。 + +首先,写代码这件事能够给我带来成就感,写漂亮,舒服的代码让自己很心安。 + +其次,学习写代码的过程让我感觉非常有确定性,自己掌握了没有,掌握到什么程度,都是有明显感受的。 + +所以,既然喜欢做这件事,索性就向纯粹的开发转型吧! + +### 真实的阅读和学习需要 +很多人都会觉得看看博客很低效,可能是有一定道理。但对于现阶段的我来讲,博客依然是一个重要的学习来源。定期看博客是我的一个强需求。 + +所以在这两个目标驱使下,我逐渐有了做一个博客兼内容聚合这样一个应用的想法。 + +从有这个想法到这个博客成型,中间一共花费了大概90个小时,收获很多。 + +这个博客我也会一直维护下去,学习编程时间太晚了,唯有持续努力,才能迎头赶上。 \ No newline at end of file