class Blueprint(object):
"""Blueprint containing bootsteps that can be applied to objects.
Arguments:
steps Sequence[Union[str, Step]]: List of steps.
name (str): Set explicit name for this blueprint.
on_start (Callable): Optional callback applied after blueprint start.
on_close (Callable): Optional callback applied before blueprint close.
on_stopped (Callable): Optional callback applied after
blueprint stopped.
"""
GraphFormatter = StepFormatter
name = None
state = None
started = 0
default_steps = set()
state_to_name = {
0: 'initializing',
RUN: 'running',
CLOSE: 'closing',
TERMINATE: 'terminating',
}
def __init__(self, steps=None, name=None,
on_start=None, on_close=None, on_stopped=None):
self.name = name or self.name or qualname(type(self))
self.types = set(steps or []) | set(self.default_steps)
self.on_start = on_start
self.on_close = on_close
self.on_stopped = on_stopped
self.shutdown_complete = Event()
self.steps = {}
def start(self, parent): # 重点关注!
self.state = RUN
if self.on_start:
self.on_start()
for i, step in enumerate(s for s in parent.steps if s is not None):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
logger.debug('^-- substep ok')
for proc in self._pool: # 重点
# create initial mappings, these will be updated
# as processes are recycled, or found lost elsewhere.
self._fileno_to_outq[proc.outqR_fd] = proc
self._fileno_to_synq[proc.synqW_fd] = proc
# 省略
# ... 省略
看到这里,可能有的小伙伴就懵了,说好的 fork 呢?说好的 进程间通讯呢?
别急,其实 fork 和进程间通讯都藏在上面那一坨代码里了
processes = self.cpu_count() if processes is None else processes 这个 processes 的值,就是需要 fork 的子进程数量,默认是 cpu 核数,如果在命令行制定了 -c 参数,则是 -c 参数的值,在本例子中,为 2。
self.create_process_queues(): None for _ in range(processes) 其实就是创建出来了一堆读和写的管道,具体逻辑在 billiard/connection.py 文件中,因为逻辑较复杂,所以本文就省略了。根据流向的不同和主进程与子进程的不同,之后会分别关闭对应的的一端的管道,比如父进程把写关闭,子进程就把读关闭。并会用抽象的数据结构进行封装以便于管理。这个数据结构的实例用来为主进程和即将 fork 的子进程提供双向的数据传输。同样的,会根据子进程的数量创建出多个管道实例来。其中有个比较奇怪的一点就是,我在父进程关闭了一端的管道,fork 了之后,结果在子进程还是可以用这一端。这个也许是 fork 的子进程不继承父进程的管道关闭状态?