处理器

    1. 由于处理器是基于事件(event-based)的,所以可以异步处理事件,例如在某台主机完成任务后马上处理该主机的结果,不用需等待其它主机完成任务。

    2. 基于事件编写的代码更简洁,更容易理解。

    来通过几个例子来看看处理器是如何工作的,先初始化一个 nornir 对象:

    1. from nornir import InitNornir
    2. nr = InitNornir(config_file="files/config.yaml")

    编写一个处理器,它的作用是打印一些有关任务执行的信息:

    1. [2]:
    1. from nornir.core import Nornir
    2. from nornir.core.inventory import Host
    3. from nornir.core.task import Task, AggregatedResult, MultiResult, Result, Task
    4. class PrintResult:
    5. # 任务开始运行时执行的动作
    6. def task_started(self, task: Task) -> None:
    7. print(f" 任务[{task.name}] 开始执行 ".center(79, "="))
    8. # 任务运行结束后执行的动作
    9. def task_completed(self, task: Task, result: AggregatedResult) -> None:
    10. print(f" 任务[{task.name}]执行结束 ".center(79, "="))
    11. # 任务分配给单台主机运行时执行的动作
    12. def task_instance_started(self, task: Task, host: Host) -> None:
    13. print(f"任务[{task.name}]分配给主机[{host.name}]开始执行.\n")
    14. # 任务分配给单台主机运行完成后执行的动作
    15. def task_instance_completed(
    16. self, task: Task, host: Host, result: MultiResult
    17. ) -> None:
    18. print(f"任务[{task.name}]分配给主机[{host.name}]执行完成,执行结果:{result.result} \n")
    19. # 子任务开始运行时执行的动作
    20. def subtask_instance_started(self, task: Task, host: Host) -> None:
    21. pass
    22. # 子任务结束运行时执行的动作
    23. def subtask_instance_completed(
    24. self, task: Task, host: Host, result: MultiResult
    25. ) -> None:
    26. pass

    编写一个简单的任务,让自定义的处理器 PrintResult 来处理结果:

    1. [3]:
    1. def greeter(task: Task, greet: str) -> Result:
    2. return Result(
    3. host=task.host,
    4. result=f"{greet}! My name is {task.host.name}!"
    5. )

    要使用自定义的处理器,需要用到 nornir 对象的 with_processors 方法,这个方法需要传递一个 Processer 的列表对象 Processers,然后返回一个带有 Processers 的 nornir 对象:

    1. [4]:
    1. ================================= 任务[Hi] 开始执行 =================================
    2. 任务[Hi]分配给主机[spine00.bj]开始执行.
    3. 任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!
    4. 任务[Hi]分配给主机[spine01.bj]开始执行.
    5. 任务[Hi]分配给主机[spine01.gz]开始执行.
    6. 任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!
    7. 任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!
    8. ================================== 任务[Hi]执行结束 =================================
    9. ================================= 任务[Bye] 开始执行 ================================
    10. 任务[Bye]分配给主机[spine00.bj]开始执行.
    11. 任务[Bye]分配给主机[spine01.bj]开始执行.
    12. 任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!
    13. 任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!
    14. ================================= 任务[Bye]执行结束 =================================
    1. [4]:
    1. AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}

    打印结果是无序的,因为默认情况下 nornir 的任务是多线程异步执行的。

    前面说到 with_processors 方法需要传递一个 Processers 对象,这个对象是由 Processer 组成的列表。

    现在来再定义一个处理器,它的任务是将任务的信息保存在字典中。

    1. [5]:
    1. class SaveResultToDict:
    2. def __init__(self, data: Dict[str, None]) -> None:
    3. self.data = data
    4. def task_started(self, task: Task) -> None:
    5. self.data[task.name] = {}
    6. self.data[task.name]["started"] = True
    7. print(f"任务开始信息已经保存到 {self.data.keys()}!")
    8. def task_completed(self, task: Task, result: AggregatedResult) -> None:
    9. self.data[task.name]["completed"] = True
    10. print(f"任务完成信息已经保存到 {self.data.keys()}!")
    11. def task_instance_started(self, task: Task, host: Host) -> None:
    12. self.data[task.name][host.name] = {"started": True}
    13. print(f"主机[{host.name}]任务开始信息已经保存到 {self.data.keys()}!")
    14. def task_instance_completed(
    15. self, task: Task, host: Host, result: MultiResult
    16. ) -> None:
    17. self.data[task.name][host.name] = {
    18. "completed": True,
    19. "result": result.result,
    20. }
    21. print(f"主机[{host.name}]任务完成信息已经保存到 {self.data.keys()}!")
    22. def subtask_instance_started(self, task: Task, host: Host) -> None:
    23. pass
    24. def subtask_instance_completed(
    25. self, task: Task, host: Host, result: MultiResult
    26. ) -> None:
    27. pass

    现在来再次执行任务 greeter,这次使用两个处理器 SaveResultToDictPrintResult 来对任务进行处理:

    1. [6]:
    1. ================================= 任务[Hi] 开始执行 =================================
    2. 任务开始信息已经保存到 dict_keys(['Hi'])!
    3. 任务[Hi]分配给主机[spine00.bj]开始执行.
    4. 主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi'])!任务[Hi]分配给主机[spine01.bj]开始执行.
    5. 任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!
    6. 主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi'])!
    7. 任务[Hi]分配给主机[spine01.gz]开始执行.
    8. 主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi'])!
    9. 任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!
    10. 主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi'])!
    11. 主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi'])!
    12. 任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!
    13. ================================== 任务[Hi]执行结束 =================================
    14. 任务完成信息已经保存到 dict_keys(['Hi'])!
    15. ================================= 任务[Bye] 开始执行 ================================
    16. 任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
    17. 任务[Bye]分配给主机[spine00.bj]开始执行.
    18. 主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
    19. 任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!
    20. 主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
    21. 任务[Bye]分配给主机[spine01.bj]开始执行.
    22. 任务[Bye]分配给主机[spine01.gz]开始执行.
    23. 主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
    24. 任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!
    25. 主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
    26. 主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
    27. 任务[Bye]分配给主机[spine01.bj]执行完成,执行结果:Bye! My name is spine01.bj!
    28. 主机[spine01.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
    29. ================================= 任务[Bye]执行结束 =================================
    30. 任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
    1. [6]:
    1. AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}

    任务已经成功执行,并且两个处理器都按照预期进行工作,任务执行的最后也打印出了最后的结果: AggregatedResult 对象,事实上如果处理器里面已经对结果进行除了,这个对象也不需要再给它赋值然后再使用 print_result 打印出来了。

    1. [7]:
    1. import json
    2. print(json.dumps(data, indent=4))
    1. {
    2. "Hi": {
    3. "started": true,
    4. "spine00.bj": {
    5. "completed": true,
    6. "result": "Hi! My name is spine00.bj!"
    7. },
    8. "spine01.gz": {
    9. "completed": true,
    10. "result": "Hi! My name is spine01.gz!"
    11. },
    12. "spine01.bj": {
    13. "completed": true,
    14. "result": "Hi! My name is spine01.bj!"
    15. },
    16. "completed": true
    17. },
    18. "Bye": {
    19. "started": true,
    20. "spine00.bj": {
    21. "completed": true,
    22. "result": "Bye! My name is spine00.bj!"
    23. },
    24. "spine01.gz": {
    25. "completed": true,
    26. "result": "Bye! My name is spine01.gz!"
    27. },
    28. "spine01.bj": {
    29. "completed": true,
    30. "result": "Bye! My name is spine01.bj!"
    31. },
    32. "completed": true
    33. }
    34. }

    通过以上两个示例,可以看到 处理器(Processers) 处理器的强大功能,通过它来操作处理任务结果更加简单,也无需通过 来查看任务结果。

    借助处理器还可以做哪些其他事情?

    1. 将任务执行事件发送到 slack/IRC/logging_system

    2. 让使用者可以关注到正在的执行的任务情况而无需等待所有主机的任务执行完成(尤其是当设备数量很多时)

    3. 根据业务场景尽情发挥吧!