momo's Blog.

【未完成】蓝鲸标准运维应用pipeline源码分析

字数统计: 3.8k阅读时长: 20 min
2020/11/19 Share

pipeline

pipeline 是标准运维 v3 内部使用的任务调度引擎,其主要职责是解析,执行,管理由用户创建的流程任务,并提供了如暂停,撤销,跳过和重试等灵活的控制能力和并行、子流程等进阶特性,并可通过水平扩展来进一步提升任务的并发处理能力。

插件注册

一切都需要从git上源码文档讲起.

下面是官方的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# coding=utf-8
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
import django
django.setup()

from pipeline import builder
from pipeline.builder import EmptyStartEvent, ServiceActivity, EmptyEndEvent
from pipeline.parser import PipelineParser
from pipeline.service import task_service
# 使用 builder 构造出流程描述结构
start = EmptyStartEvent()
act = ServiceActivity(component_code='example_component')
end = EmptyEndEvent()

start.extend(act).extend(end)

tree = builder.build_tree(start)
# 根据流程描述结构创建流程对象
parser = PipelineParser(pipeline_tree=tree)
pipeline = parser.parse()

# 执行流程对象
task_service.run_pipeline(pipeline)

结果执行以后,报错.

1
2
3
4
5
6
7
8
9
10
11
12
Traceback (most recent call last):
File "/data/bkops/tpip.py", line 21, in <module>
pipeline = parser.parse()
File "/data/bkops/pipeline/parser/pipeline_parser.py", line 87, in parse
return self._parse(root_pipeline_data, root_pipeline_context)
File "/data/bkops/pipeline/parser/pipeline_parser.py", line 139, in _parse
component = ComponentLibrary.get_component(act[PE.component][PE.code], act[PE.component][PE.inputs])
File "/data/bkops/pipeline/component_framework/library.py", line 39, in get_component
raise ComponentNotExistException('component %s does not exist.' % component_code)
pipeline.exceptions.ComponentNotExistException: component example_component does not exist.

Process finished with exit code 1

好吧,出师未捷身先死,正好看看为什么会出现这样的错误。

定位报错

我们可以看到,报错是这一行出现的

1
2
File "/data/bkops/pipeline/component_framework/library.py", line 39, in get_component
raise ComponentNotExistException('component %s does not exist.' % component_code)

ComponentLibrary这个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class ComponentLibrary(object):
components = {}

def __new__(cls, *args, **kwargs):
component_code = kwargs.get('component_code', None)
if args:
component_code = args[0]
if not component_code:
raise ValueError('please pass a component_code in args or kwargs: '
'ComponentLibrary(\'code\') or ComponentLibrary(component_code=\'code\')')
if component_code not in cls.components:
raise ComponentNotExistException('component %s does not exist.' % component_code)
return cls.components[component_code]

@classmethod
def get_component_class(cls, component_code):
return cls.components.get(component_code)

@classmethod
def get_component(cls, component_code, data_dict):
component_cls = cls.get_component_class(component_code)
if component_cls is None:
raise ComponentNotExistException('component %s does not exist.' % component_code)
return component_cls(data_dict)

通过类方法,来调用类变量,如果没有找到,就返回错误.

那么也就是说,我们写的插件,并没有注册到这个变量里面,那么他是如何注册的呢?

我尝试直接导入这个类,发现导入以后变量已经被注册了

1
2
3
4
5
from pipeline.component_framework.library import ComponentLibrary
print ComponentLibrary.components


{'cc_update_set': <class 'pipeline_plugins.components.collections.sites.open.cc.CCUpdateSetComponent'>, 'cmdb_transfer_host_resource': <class 'pipeline_plugins.components.collections.sites.open.cc.CmdbTransferHostResourceModuleComponent'>, 'bk_http_request': <class 'pipeline_plugins.components.collections.common.HttpComponent'>, 'job_execute_task': <class 'pipeline_plugins.components.collections.sites.open.job.JobExecuteTaskComponent'>, 'cc_replace_fault_machine': <class 'pipeline_plugins.components.collections.sites.open.cc.CCReplaceFaultMachineComponent'>, 'cc_update_module': <class 'pipeline_plugins.components.collections.sites.open.cc.CCUpdateModuleComponent'>, 'cc_update_set_service_status': <class 'pipeline_plugins.components.collections.sites.open.cc.CCUpdateSetServiceStatusComponent'>, 'cmdb_transfer_fault_host': <class 'pipeline_plugins.components.collections.sites.open.cc.CmdbTransferFaultHostComponent'>, 'bk_notify': <class 'pipeline_plugins.components.collections.sites.open.bk.NotifyComponent'>, 'pause_node': <class 'pipeline_plugins.components.collections.controller.PauseComponent'>, 'job_push_local_files': <class 'pipeline_plugins.components.collections.sites.open.job.JobPushLocalFilesComponent'>, 'job_fast_push_file': <class 'pipeline_plugins.components.collections.sites.open.job.JobFastPushFileComponent'>, 'cc_batch_delete_set': <class 'pipeline_plugins.components.collections.sites.open.cc.CCBatchDeleteSetComponent'>, 'nodeman_create_task': <class 'pipeline_plugins.components.collections.sites.open.nodeman.NodemanCreateTaskComponent'>, 'job_fast_execute_script': <class 'pipeline_plugins.components.collections.sites.open.job.JobFastExecuteScriptComponent'>, 'job_cron_task': <class 'pipeline_plugins.components.collections.sites.open.job.JobCronTaskComponent'>, 'cc_update_host': <class 'pipeline_plugins.components.collections.sites.open.cc.CCUpdateHostComponent'>, 'sleep_timer': <class 'pipeline_plugins.components.collections.controller.SleepTimerComponent'>, 'cc_transfer_to_idle': <class 'pipeline_plugins.components.collections.sites.open.cc.CCTransferHostToIdleComponent'>, 'cc_empty_set_hosts': <class 'pipeline_plugins.components.collections.sites.open.cc.CCEmptySetHostsComponent'>, 'cc_create_set': <class 'pipeline_plugins.components.collections.sites.open.cc.CCCreateSetComponent'>, 'cc_transfer_host_module': <class 'pipeline_plugins.components.collections.sites.open.cc.CCTransferHostModuleComponent'>}

而Django会在setup()的时候去做一些注册初始化的工作.通过重写AppConfigready方法可以实现Django加载后的一些操作

pipeline/component_framework/apps.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding: utf-8 -*-
import logging

from django.apps import AppConfig
from django.db.utils import ProgrammingError, OperationalError

from pipeline.conf import settings
from pipeline.utils.register import autodiscover_collections

logger = logging.getLogger('root')


class ComponentFrameworkConfig(AppConfig):
name = 'pipeline.component_framework'
verbose_name = 'PipelineComponentFramework'

def ready(self):
"""
@summary: 注册公共部分和当前RUN_VER下的标准插件到数据库
@return:
"""
for path in settings.COMPONENT_AUTO_DISCOVER_PATH:
autodiscover_collections(path)

from pipeline.component_framework.models import ComponentModel
from pipeline.component_framework.library import ComponentLibrary
try:
ComponentModel.objects.exclude(code__in=ComponentLibrary.components.keys()).update(status=False)
except (ProgrammingError, OperationalError) as e:
# first migrate
logger.exception(e)

其中,COMPONENT_AUTO_DISCOVER_PATH的类型为列表,默认配置的值为:

1
2
3
COMPONENT_AUTO_DISCOVER_PATH = [
'components.collections',
]

那么这个路径就是模块自动发现的默认路径了,后面交付给了autodiscover_collections(path) 让我们看一下这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pkgutil
import sys

from importlib import import_module
from django.utils._os import npath, upath

def autodiscover_collections(path):
"""
Auto-discover INSTALLED_APPS modules and fail silently when
not present. This forces an import on them to register any admin bits they
may want.
"""
from django.apps import apps

for app_config in apps.get_app_configs():
# Attempt to import the app's module.
try:
_module = import_module('%s.%s' %
(app_config.name, path))
autodiscover_items(_module)
except ImportError as e:
if not e.message == 'No module named %s' % path:
pass
  1. autodiscover_collections函数,通过django.apps,apps.get_app_configs() 方法,返回了一个列表,列表中是每个APP的配置对象.

  2. 通过app_config.name 拿到APP路径,并且拼接上默认查找的目录如:components.collections 最终变成如: import_module('django.contrib.auth.components.collections')

  3. 如果导入报错,则忽略异常

如果导入包成功,则调用autodiscover_items函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import pkgutil
import sys

from importlib import import_module
from django.utils._os import npath, upath

def autodiscover_items(module):
"""
Given a path to discover, auto register all items
"""
# Workaround for a Python 3.2 bug with pkgutil.iter_modules
"""
module为包的位置,如:/test/app
module有如下方法: '__builtins__', '__cached__', '__doc__', '__file__', '__loader__',
'__name__', '__package__', '__path__', '__spec__'


"""
module_dir = upath(module.__path__[0])
# 清除程序对象缓存
sys.path_importer_cache.pop(module_dir, None)

# 通过iter_modules方法获取包内的模块,返回filefiner, name, ispkg.
# 如果是包,则跳过.如果是模块则导入列表
modules = [name for _, name, is_pkg in
pkgutil.iter_modules([npath(module_dir)])
if not is_pkg and not name.startswith('_')]
for name in modules:
# 循环导入模块
__import__("%s.%s" % (module.__name__, name))

此时,django在初始化APP过程后,就走了一遍上述的流程. ComponentLibrary类中components的属性就已经被注册了.

那么问题又来了,他是怎么做到在导入包的时候就实例化对象并且赋值给这个字典呢?

我们随便看一个已经写好的组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import datetime
import re
import logging

from django.utils import translation
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone

from pipeline.conf import settings
from pipeline.core.flow.activity import Service, StaticIntervalGenerator
from pipeline.core.flow.io import StringItemSchema
from pipeline.component_framework.component import Component

from gcloud.core.models import Project

__group_name__ = _(u"蓝鲸服务(BK)")

LOGGER = logging.getLogger('celery')


class PauseService(Service):
__need_schedule__ = True

def execute(self, data, parent_data):
return True

def schedule(self, data, parent_data, callback_data=None):
return True

def outputs_format(self):
return []


class PauseComponent(Component):
name = _(u'暂停')
code = 'pause_node'
bound_service = PauseService
form = settings.STATIC_URL + 'components/atoms/bk/pause.js'


class SleepTimerService(Service):
__need_schedule__ = True
interval = StaticIntervalGenerator(0)
# 匹配年月日 时分秒 正则 yyyy-MM-dd HH:mm:ss
date_regex = re.compile(r"%s %s" %
(r'^(((\d{3}[1-9]|\d{2}[1-9]\d{1}|\d{1}[1-9]\d{2}|[1-9]\d{3}))|'
r'(29/02/((\d{2})(0[48]|[2468][048]|[13579][26])|((0[48]|[2468][048]|[3579][26])00))))-'
r'((0[13578]|1[02])-((0[1-9]|[12]\d|3[01]))|'
r'((0[469]|11)-(0[1-9]|[12]\d|30))|(02)-(0[1-9]|[1]\d|2[0-8]))',
r'((0|[1])\d|2[0-3]):(0|[1-5])\d:(0|[1-5])\d$'))

seconds_regex = re.compile(r'^\d{1,8}$')

def inputs_format(self):
return [
self.InputItem(name=_(u'定时时间'),
key='bk_timing',
type='string',
schema=StringItemSchema(description=_(u'定时时间,格式为秒(s) 或 (%%Y-%%m-%%d %%H:%%M:%%S)')))]

def outputs_format(self):
return []

def execute(self, data, parent_data):
if parent_data.get_one_of_inputs('language'):
translation.activate(parent_data.get_one_of_inputs('language'))

timing = str(data.inputs.bk_timing)

# 项目时区获取
project = Project.objects.get(id=parent_data.inputs.project_id)

project_tz = timezone.pytz.timezone(project.time_zone)
data.outputs.business_tz = project_tz

if self.date_regex.match(timing):
eta = project_tz.localize(datetime.datetime.strptime(timing, "%Y-%m-%d %H:%M:%S"))
elif self.seconds_regex.match(timing):
# 如果写成+号 可以输入无限长,或考虑前端修改
eta = datetime.datetime.now(tz=project_tz) + datetime.timedelta(
seconds=int(timing))
else:
message = _(u"输入参数%s不符合【秒(s) 或 时间(%%Y-%%m-%%d %%H:%%M:%%S)】格式") % timing
data.set_outputs('ex_data', message)
return False

data.outputs.timing_time = eta

return True

def schedule(self, data, parent_data, callback_data=None):
timing_time = data.outputs.timing_time
business_tz = data.outputs.business_tz

now = datetime.datetime.now(tz=business_tz)
t_delta = timing_time - now
if t_delta.total_seconds() < 1:
self.finish_schedule()

# 这里减去 0.5s 的目的是尽可能的减去 execute 执行带来的误差
self.interval.interval = t_delta.total_seconds() - 0.5

return True


class SleepTimerComponent(Component):
name = _(u'定时')
code = 'sleep_timer'
bound_service = SleepTimerService
form = settings.STATIC_URL + 'components/atoms/bk/timer.js'

其中, 所有的Component都会继承Component这个父类,那我们看一下这个父类的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# -*- coding: utf-8 -*-
from pipeline.core.data.base import DataObject
from pipeline.core.data.converter import get_variable
from pipeline.exceptions import ComponentDataLackException
from pipeline.component_framework.base import ComponentMeta


class Component(object):
__metaclass__ = ComponentMeta

def __init__(self, data_dict):
self.data_dict = data_dict

@classmethod
def outputs_format(cls):
outputs = cls.bound_service().outputs()
outputs = map(lambda oi: oi.as_dict(), outputs)
return outputs

@classmethod
def inputs_format(cls):
inputs = cls.bound_service().inputs()
inputs = map(lambda ii: ii.as_dict(), inputs)
return inputs

@classmethod
def _get_item_schema(cls, type, key):
items = getattr(cls.bound_service(), type)()
for item in items:
if item.key == key:
return item

return None

@classmethod
def get_output_schema(cls, key):
return cls._get_item_schema(type='outputs', key=key).schema

@classmethod
def get_input_schema(cls, key):
return cls._get_item_schema(type='inputs', key=key).schema

@classmethod
def form_is_embedded(cls):
return getattr(cls, 'embedded_form', False)

def clean_execute_data(self, context):
"""
@summary: hook for subclass of Component to clean execute data with context
@param context:
@return:
"""
return self.data_dict

def data_for_execution(self, context, pipeline_data):
data_dict = self.clean_execute_data(context)
inputs = {}

for key, tag_info in data_dict.items():
if tag_info is None:
raise ComponentDataLackException('Lack of inputs: %s' % key)

inputs[key] = get_variable(key, tag_info, context, pipeline_data)

return DataObject(inputs)

def service(self):
return self.bound_service()

实现方法

上面的类,可以看到定义了一个metaclass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# -*- coding: utf-8 -*-
import importlib
import logging

from django.db.utils import ProgrammingError

from pipeline.component_framework.library import ComponentLibrary
from pipeline.core.flow.activity import Service
from pipeline.component_framework.models import ComponentModel

logger = logging.getLogger(__name__)


class ComponentMeta(type):
def __new__(cls, name, bases, attrs):
super_new = super(ComponentMeta, cls).__new__
# Also ensure initialization is only performed for subclasses of Model
# (excluding Model class itself).
parents = [b for b in bases if isinstance(b, ComponentMeta)]
if not parents:
return super_new(cls, name, bases, attrs)

# Create the class
module_name = attrs.pop('__module__')
new_class = super_new(cls, name, bases, {'__module__': module_name})
module = importlib.import_module(new_class.__module__)

# Add all attributes to the class
attrs.setdefault('desc', '')
for obj_name, obj in attrs.items():
setattr(new_class, obj_name, obj)

# check
if not getattr(new_class, 'name', None):
raise ValueError("component %s name can't be empty" %
new_class.__name__)

if not getattr(new_class, 'code', None):
raise ValueError("component %s code can't be empty" %
new_class.__name__)

if not getattr(new_class, 'bound_service', None) or not issubclass(new_class.bound_service, Service):
raise ValueError("component %s service can't be empty and must be subclass of Service" %
new_class.__name__)

if not getattr(new_class, 'form', None):
setattr(new_class, 'form', None)

# category/group name
group_name = getattr(
module, "__group_name__",
new_class.__module__.split(".")[-1].title()
)
setattr(new_class, 'group_name', group_name)
new_name = u"%s-%s" % (group_name, new_class.name)

# category/group name
group_icon = getattr(
module, "__group_icon__",
''
)
setattr(new_class, 'group_icon', group_icon)

if not getattr(module, '__register_ignore__', False):
ComponentLibrary.components[new_class.code] = new_class
try:
ComponentModel.objects.update_or_create(
code=new_class.code,
defaults={
'name': new_name,
'status': __debug__,
}
)
except Exception as e:
if not isinstance(e, ProgrammingError):
logging.exception(e)

return new_class

我们知道,object是所有类的父类,而所有的类都是type的实例

因此,定义metaclass,重写type的new方法,可以完成在类创建的过程中一些操作

  • new方法需要三个参数
  • 类的名称, 继承关系, (类变量,方法)
1
2
3
def __new__(cls, name, bases, attrs):
# 返回父类__new__方法的引用
super_new = super(ComponentMeta, cls).__new__

因为考虑到多继承的关系, type会被 __new__ 多次,所以需要区分不会去初始化父类,保证仅初始化子类。

bases 是一个元组, 包含继承关系

1
2
3
4
# 如果bases为空数组,或者bases继承的所有类都不是ComponentMeta的实例,则直接返回
parents = [b for b in bases if isinstance(b, ComponentMeta)]
if not parents:
return super_new(cls, name, bases, attrs)

下面开始创建这个类

1
2
3
4
5
6
7
8
9
10
11
# Create the class
# 从类属性字典中pop出 __module__ 值
module_name = attrs.pop('__module__')
# 创建出新类
new_class = super_new(cls, name, bases, {'__module__': module_name})
module = importlib.import_module(new_class.__module__)
# Add all attributes to the class
# 将类属性全部赋值给新类
attrs.setdefault('desc', '')
for obj_name, obj in attrs.items():
setattr(new_class, obj_name, obj)

说实话,我不是很明白为什么手动创建类,又把原来的值重新赋值给它.
直接这样不好吗~

1
new_class = super_new(cls, name, bases, attrs)

下面就是一些检查属性的操作,目前还不清楚用途,所以先不研究.

赋值的代码在这一段

1
2
3
4
5
6
7
8
9
10
11
12
13
if not getattr(module, '__register_ignore__', False):
ComponentLibrary.components[new_class.code] = new_class
try:
ComponentModel.objects.update_or_create(
code=new_class.code,
defaults={
'name': new_name,
'status': __debug__,
}
)
except Exception as e:
if not isinstance(e, ProgrammingError):
logging.exception(e)

这里可以明显的看到赋值了,那我们回到之前的报错,为什么会没有赋值?

要想赋值,则必须模块中 __register_ignore__ 属性等于 False

那我们看看源码中的属性值是什么?

1
__register_ignore__ = not settings.ENABLE_EXAMPLE_COMPONENTS

在看一下config的配置

1
ENABLE_EXAMPLE_COMPONENTS = False

默认配置是False

我们改回来!!

到此,component自动发现的过程就讲完了.

简单的流程创建

Element基类

标准运维中的流程事件,都会继承这个基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# -*- coding: utf-8 -*-

from pipeline.utils.uniqid import uniqid
from pipeline.core.constants import PE

__all__ = [
'Element',
'PE'
]


class Element(object):
def __init__(self, id=None, name=None, outgoing=None):
self.id = id or uniqid()
self.name = name
self.outgoing = outgoing or []

def extend(self, element):
"""
build a connection from self to element and return element
:param element: target
:rtype: Element
"""
self.outgoing.append(element)
return element

def connect(self, *args):
"""
build connections from self to elements in args and return self
:param args: target elements
:rtype: Element
"""
for e in args:
self.outgoing.append(e)
return self

def converge(self, element):
"""
converge all connection those diverge from self to element and return element
:param element: target
:rtype: Element
"""
for e in self.outgoing:
e.tail().connect(element)
return element

def to(self, element):
return element

def tail(self):
"""
get tail element for self
:rtype: Element
"""
is_tail = len(self.outgoing) == 0
e = self

while not is_tail:
e = e.outgoing[0]
is_tail = len(e.outgoing) == 0

return e

def type(self):
raise NotImplementedError()

def __eq__(self, other):
return self.id == other.id

def __repr__(self):
return u"<{cls} {name}:{id}>".format(cls=type(self).__name__,
name=self.name,
id=self.id)

返回唯一UUID的函数

1
2
3
4
5
6
7
8
import uuid


def uniqid():
return uuid.uuid3(
uuid.uuid1(),
uuid.uuid4().hex
).hex

PE对象

这个对象设置了一些静态数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class PipelineElement(object):
ServiceActivity = 'ServiceActivity'
LoopServiceActivity = 'LoopServiceActivity'
SubProcess = 'SubProcess'
ExclusiveGateway = 'ExclusiveGateway'
ParallelGateway = 'ParallelGateway'
ConditionalParallelGateway = 'ConditionalParallelGateway'
ConvergeGateway = 'ConvergeGateway'
EmptyStartEvent = 'EmptyStartEvent'
EmptyEndEvent = 'EmptyEndEvent'

TaskNodes = {ServiceActivity, LoopServiceActivity, SubProcess}
BranchGateways = {ExclusiveGateway, ParallelGateway, ConditionalParallelGateway}
Gateways = {ExclusiveGateway, ParallelGateway, ConditionalParallelGateway, ConvergeGateway}

pipeline = 'pipeline'
id = 'id'
type = 'type'
start_event = 'start_event'
end_event = 'end_event'
activities = 'activities'
flows = 'flows'
gateways = 'gateways'
constants = 'constants'
conditions = 'conditions'
incoming = 'incoming'
outgoing = 'outgoing'
source = 'source'
target = 'target'
data = 'data'
component = 'component'
evaluate = 'evaluate'
name = 'name'
stage_name = 'stage_name'
failure_handler = 'failure_handler'
inputs = 'inputs'
outputs = 'outputs'
source_act = 'source_act'
source_key = 'source_key'
code = 'code'
error_ignorable = 'error_ignorable'
skippable = 'skippable'
# 兼容3.3.X不规范的命名
skippable_old = 'isSkipped'
retryable = 'retryable'
# 兼容3.3.X不规范的命名
retryable_old = 'can_retry'
timeout = 'timeout'
loop_times = 'loop_times'
converge_gateway_id = 'converge_gateway_id'
is_param = 'is_param'
value = 'value'
params = 'params'
is_default = 'is_default'
optional = 'optional'
template_id = 'template_id'
plain = 'plain'
splice = 'splice'
lazy = 'lazy'


PE = PipelineElement()

grow 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def __grow(tree, elem):
if elem.type() in __start_elem:
outgoing = uniqid()
tree[PE.start_event] = {
PE.incoming: '',
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.id: elem.id,
PE.name: elem.name
}

next_elem = elem.outgoing[0]
__grow_flow(tree, outgoing, elem, next_elem)

elif elem.type() in __end_elem or isinstance(elem, ExecutableEndEvent):
tree[PE.end_event] = {
PE.incoming: tree[__incoming][elem.id],
PE.outgoing: '',
PE.type: elem.type(),
PE.id: elem.id,
PE.name: elem.name
}

elif elem.type() == PE.ServiceActivity:
outgoing = uniqid()

tree[PE.activities][elem.id] = {
PE.incoming: tree[__incoming][elem.id],
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.id: elem.id,
PE.name: elem.name,
PE.error_ignorable: elem.error_ignorable,
PE.timeout: elem.timeout,
PE.skippable: elem.skippable,
PE.retryable: elem.retryable,
PE.component: elem.component_dict(),
PE.optional: False,
PE.failure_handler: elem.failure_handler
}

next_elem = elem.outgoing[0]
__grow_flow(tree, outgoing, elem, next_elem)

elif elem.type() == PE.SubProcess:
outgoing = uniqid()

subprocess_param = elem.params.to_dict() if isinstance(elem.params, Params) else elem.params

subprocess = {
PE.id: elem.id,
PE.incoming: tree[__incoming][elem.id],
PE.name: elem.name,
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.params: subprocess_param
}

if elem.template_id:
subprocess[PE.template_id] = elem.template_id
else:
subprocess[PE.pipeline] = build_tree(
start_elem=elem.start,
id=elem.id,
data=elem.data,
replace_id=elem.replace_id
)

tree[PE.activities][elem.id] = subprocess

next_elem = elem.outgoing[0]
__grow_flow(tree, outgoing, elem, next_elem)

elif elem.type() == PE.ParallelGateway:
outgoing = [uniqid() for _ in xrange(len(elem.outgoing))]

tree[PE.gateways][elem.id] = {
PE.id: elem.id,
PE.incoming: tree[__incoming][elem.id],
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.name: elem.name
}

for i, next_elem in enumerate(elem.outgoing):
__grow_flow(tree, outgoing[i], elem, next_elem)

elif elem.type() in {PE.ExclusiveGateway, PE.ConditionalParallelGateway}:
outgoing = [uniqid() for _ in xrange(len(elem.outgoing))]

tree[PE.gateways][elem.id] = {
PE.id: elem.id,
PE.incoming: tree[__incoming][elem.id],
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.name: elem.name,
PE.conditions: elem.link_conditions_with(outgoing)
}

for i, next_elem in enumerate(elem.outgoing):
__grow_flow(tree, outgoing[i], elem, next_elem)

elif elem.type() == PE.ConvergeGateway:
outgoing = uniqid()

tree[PE.gateways][elem.id] = {
PE.id: elem.id,
PE.incoming: tree[__incoming][elem.id],
PE.outgoing: outgoing,
PE.type: elem.type(),
PE.name: elem.name
}

next_elem = elem.outgoing[0]
__grow_flow(tree, outgoing, elem, next_elem)

else:
raise Exception()
CATALOG
  1. 1. pipeline
  2. 2. 插件注册
    1. 2.1. 定位报错
    2. 2.2. 实现方法
  3. 3. 简单的流程创建
    1. 3.1. Element基类
      1. 3.1.1. 返回唯一UUID的函数
      2. 3.1.2. PE对象
      3. 3.1.3. grow 函数