微前端架构如何改变企业的开发模式与效率提升
905
2022-10-30
Python Luigi 的简单使用示例
Luigi是用于工作流管理的Python工具。它是在Spotify开发的,旨在帮助构建批处理作业的复杂数据管道。
1. 安装 Luigi
pip install luigi
2. 简单例子
有两个核心概念可用来了解如何将Luigi应用于我们自己的数据管道:任务和目标。任务是工作的一个单元,通过扩展类luigi.Task并覆盖一些基本方法来设计。任务的输出是目标,可以是本地文件系统上的文件,Amazon S3上的文件,数据库中的某些数据等。
依赖关系是根据输入和输出定义的,即如果TaskB依赖TaskA,则意味着TaskA的输出将是TaskB的输入。
# Filename: run_luigi.pyimport luigi class PrintNumbers(luigi.Task): def requires(self): return [] def output(self): return luigi.LocalTarget("numbers_up_to_10.txt") def run(self): with self.output().open('w') as f: for i in range(1, 11): f.write("{}\n".format(i)) class SquaredNumbers(luigi.Task): def requires(self): return [PrintNumbers()] def output(self): return luigi.LocalTarget("squares.txt") def run(self): with self.input()[0].open() as fin, self.output().open('w') as fout: for line in fin: n = int(line.strip()) out = n * n fout.write("{}:{}\n".format(n, out)) # if __name__ == '__main__':# luigi.run() luigi.run() # 没有这一行无法运行
此代码展示了两个任务:PrintNumbers,它将1到10的数字写入一个名为Numbers_up_to_10.txt的文件(每行一个数字),以及SquaredNumbers,该文件读取该文件并将数字-平方对对的列表输出到squares.txt,每行也一对。
在命令运行:
python run_luigi.py SquaredNumbers --local-scheduler
Luigi将负责检查任务之间的依赖关系,请注意不存在SquaredNumbers的输入,因此它将首先运行PrintNumbers任务,然后继续执行。
我们传递给Luigi的第一个参数是我们要运行的管道中最后一个任务的名称。第二个参数只是告诉Luigi使用本地调度程序(稍后会详细介绍)。
输出:
DEBUG: Checking if SquaredNumbers() is completeINFO: Informed scheduler that task SquaredNumbers__99914b932b has status DONEINFO: Done scheduling tasksINFO: Running Worker with 1 processesDEBUG: Asking scheduler for work...DEBUG: DoneDEBUG: There are no more tasks to run at this timeINFO: Worker Worker(salt=899583362, workers=1, host=C02D70PKMD6R, username=jiangsu, pid=84021) was stopped. Shutting down Keep-Alive threadINFO: ===== Luigi Execution Summary =====Scheduled 1 tasks of which:* 1 complete ones were encountered: - 1 SquaredNumbers()Did not run any tasksThis progress looks :) because there were no failed tasks or missing dependencies===== Luigi Execution Summary =====
产生两个文件
numbers_up_to_10.txtsquares.txt
也可以使用luigi命令(笔者没有跑通):
luigi -m run_luigi.py SquaredNumbers --local-scheduler
3. 任务剖析
要创建Luigi任务,我们只需要创建一个父类为luigi.Task的类,并覆盖一些方法。特别是:
require()应该返回给定任务的依赖关系列表-换句话说,任务列表output()应该返回任务的目标(例如LocalTarget,S3Target等)run()应该包含要执行的逻辑
Luigi将检查require()和output()的返回值,并相应地构建依赖关系图。
4. 传递参数
硬编码的文件名和配置值通常是一种反模式。一旦了解了任务的结构和动态性,就应该研究如何对所有配置方面进行参数设置,以便可以使用不同的参数动态调用同一脚本。
luigi.Parameter()类是您研究的地方。每个Luigi任务可以具有许多参数。例如,假设我们要修改前面的示例以支持自定义数字。由于我们与range()函数一起使用的参数是整数,因此我们可以使用luigi.IntParameter而不是默认参数类。修改后的任务如下所示:
# run with a custom --n# python run_luigi.py SquaredNumbers --local-scheduler --n 20import luigiclass PrintNumbers(luigi.Task): n = luigi.IntParameter(default=10) def requires(self): return [] def output(self): return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n)) def run(self): with self.output().open('w') as f: for i in range(1, self.n+1): f.write("{}\n".format(i))class SquaredNumbers(luigi.Task): n = luigi.IntParameter(default=10) def requires(self): return [PrintNumbers(n=self.n)] def output(self): return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n)) def run(self): with self.input()[0].open() as fin, self.output().open('w') as fout: for line in fin: n = int(line.strip()) out = n * n fout.write("{}:{}\n".format(n, out))if __name__ == '__main__': luigi.run()
最多调用20个SquaredNumbers任务:
命名行运行:
python run_luigi.py SquaredNumbers --local-scheduler --n 20
参数也可以具有默认值,例如
n = luigi.IntParameter(default=10)
因此,如果不指定–n参数,它将默认为10。
5. 本地与全局调度程序
相见参考[1]、[2]
参考:
Building Data Pipelines with Python and Luigiluigid的启动与简单使用
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~