如何使用Airflow调度数据科学工作流

如何使用Airflow调度数据科学工作流,第1张

步骤一:安装Airflow

docker pull airflow

1

docker pull airflow

步骤二:修改默认路径

这里如果不修改路径,默认的配置就是 ~/airflow

临时修改 AIRFLOW_HOME 环境变量, 这里的 /home/airflow 可以替换成你想要的文件夹目录

export AIRFLOW_HOME=/home/airflow

1

export AIRFLOW_HOME=/home/airflow

永久修改环境变量

echo "export AIRFLOW_HOME=/home/airflow" >>/etc/profile

source /etc/profile

1

2

echo "export AIRFLOW_HOME=/home/airflow" >>/etc/profile

source /etc/profile

系统默认的配置文件是从 airflow包的configuration.py文件中获取的,在设置了路径和airflow.cfg之后,配置将由airflow.cfg所替代。

步骤三:修改默认数据

找到配置文件

vi /home/airflow/airflow.cfg

1

vi /home/airflow/airflow.cfg

修改sql配置

sql_alchemy_conn = mysql://root:FinanceR@localhost:3306/airflow

1

sql_alchemy_conn = mysql://root:FinanceR@localhost:3306/airflow

注意到,之前使用的 mysql+driver://root:FinanceR@localhost:3306/airflow的方式是行不通的。

初始化服务器的数据库

airflow initdb

1

airflow initdb

airflow webserver

1

airflow webserver

就可以开启后台管理界面,默认访问localhost:8080即可。

步骤四:修改数据源配置

通过create按钮,添加相应的参数

步骤五:解决中文字符集显示问题

注意到 airflow 的所有.py文件都声明了 utf-8 字符集

# -*- coding: utf-8 -*-

1

# -*- coding: utf-8 -*-

如果需要显示中文,需要在extra选项中添加

{"charset":"utf8"}的配置

步骤:安装Airflow

docker pull airflow

1

docker pull airflow

步骤二:修改默认路径

修改路径默认配置 ~/airflow

临修改 AIRFLOW_HOME 环境变量, /home/airflow 替换想要文件夹目录

export AIRFLOW_HOME=/home/airflow

1

export AIRFLOW_HOME=/home/airflow

永久修改环境变量

echo "export AIRFLOW_HOME=/home/airflow" >>/etc/profile

source /etc/profile

1

2

echo "export AIRFLOW_HOME=/home/airflow" >>/etc/profile

source /etc/profile

系统默认配置文件 airflow包configuration.py文件获取设置路径airflow.cfg配置由airflow.cfg所替代

步骤三:修改默认数据库

找配置文件

vi /home/airflow/airflow.cfg

1

vi /home/airflow/airflow.cfg

修改sql配置

sql_alchemy_conn = mysql://root:FinanceR@localhost:3306/airflow

1

sql_alchemy_conn = mysql://root:FinanceR@localhost:3306/airflow

注意前使用 mysql+driver://root:FinanceR@localhost:3306/airflow式行通

初始化服务器数据库

airflow initdb

1

airflow initdb

airflow webserver

1

airflow webserver

启台管理界面默认访问localhost:8080即

步骤四:修改数据源配置

通create按钮添加相应参数

步骤五:解决文字符集显示问题

注意 airflow 所.py文件都声明 utf-8 字符集

# -*- coding: utf-8 -*-

1

# -*- coding: utf-8 -*-

需要显示文需要extra选项添加

{"charset":"utf8"}配置

在生产环境运行Airflow一段时间后,由于定时Job会在DagRun,TaskInstance等表插入大量的数据,会逐渐拖慢Airflow系统的内部SQL查询,进一步会影响前端管理页面的响应速度,所以需要定时清理不需要的 历史 数据,来保证前端管理页面的响应速度。

根据Airflow版本的不同,分为1.10(V1)版本和2.0之后(V2)的版本两种,代码有细微差异。

V2

V1

使用方法:

1. 将上面的代码复制到一个新py文件airflow_db_cleanup_dag.py,保存在DAG目录下。

2. 可以通过在Airflow变量里增加一个变量max_metadb_storage_days来配置元数据保留天数,如果不配置这个变量,默认是90天。

3. 可以修改 schedule_interval变量来设置DAG执行时间,目前是每天执行一次,在UTC时间的8点半,北京时间下午4点半。

注意事项:

1. 请根据你Airflow实际上线时间来判断,将要被删除的数据量的大小,如果数据量很大,会导致Job卡住或者响应变慢,建议在调度的低峰时间或者分批删除数据。分批删除的方法是,通过调整变量max_metadb_storage_days来控制删除的数据的时间范围,比如先删除1年前的,再删除6个月-1年之间的,最后删除6个月到3个月的数据。

2. 如果有每季度执行一次的任务,需要将max_metadb_storage_days调大至120天,否则可能会导致最近一次执行的DagRun被清理后,Dag又重新被触发一次。原因是Scheduler会持续检查每个DAG是否满足执行条件,如果找不到DagRun记录,会认为该Dag还没有被执行过,从而又执行一次。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/8087666.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-13
下一篇 2023-04-13

发表评论

登录后才能评论

评论列表(0条)

保存