airflow
airflow是什么
airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。
airflow 核心概念
1. DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
2. Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令…同时,用户可以自定义Operator,这给用户提供了极大的便利性。
3. Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
4. Task Instance:task的一次运行。task instance 有自己的状态,包括“running”, “success”, “failed”, “skipped”, “up for retry”等。
5. Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖于TaskA。
通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 workflow了。
airflow + celery
Airflow consist of several components:
Workers - Execute the assigned tasks Scheduler - Responsible for adding the necessary tasks to the queue Web server - HTTP Server provides access to DAG/task status information Database - Contains information about the status of tasks, DAGs, Variables, connections, etc.
Celery - Queue mechanism
Please note that the queue at Celery consists of two components:
Broker - Stores commands for execution Result backend - Stores status of completed commands
The components communicate with each other in many places
[1] Web server –> Workers - Fetches task execution logs [2] Web server –> DAG files - Reveal the DAG structure [3] Web server –> Database - Fetch the status of the tasks [4] Workers –> DAG files - Reveal the DAG structure and execute the tasks [5] Workers –> Database - Gets and stores information about connection configuration, variables and XCOM. [6] Workers –> Celery’s result backend - Saves the status of tasks [7] Workers –> Celery’s broker - Stores commands for execution [8] Scheduler –> Database - Store a DAG run and related tasks [9] Scheduler –> DAG files - Reveal the DAG structure and execute the tasks [10] Scheduler –> Celery’s result backend - Gets information about the status of completed tasks [11] Scheduler –> Celery’s broker - Put the commands to be executed
airflow 命令
- 数据库操作
- initdb 初始化db
- resetdb 重置数据库
- upgradedb 更新数据库
- checkdb 检查数据库
- shell 命令行操作数据库(python)
- 用户权限操作
- delete_user 删除用户
- create_user 创建用户
- 核心操作
- webserver 运行airflow的UI
例如:airflow webserver -p 55555 -D - scheduler airflow的调度
例如: airflow scheduler –daemon - worker Celery的node节点
例如: airflow worker –daemon - flower Celery的UI
例如: airflow flower -hn 192.168.0.156 -p 55556 -D
更多操作参考帮助文档
- webserver 运行airflow的UI
- dag控制
- list_dags 列出所有的dag
- show_dag 展示一个dag的依赖
- trigger_dag 终端运行dag
- clear 清除数据
- list_tasks 列出某dag的所有task
- run 运行某个dag下的某个任务
- backfill 会查开始时间节点到现在定时任务遗漏未执行任务,执行
- list_dag_runs 根据给与的dag_id展示正在跑的任务
- 测试
- test