注意:python的版本必须是python3+,我用的是python3.7这个版本。
pip install apache-airflow
通过该命令即可安装成功。
这样安装好后,我的airflow的执行文件在目录下:
/Users/xhz/opt/anaconda3/envs/py37/bin
注意:由于我是使用anaconda3来管理python环境的,这里可以自己通过find命令来搜一下airflow这个命令的位置。把airflow的可执行程序的路径添加到PATH路径中,这样就可以直接使用airflow命令了。
airflow的配置文件目录是在:
/Users/xhz/airflow/
修改airflow的配置,就是修改以下文件:
/Users/xhz/airflow/airflow.cfg初始化 初始化数据库
若不修改配置,默认airflow使用的是SQLite文本数据,可以在airflow.cfg配置文件中配置该数据库的位置。也就是修改以下这一行的配置:
sql_alchemy_conn = sqlite:Users/reyun/airflow/airflow.db
注意:我这里不配置,而是使用默认的配置,这里可以修改成mysql数据库。
执行以下命令来初始化airflow运行时需要使用的数据库配置数据:
airflow db init添加一个用户
添加一个用户才能登录webserver页面,所以需要添加一个新的管理用户。
airflow users create --username admin --firstname admin --lastname admin --role Admin --email admin@example.org启动webserver和scheduler
- 启动webserver
airflow webserver -p 20001 -D
说明:-p是指定端口; -D是作为后台执行。
- 启动scheduler
airflow scheduler -D
说明:-D是把schduler放到后台执行。
现在就可以在浏览器中使用输入:http://127.0.0.1:20001查看airflow的管理页面。
使用编辑以下python文件,保存为:hello_world.py。
# -*- coding: utf-8 -*- import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from datetime import timedelta #------------------------------------------------------------------------------- # these args will get passed on to each operator # you can override them on a per-task basis during operator initialization default_args = { 'owner': 'xhz', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(2), 'email': ['xhz@qq.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'adhoc':False, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'trigger_rule': u'all_success' } #------------------------------------------------------------------------------- # dag dag = DAG( 'xhz_hello_world_dag', default_args=default_args, description='xhz first DAG', schedule_interval=timedelta(days=1)) #------------------------------------------------------------------------------- # first operator date_operator = BashOperator( task_id='date_task', bash_command='date', dag=dag) #------------------------------------------------------------------------------- # second operator sleep_operator = BashOperator( task_id='sleep_task', depends_on_past=False, bash_command='sleep 5', dag=dag) #------------------------------------------------------------------------------- # third operator def print_hello(): print("Hello world xhz!") return 'Hello world xhz!' hello_operator = PythonOperator( task_id='xhz_hello_task', python_callable=print_hello, dag=dag) #------------------------------------------------------------------------------- # dependencies # t1 # / # t2 t3 # sleep_operator.set_upstream(date_operator) hello_operator.set_upstream(date_operator)执行
以上代码写好后,通过python命令来提交和执行任务。
python hello_world.py总结
本文介绍了airflow的安装和基本的使用。以后的文章会针对任务调度比较关注的问题对其进行介绍。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)