在开发完每一个画像标签对应的脚本后,需要将该脚本提上调度流,每天定时作业刷昨天产生的新标签。
在开发迭代的过程中,开发初期会使用crontab 命令调度开发任务定时执行,但随着调度任务规模的增加,使用kettle、Airflow 这样的工具替代crontab 做定时调度会提高集群工作效率。一方面可以帮助厘清任务之间的依赖关系,另一方面当调度出现异常时可快速定位出现问题的位置。
6.1 crontab 命令调度
画像开发初期阶段,为了数据尽快上线迭代,对于标签调度作业规范化的要求或许会放在次要位置.在这一阶段中,通过Shell脚本、Python 脚本和crontab 调度命令即可完成简单的ETL任务。
下面通过一个案例来看这种ETL的实现方式,如图6-1 所示。
通过crontab 命令每天7点定时调起workflow_work.sh 脚本。
在workflow_work.sh 脚本中配置好环境变量后,掉起Python脚本 main_workflow执行。代码示例如下:
注:
sh 中2>&1 的含义:
sh 脚本中2>&1 表示把错误信息也输出到屏幕,前面有操作把屏幕信息输出到.log,因此表示把屏幕信息,包括错误信息都输出到.log文件。
可参考网文“https://segmentfault.com/a/1190000040086046
在main_workflow 脚本中,分别提交Spark任务去执行画像标签的计算脚本。代码示例如下:
注:函数展示了下实际today,oneday,yesterday,datestr的含义
sys.argv 这里的sys.argv[0]表示代码本身文件路径。 sys.argv[]是一个从程序外部获取参数的桥梁,从外部取得的参数可以是多个,所以获得的是一个列表(list),也就是说sys. 其第一个元素是程序本身,随后才依次是外部给予的参数含义:
Sys.argv[]是用来获取命令行参数的,它是个列表.
len(sys.argv)得到参数的个数
sys.argv[0]获得的是脚本的名字
上面的逻辑就是:当sys.argv[] 参数个数为1 的时候,(只有一个参数,sys.argv[0]),这个时候,主动去获取datestr,否则,datestr 就是脚本名字后的第一个参数。可见输入要按照年月日输入:比如20220325
大数据统计从昨天开始统计。
注:
os.system() 表示将字符串化成命令在服务器上运行:https://blog.csdn.net/weixin_39682177/article/details/84985012
Airflow 工作平台:
Airflow 是Airbnb 内部发起的一个工作流平台,如下图所示。使用Python 编写实现的任务管理、调度 监控工作流平台。Airflow 的调度依赖于crontab 命令,与crontab 相比,Airflow 可以方便地查看任务的执行状况(执行是否成功、执行时间、执行依赖等),
可追踪任务历史执行情况,任务执行失败时可以收到邮件通知、查看错误日志。对于管理调度任务有很大的帮助:
conrtab 命令管理调度的方式总结来看存在一下几个方面的弊端:
1)在多任务调度执行的情况下,难以厘清任务间的依赖关系。
2)不便于查看当前执行到哪一个任务;
3)不便于查看调度流下每个任务执行的起止消耗时间,而这对于优化task作业是非常重要的。
4)不便于记录历史调度任务的执行情况,而这对于优化作业和排查错误是非常重要的。
5)执行任务失败时不便于查看执行日志,不方便定位报错的任务和接收错误告警邮件。
基础概念
在介绍Airflow 这个调度工具前先介绍几个相关的基础概念:
- DAG(Directed Acyclic Graph,有向无环图):用于描述数据流的计算过程
- Operators:描述了DAG中一个具体的task要执行的任务,如BashOperator 为执行一条bash命令,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求,PythonOperator 用于调用任意的Python 函数。
- Task:是Operator的一个实例,也就是DAG的一个节点。
- Task instance: 记录Task 的一次运行。Task instance 有自己的状态,包括"running","success","failed""skipped""up for retry"等。
- Triggher Rules:指task的触发条件。
- 在图6-3中,没一个节点可视为一个task,每个task用于执行一条任务,比如执行某个表的ETL加工。这些task调度任务按执行顺序的先后连接起来形成一个有向无环图。
Airflow 服务构成:
一个正常运行的Airflow 系统一般由以下几个服务构成:
- WebServer
Airflow 提供了一个可视化的Web界面,启动WebServer后,可以再Web界面上查看定义好的DAG并监控及改变其运行状况。也可以在Web界面中对一些变量进行配置。
2.Worker(Celery模式)
一般地,我们使用Celery Worker来执行具体作业。Worker可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker就会接收这个作业任务并开始执行。Airflow会自动在每个部署Worker的机器上同时部署一个Server logs服务,这样就可以在Web界面上方便地查看分布在不同机器上的日志了。
3.Scheduler
整个Airflow的调度由Scheduler负责发起,每隔一段时间Scheduler就会检查所有定义完成得DAG和定义在其中的作业,如果有符合运行条件的作业,Scheduler就会发起相应的作业任务以供Worker接收。
4.flower(Celery模式)
flower 提供了一个可视化界面用于监控所有Celery Worker的运行状况。
Airflow安装
Airflow 安装很简单,如图6-4 所示,在命令下输入"pip install airflow" 即可完成Airflow的下载和安装:安装完成后,输入airflow webserver 命令启动Airflow的Web端服务。
主要模块功能
通过Airflow的管理界面,可以了解其主要覆盖的功能模块。下面介绍Airflow主要覆盖的功能模块,这些模块在Airflow官网上有详细介绍:
http://airflow.incubator.apache.org.
DAG 任务列表:
首页中的DAG模块可以查看当前DAG 的任务列表,包括当前有哪些DAG调度任务、哪些任务运行成功、哪些任务运行失败、哪些任务正在运行中。
如下图所示,从图中可看到当前有3个DAG 调度任务,任务所有者(Owner)为airflow,在Task by state 可以查看当前各任务的调度状态。
DAG 调度状态图:
在Tree View模块(如下图所示)可以查看当前DAG每个task任务的调度状态,是执行成功、正在执行、执行失败还是等待执行等,便于快速定位到执行失败的任务,重新调启执行。
DAG有向无环图:
在Graph View 模块(如下图所示)可以看到当前DAG中各task任务之间的依赖关系,以及各任务的执行状态。
甘特图:
在Gantt模块(如下图所示)中可以查看DAG调度的甘特图,通过甘特图可以查看每个task调度任务的起止时间、持续时长。方便查找到调度时间长的task任务,以便后续进行优化。
DAG 执行脚本:
在code模块可以查看当前DAG任务的执行脚本,包括任务的起始调度时间、调度失败后重试机制、各task任务之间的依赖关系等。当某个task执行出现问题时可通过查看该调度脚本定位原因。
执行日志:
当某个task执行失败时,点击该task任务,在弹出的选择框中选中“view log”可查看该任务的执行日志,如图6-10所示。
工作流调度:
Airflow 的工作流设计是有向无环图(DAG),如下图所示,在编写工作流时,需要考虑如何将任务划分为多个可独立执行的任务,然后将这些任务合并为一个逻辑整体,从而实现任务调度的结果。
Airflow中DAG文件的功能类似于定时开启的Shell脚本,负责管理定时任务中每一个子任务(task)的调度依赖。画像系统中的调度流是先执行标签计算任务,跑完后校验标签数据,如果数据校验无误则发送邮件,向后继续执行人群计算任务,否则发送失败邮件,任务暂停。在人群计算完成并通过校验后将数据任务同步到业务系统中。
过程中任一环节没有通过监控校验都会暂停任务,待开发人员处理并保证数据通过校验后开启后面的调度环节。过程中详细的调度环节见6.2.8节所述内容。
看到6.2.6. 待续。