Default collector for the ContextDiagram.
Collection of
ELKInputData on
diagrams that involve ports.
collector
collector(diagram: ContextDiagram) -> cabc.Iterator[m.ModelElement]
Collect context data from exchanges of centric box.
Source code in src/capellambse_context_diagrams/collectors/default.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 | def collector(
diagram: context.ContextDiagram,
) -> cabc.Iterator[m.ModelElement]:
"""Collect context data from exchanges of centric box."""
visited_comps: set[str] = set()
visited_ports: set[str] = set()
visited_exchanges: set[str] = set()
port_to_exchanges = get_port_to_exchange_mapping(diagram)
stack: list[tuple[str, m.ModelElement]] = []
stack.append(("comp", diagram.target))
while stack:
kind, current = stack.pop()
if kind == "comp":
if current.uuid in visited_comps:
continue
visited_comps.add(current.uuid)
inc, out = _generic.port_collector(current, diagram.type)
for port in itertools.chain(inc.values(), out.values()):
if (
port.uuid not in visited_ports
and diagram.target.uuid
in set(_generic.get_all_owners(port))
):
visited_ports.add(port.uuid)
stack.append(("port", port))
child_attr = get_child_attribute_name(current)
for child in getattr(current, child_attr, []):
if child.uuid not in visited_comps:
stack.append(("comp", child))
elif kind == "port":
for ex in port_context_collector(current, port_to_exchanges):
if ex.uuid in visited_exchanges:
continue
assert ex.source is not None
assert ex.target is not None
source_visited = ex.source.uuid in visited_ports
target_visited = ex.target.uuid in visited_ports
if source_visited and not target_visited:
visited_exchanges.add(ex.uuid)
yield ex
visited_ports.add(ex.target.uuid)
stack.append(("port", ex.target))
if ex.target.owner.uuid not in visited_comps:
stack.append(("comp", ex.target.owner))
elif target_visited and not source_visited:
visited_exchanges.add(ex.uuid)
yield ex
visited_ports.add(ex.source.uuid)
stack.append(("port", ex.source))
if ex.source.owner.uuid not in visited_comps:
stack.append(("comp", ex.source.owner))
elif source_visited and target_visited:
visited_exchanges.add(ex.uuid)
yield ex
|
functional_chain_collector
functional_chain_collector(
diagram: ContextDiagram,
) -> cabc.Iterator[m.ModelElement]
Collect context data for a FunctionalChain.
Source code in src/capellambse_context_diagrams/collectors/default.py
155
156
157
158
159
160
161
162 | def functional_chain_collector(
diagram: context.ContextDiagram,
) -> cabc.Iterator[m.ModelElement]:
"""Collect context data for a FunctionalChain."""
if not isinstance(diagram.target, fa.FunctionalChain):
return
yield from diagram.target.involved
|
get_port_to_exchange_mapping
get_port_to_exchange_mapping(
diagram: ContextDiagram,
) -> dict[str, list[fa.FunctionalExchange | fa.ComponentExchange]]
Get a mapping of port UUIDs to their exchanges.
Source code in src/capellambse_context_diagrams/collectors/default.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 | def get_port_to_exchange_mapping(
diagram: context.ContextDiagram,
) -> dict[str, list[fa.FunctionalExchange | fa.ComponentExchange]]:
"""Get a mapping of port UUIDs to their exchanges."""
all_exchanges = diagram.target._model.search(
"ComponentExchange", "FunctionalExchange", "PhysicalLink"
)
port_to_exchanges: dict[
str, list[fa.FunctionalExchange | fa.ComponentExchange]
] = collections.defaultdict(list)
for exchange in all_exchanges:
if exchange.source:
port_uuid = exchange.source.uuid
port_to_exchanges[port_uuid].append(exchange)
if exchange.target:
port_uuid = exchange.target.uuid
port_to_exchanges[port_uuid].append(exchange)
return port_to_exchanges
|
physical_port_context_collector
physical_port_context_collector(
diagram: ContextDiagram,
) -> cabc.Iterator[fa.FunctionalExchange | fa.ComponentExchange]
Collect context data from a physical port.
Source code in src/capellambse_context_diagrams/collectors/default.py
165
166
167
168
169
170
171
172
173 | def physical_port_context_collector(
diagram: context.ContextDiagram,
) -> cabc.Iterator[fa.FunctionalExchange | fa.ComponentExchange]:
"""Collect context data from a physical port."""
if not isinstance(diagram.target, cs.PhysicalPort):
return
port_to_exchanges = get_port_to_exchange_mapping(diagram)
yield from port_context_collector(diagram.target, port_to_exchanges)
|
port_context_collector
port_context_collector(
obj: ModelElement,
port_to_exchanges: dict[str, list[FunctionalExchange | ComponentExchange]],
) -> cabc.Iterator[fa.FunctionalExchange | fa.ComponentExchange]
Collect context data from a physical port.
Source code in src/capellambse_context_diagrams/collectors/default.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 | def port_context_collector(
obj: m.ModelElement,
port_to_exchanges: dict[
str, list[fa.FunctionalExchange | fa.ComponentExchange]
],
) -> cabc.Iterator[fa.FunctionalExchange | fa.ComponentExchange]:
"""Collect context data from a physical port."""
ports = set()
links = set()
def _collect(
target: m.ModelElement,
) -> cabc.Iterator[fa.FunctionalExchange | fa.ComponentExchange]:
if target.uuid in ports:
return
ports.add(target.uuid)
for link in port_to_exchanges.get(target.uuid, []):
if link.uuid in links:
continue
links.add(link.uuid)
yield link
assert link.source is not None
yield from _collect(link.source)
assert link.target is not None
yield from _collect(link.target)
yield from _collect(obj)
|