Airflow实战--安装配置(单机)

Airflow实战--安装配置(单机),第1张

Airflow实战--安装配置(单机) Airflow安装配置(单机) 安装配置

注意:python的版本必须是python3+,我用的是python3.7这个版本。

pip install apache-airflow

通过该命令即可安装成功。

这样安装好后,我的airflow的执行文件在目录下:

/Users/xhz/opt/anaconda3/envs/py37/bin

注意:由于我是使用anaconda3来管理python环境的,这里可以自己通过find命令来搜一下airflow这个命令的位置。把airflow的可执行程序的路径添加到PATH路径中,这样就可以直接使用airflow命令了。

airflow的配置文件目录是在:

/Users/xhz/airflow/

修改airflow的配置,就是修改以下文件:

/Users/xhz/airflow/airflow.cfg
初始化 初始化数据库

若不修改配置,默认airflow使用的是SQLite文本数据,可以在airflow.cfg配置文件中配置该数据库的位置。也就是修改以下这一行的配置:

sql_alchemy_conn = sqlite:Users/reyun/airflow/airflow.db

注意:我这里不配置,而是使用默认的配置,这里可以修改成mysql数据库。

执行以下命令来初始化airflow运行时需要使用的数据库配置数据:

airflow db init
添加一个用户

添加一个用户才能登录webserver页面,所以需要添加一个新的管理用户。

airflow users create 
    --username admin 
    --firstname admin 
    --lastname admin 
    --role Admin 
    --email admin@example.org
启动webserver和scheduler
  • 启动webserver
airflow webserver -p 20001 -D

说明:-p是指定端口; -D是作为后台执行。

  • 启动scheduler
airflow scheduler -D

说明:-D是把schduler放到后台执行。

现在就可以在浏览器中使用输入:http://127.0.0.1:20001查看airflow的管理页面。

使用

编辑以下python文件,保存为:hello_world.py。

# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
 
#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
 
default_args = {
    'owner': 'xhz',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['xhz@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}
 
#-------------------------------------------------------------------------------
# dag
 
dag = DAG(
    'xhz_hello_world_dag',
    default_args=default_args,
    description='xhz first DAG',
    schedule_interval=timedelta(days=1))
 
#-------------------------------------------------------------------------------
# first operator
 
date_operator = BashOperator(
    task_id='date_task',
    bash_command='date',
    dag=dag)
 
#-------------------------------------------------------------------------------
# second operator
 
sleep_operator = BashOperator(
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)
 
#-------------------------------------------------------------------------------
# third operator
 
def print_hello():
    print("Hello world xhz!")
    return 'Hello world xhz!'
 
hello_operator = PythonOperator(
    task_id='xhz_hello_task',
    python_callable=print_hello,
    dag=dag)
 
#-------------------------------------------------------------------------------
# dependencies
#    t1
#   / 
#  t2  t3
#

sleep_operator.set_upstream(date_operator) 
hello_operator.set_upstream(date_operator)

执行

以上代码写好后,通过python命令来提交和执行任务。

python hello_world.py
总结

本文介绍了airflow的安装和基本的使用。以后的文章会针对任务调度比较关注的问题对其进行介绍。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5491109.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存