用于检查hive表的分区是否存在,在某些场景下可以使用该sensor来替代ExternalTaskSensor,且使用起来更加便捷。
场景描述在数仓中这么两种表:
表1 :daily_table,该表是按天分区的表,一天跑一次。
表2 :hour_table,该表是按照小时分区,每个小时跑一次。
其中daily_table 依赖于 hour_table的执行,对于这种执行周期不同的任务,如果通过airflow的ExternalTaskSensor来声明它们之间的依赖会非常麻烦,为此可以使用HivePartitionSensor来解决
简版:
# 用于检查小时级任务,每天23点的分区 check_hour_table= HivePartitionSensor( task_id='check_task', metastore_conn_id='hive-conn', # hive的hive_metastore连接,可点击ariflow web界面的Connection进行配置 table='库名.hour_table', # 需要检查的hive表名。注意:需要加上数据库名。 mode='reschedule', # reschedule: 该模式在休眠期间不会占用slot,只有在执行时才会占用 poke_interval=300, # 两次检查的间隔时间,单位秒。使用reschedule模式时,建议该值不小于60。 partition='year=2021 and month=12 and day=31 and hour=23', # 需要检测的分区,分区格式需要实际情况 # timeout=600, # 超时时间,单位秒。可根据情况选择是否使用。 # soft_fail=false, # 如果设置为true,则失败时将任务标记为跳过。默认false ) # 天级任务 daily_table = DummyOperator( task_id='server_db_member_wallet_consumable_tx_merge', ) check_hour_table >> daily_table
详版
from airflow import DAG from datetime import datetime, timedelta from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor # 该方法用于将时间格式化为hive表的分区格式。其中interval=9,是用来转换时区的。logical_date默认是utc时区 def get_hour_partition(logical_date, hour, interval=9): new_date = (logical_date + timedelta(hours=interval)) y = str(new_date)[0:4] m = str(new_date)[5:7] d = str(new_date)[8:10] h = str(hour) partition = 'year=%s and month=%s and day=%s and hour=%s' % (y, m, d, h) return partition # [START instantiate_dag] with DAG( dag_id='spark_sql', schedule_interval=None, user_defined_macros={ "get_hour_partition": get_hour_partition, }, # 定义get_hour_partition方法无法直接被airflow的sensor或operator使用,需要将其注册为'宏变量',(自定义宏变量) ) as dag: # 用于检查小时级任务,每天23点的分区 check_hour_table= HivePartitionSensor( task_id='check_task', metastore_conn_id='hive-conn', table='库名.hour_table', mode='reschedule', poke_interval=300, partition='{{ get_hour_partition(logical_date,23) }}', # 调用自定义的方法格式化分区 ) # 天级任务 daily_table = DummyOperator( task_id='server_db_member_wallet_consumable_tx_merge', ) check_hour_table >> daily_table注意
- 需要装的python module:apache-airflow-providers-apache-hive
- 导包:from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
HivePartitionSensor
https://airflow.apache.org/docs/apache-airflow-providers-apache-hive/stable/_api/airflow/providers/apache/hive/sensors/hive_partition/index.html
baseSensorOperator
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html#module-airflow.sensors.base
说明: baseSensorOperator中的参数在所有Sensor中都通用,包括HivePartitionSensor
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)