处理器
由于处理器是基于事件(event-based)的,所以可以异步处理事件,例如在某台主机完成任务后马上处理该主机的结果,不用需等待其它主机完成任务。
基于事件编写的代码更简洁,更容易理解。
来通过几个例子来看看处理器是如何工作的,先初始化一个 nornir 对象:
from nornir import InitNornir
nr = InitNornir(config_file="files/config.yaml")
编写一个处理器,它的作用是打印一些有关任务执行的信息:
[2]:
from nornir.core import Nornir
from nornir.core.inventory import Host
from nornir.core.task import Task, AggregatedResult, MultiResult, Result, Task
class PrintResult:
# 任务开始运行时执行的动作
def task_started(self, task: Task) -> None:
print(f" 任务[{task.name}] 开始执行 ".center(79, "="))
# 任务运行结束后执行的动作
def task_completed(self, task: Task, result: AggregatedResult) -> None:
print(f" 任务[{task.name}]执行结束 ".center(79, "="))
# 任务分配给单台主机运行时执行的动作
def task_instance_started(self, task: Task, host: Host) -> None:
print(f"任务[{task.name}]分配给主机[{host.name}]开始执行.\n")
# 任务分配给单台主机运行完成后执行的动作
def task_instance_completed(
self, task: Task, host: Host, result: MultiResult
) -> None:
print(f"任务[{task.name}]分配给主机[{host.name}]执行完成,执行结果:{result.result} \n")
# 子任务开始运行时执行的动作
def subtask_instance_started(self, task: Task, host: Host) -> None:
pass
# 子任务结束运行时执行的动作
def subtask_instance_completed(
self, task: Task, host: Host, result: MultiResult
) -> None:
pass
编写一个简单的任务,让自定义的处理器 PrintResult
来处理结果:
[3]:
def greeter(task: Task, greet: str) -> Result:
return Result(
host=task.host,
result=f"{greet}! My name is {task.host.name}!"
)
要使用自定义的处理器,需要用到 nornir 对象的 with_processors
方法,这个方法需要传递一个 Processer
的列表对象 Processers
,然后返回一个带有 Processers
的 nornir 对象:
[4]:
================================= 任务[Hi] 开始执行 =================================
任务[Hi]分配给主机[spine00.bj]开始执行.
任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!
任务[Hi]分配给主机[spine01.bj]开始执行.
任务[Hi]分配给主机[spine01.gz]开始执行.
任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!
任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!
================================== 任务[Hi]执行结束 =================================
================================= 任务[Bye] 开始执行 ================================
任务[Bye]分配给主机[spine00.bj]开始执行.
任务[Bye]分配给主机[spine01.bj]开始执行.
任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!
任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!
================================= 任务[Bye]执行结束 =================================
[4]:
AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}
打印结果是无序的,因为默认情况下 nornir 的任务是多线程异步执行的。
前面说到 with_processors
方法需要传递一个 Processers
对象,这个对象是由 Processer
组成的列表。
现在来再定义一个处理器,它的任务是将任务的信息保存在字典中。
[5]:
class SaveResultToDict:
def __init__(self, data: Dict[str, None]) -> None:
self.data = data
def task_started(self, task: Task) -> None:
self.data[task.name] = {}
self.data[task.name]["started"] = True
print(f"任务开始信息已经保存到 {self.data.keys()}!")
def task_completed(self, task: Task, result: AggregatedResult) -> None:
self.data[task.name]["completed"] = True
print(f"任务完成信息已经保存到 {self.data.keys()}!")
def task_instance_started(self, task: Task, host: Host) -> None:
self.data[task.name][host.name] = {"started": True}
print(f"主机[{host.name}]任务开始信息已经保存到 {self.data.keys()}!")
def task_instance_completed(
self, task: Task, host: Host, result: MultiResult
) -> None:
self.data[task.name][host.name] = {
"completed": True,
"result": result.result,
}
print(f"主机[{host.name}]任务完成信息已经保存到 {self.data.keys()}!")
def subtask_instance_started(self, task: Task, host: Host) -> None:
pass
def subtask_instance_completed(
self, task: Task, host: Host, result: MultiResult
) -> None:
pass
现在来再次执行任务 greeter
,这次使用两个处理器 SaveResultToDict
和 PrintResult
来对任务进行处理:
[6]:
================================= 任务[Hi] 开始执行 =================================
任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine00.bj]开始执行.
主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi'])!任务[Hi]分配给主机[spine01.bj]开始执行.
任务[Hi]分配给主机[spine00.bj]执行完成,执行结果:Hi! My name is spine00.bj!
主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.gz]开始执行.
主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.gz]执行完成,执行结果:Hi! My name is spine01.gz!
主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi'])!
主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi'])!
任务[Hi]分配给主机[spine01.bj]执行完成,执行结果:Hi! My name is spine01.bj!
================================== 任务[Hi]执行结束 =================================
任务完成信息已经保存到 dict_keys(['Hi'])!
================================= 任务[Bye] 开始执行 ================================
任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine00.bj]开始执行.
主机[spine00.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine00.bj]执行完成,执行结果:Bye! My name is spine00.bj!
主机[spine00.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.bj]开始执行.
任务[Bye]分配给主机[spine01.gz]开始执行.
主机[spine01.gz]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.gz]执行完成,执行结果:Bye! My name is spine01.gz!
主机[spine01.gz]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
主机[spine01.bj]任务开始信息已经保存到 dict_keys(['Hi', 'Bye'])!
任务[Bye]分配给主机[spine01.bj]执行完成,执行结果:Bye! My name is spine01.bj!
主机[spine01.bj]任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
================================= 任务[Bye]执行结束 =================================
任务完成信息已经保存到 dict_keys(['Hi', 'Bye'])!
[6]:
AggregatedResult (Bye): {'spine00.bj': MultiResult: [Result: "Bye"], 'spine01.bj': MultiResult: [Result: "Bye"], 'spine01.gz': MultiResult: [Result: "Bye"]}
任务已经成功执行,并且两个处理器都按照预期进行工作,任务执行的最后也打印出了最后的结果: AggregatedResult
对象,事实上如果处理器里面已经对结果进行除了,这个对象也不需要再给它赋值然后再使用 print_result
打印出来了。
[7]:
import json
print(json.dumps(data, indent=4))
{
"Hi": {
"started": true,
"spine00.bj": {
"completed": true,
"result": "Hi! My name is spine00.bj!"
},
"spine01.gz": {
"completed": true,
"result": "Hi! My name is spine01.gz!"
},
"spine01.bj": {
"completed": true,
"result": "Hi! My name is spine01.bj!"
},
"completed": true
},
"Bye": {
"started": true,
"spine00.bj": {
"completed": true,
"result": "Bye! My name is spine00.bj!"
},
"spine01.gz": {
"completed": true,
"result": "Bye! My name is spine01.gz!"
},
"spine01.bj": {
"completed": true,
"result": "Bye! My name is spine01.bj!"
},
"completed": true
}
}
通过以上两个示例,可以看到 处理器(Processers) 处理器的强大功能,通过它来操作处理任务结果更加简单,也无需通过 来查看任务结果。
借助处理器还可以做哪些其他事情?
将任务执行事件发送到 slack/IRC/logging_system
让使用者可以关注到正在的执行的任务情况而无需等待所有主机的任务执行完成(尤其是当设备数量很多时)
根据业务场景尽情发挥吧!