1from __future__ import annotations
2
3import json
4import logging
5from abc import ABC, abstractmethod
6from collections.abc import Container
7from dataclasses import dataclass, field
8from functools import reduce
9from threading import RLock
10from typing import TYPE_CHECKING, Final, List, Union, cast, final
11
12from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
13from ..kast import EMPTY_ATT
14from ..kast.inner import KApply
15from ..kast.manip import (
16 bool_to_ml_pred,
17 extract_lhs,
18 extract_rhs,
19 flatten_label,
20 inline_cell_maps,
21 rename_generated_vars,
22 sort_ac_collections,
23)
24from ..kast.outer import KFlatModule
25from ..prelude.kbool import andBool
26from ..utils import ensure_dir_path, not_none, single
27
28if TYPE_CHECKING:
29 from collections.abc import Callable, Iterable, Mapping, MutableMapping
30 from pathlib import Path
31 from types import TracebackType
32 from typing import Any
33
34 from pyk.kore.rpc import LogEntry
35
36 from ..kast import KAtt
37 from ..kast.inner import KInner
38 from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
39
40
41NodeIdLike = int | str
42
43_LOGGER: Final = logging.getLogger(__name__)
44
45
[docs]
@dataclass(frozen=True)
class NodeAttr:
    """Immutable tag that can be attached to a KCFG node, identified by its string ``value``."""

    value: str
49
50
[docs]
class KCFGNodeAttr(NodeAttr):
    """Holder for the node attribute constants used by this module.

    Note that the constants below are plain ``NodeAttr`` instances, not ``KCFGNodeAttr`` instances.
    """

    VACUOUS = NodeAttr('vacuous')
    STUCK = NodeAttr('stuck')
54
55
[docs]
class KCFGStore:
    """On-disk layout of a KCFG: a ``kcfg.json`` index plus one JSON file per node.

    The index keeps only node ids (together with the ids of the vacuous and stuck nodes);
    the ``id``/``cterm`` payload of each node lives in ``nodes/<id>.json``.
    """

    store_path: Path

    def __init__(self, store_path: Path) -> None:
        """Remember ``store_path`` and make sure it and its ``nodes`` sub-directory exist."""
        self.store_path = store_path
        ensure_dir_path(store_path)
        ensure_dir_path(self.kcfg_node_dir)

    @property
    def kcfg_json_path(self) -> Path:
        """Path of the index file."""
        return self.store_path / 'kcfg.json'

    @property
    def kcfg_node_dir(self) -> Path:
        """Directory holding the per-node files."""
        return self.store_path / 'nodes'

    def kcfg_node_path(self, node_id: int) -> Path:
        """Path of the file holding the node with id ``node_id``."""
        return self.kcfg_node_dir / f'{node_id}.json'

    def write_cfg_data(
        self, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
    ) -> None:
        """Persist ``dct`` to the store.

        Files of ``deleted_nodes`` are removed, files of ``created_nodes`` are (re)written, and the
        index is written last.

        NOTE: ``dct`` is modified in place: ``'vacuous'`` and ``'stuck'`` are added, ``'nodes'`` is
        replaced by the list of node ids, and ``'attrs'`` is removed from every created node's dict.
        """
        node_dict = {node_dct['id']: node_dct for node_dct in dct['nodes']}
        vacuous_nodes = [
            node_id for node_id, node_dct in node_dict.items() if KCFGNodeAttr.VACUOUS.value in node_dct['attrs']
        ]
        stuck_nodes = [
            node_id for node_id, node_dct in node_dict.items() if KCFGNodeAttr.STUCK.value in node_dct['attrs']
        ]
        dct['vacuous'] = vacuous_nodes
        dct['stuck'] = stuck_nodes
        for node_id in deleted_nodes:
            self.kcfg_node_path(node_id).unlink(missing_ok=True)
        for node_id in created_nodes:
            # Attributes are stored in the index (vacuous/stuck lists), not in the node file.
            del node_dict[node_id]['attrs']
            self.kcfg_node_path(node_id).write_text(json.dumps(node_dict[node_id]))
        dct['nodes'] = list(node_dict.keys())
        self.kcfg_json_path.write_text(json.dumps(dct))

    def read_cfg_data(self) -> dict[str, Any]:
        """Load the index and node files and rebuild the dictionary shape taken by ``write_cfg_data``.

        Raises ``KeyError`` if the index lacks the ``'vacuous'`` or ``'stuck'`` entry.
        """
        dct = json.loads(self.kcfg_json_path.read_text())
        nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []]

        # Build the lookup sets once so the per-node membership test is O(1) instead of
        # scanning the id lists for every node.
        vacuous_ids = set(dct.pop('vacuous'))
        stuck_ids = set(dct.pop('stuck'))

        new_nodes = []
        for node in nodes:
            attrs = []
            if node['id'] in vacuous_ids:
                attrs.append(KCFGNodeAttr.VACUOUS.value)
            if node['id'] in stuck_ids:
                attrs.append(KCFGNodeAttr.STUCK.value)
            new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})

        dct['nodes'] = new_nodes
        return dct

    def read_node_data(self, node_id: int) -> dict[str, Any]:
        """Load the JSON payload stored for node ``node_id``."""
        return json.loads(self.kcfg_node_path(node_id).read_text())
118
119
[docs]
120class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs]
121 @final
122 @dataclass(frozen=True, order=True)
123 class Node:
124 id: int
125 cterm: CTerm
126 attrs: frozenset[NodeAttr]
127
128 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
129 object.__setattr__(self, 'id', id)
130 object.__setattr__(self, 'cterm', cterm)
131 object.__setattr__(self, 'attrs', frozenset(attrs))
132
[docs]
133 def to_dict(self) -> dict[str, Any]:
134 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
135
[docs]
136 @staticmethod
137 def from_dict(dct: dict[str, Any]) -> KCFG.Node:
138 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
139
[docs]
140 def add_attr(self, attr: NodeAttr) -> KCFG.Node:
141 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
142
[docs]
143 def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
144 if attr not in self.attrs:
145 raise ValueError(f'Node {self.id} does not have attribute {attr.value}')
146 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
147
[docs]
148 def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
149 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
150
[docs]
151 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
152 new_cterm = cterm if cterm is not None else self.cterm
153 new_attrs = attrs if attrs is not None else self.attrs
154 return KCFG.Node(self.id, new_cterm, new_attrs)
155
156 @property
157 def free_vars(self) -> frozenset[str]:
158 return frozenset(self.cterm.free_vars)
159
[docs]
160 class Successor(ABC):
161 source: KCFG.Node
162
163 def __lt__(self, other: Any) -> bool:
164 if not isinstance(other, KCFG.Successor):
165 return NotImplemented
166 return self.source < other.source
167
168 @property
169 def source_vars(self) -> frozenset[str]:
170 return frozenset(self.source.free_vars)
171
172 @property
173 @abstractmethod
174 def targets(self) -> tuple[KCFG.Node, ...]: ...
175
176 @property
177 def target_ids(self) -> list[int]:
178 return sorted(target.id for target in self.targets)
179
180 @property
181 def target_vars(self) -> frozenset[str]:
182 return frozenset(set.union(set(), *(target.free_vars for target in self.targets)))
183
[docs]
184 @abstractmethod
185 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
186
[docs]
187 @abstractmethod
188 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
189
[docs]
190 @abstractmethod
191 def to_dict(self) -> dict[str, Any]: ...
192
[docs]
193 @staticmethod
194 @abstractmethod
195 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
196
[docs]
 class EdgeLike(Successor):
     """Successor with exactly one target node."""

     source: KCFG.Node
     target: KCFG.Node

     @property
     def targets(self) -> tuple[KCFG.Node, ...]:
         # Expose the single target through the generic multi-target interface.
         return (self.target,)
204
[docs]
205 @final
206 @dataclass(frozen=True)
207 class Edge(EdgeLike):
208 source: KCFG.Node
209 target: KCFG.Node
210 depth: int
211 rules: tuple[str, ...]
212
[docs]
213 def to_dict(self) -> dict[str, Any]:
214 return {
215 'source': self.source.id,
216 'target': self.target.id,
217 'depth': self.depth,
218 'rules': list(self.rules),
219 }
220
[docs]
221 @staticmethod
222 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
223 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
224
[docs]
225 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
226 def is_ceil_condition(kast: KInner) -> bool:
227 return type(kast) is KApply and kast.label.name == '#Ceil'
228
229 def _simplify_config(config: KInner) -> KInner:
230 return sort_ac_collections(inline_cell_maps(config))
231
232 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
233 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)]
234 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints)
235 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)]
236 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints)
237 rule: KRuleLike
238 if claim:
239 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm)
240 else:
241 rule, _ = cterm_build_rule(sentence_id, init_cterm, target_cterm, priority=priority)
242 return rule
243
[docs]
244 def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
245 assert node.id == self.source.id
246 return KCFG.Edge(node, self.target, self.depth, self.rules)
247
[docs]
248 def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
249 assert node.id == self.target.id
250 return KCFG.Edge(self.source, node, self.depth, self.rules)
251
[docs]
252 @final
253 @dataclass(frozen=True)
254 class Cover(EdgeLike):
255 source: KCFG.Node
256 target: KCFG.Node
257 csubst: CSubst
258
[docs]
259 def to_dict(self) -> dict[str, Any]:
260 return {
261 'source': self.source.id,
262 'target': self.target.id,
263 'csubst': self.csubst.to_dict(),
264 }
265
[docs]
266 @staticmethod
267 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
268 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
269
[docs]
270 def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
271 assert node.id == self.source.id
272 return KCFG.Cover(node, self.target, self.csubst)
273
[docs]
274 def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
275 assert node.id == self.target.id
276 return KCFG.Cover(self.source, node, self.csubst)
277
[docs]
278 @dataclass(frozen=True)
279 class MultiEdge(Successor):
280 source: KCFG.Node
281
282 def __lt__(self, other: Any) -> bool:
283 if not type(other) is type(self):
284 return NotImplemented
285 return (self.source, self.target_ids) < (other.source, other.target_ids)
286
[docs]
287 @abstractmethod
288 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
289
[docs]
290 @final
291 @dataclass(frozen=True)
292 class Split(MultiEdge):
293 source: KCFG.Node
294 _targets: tuple[tuple[KCFG.Node, CSubst], ...]
295
296 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
297 object.__setattr__(self, 'source', source)
298 object.__setattr__(self, '_targets', tuple(_targets))
299
300 @property
301 def targets(self) -> tuple[KCFG.Node, ...]:
302 return tuple(target for target, _ in self._targets)
303
304 @property
305 def splits(self) -> dict[int, CSubst]:
306 return {target.id: csubst for target, csubst in self._targets}
307
[docs]
308 def to_dict(self) -> dict[str, Any]:
309 return {
310 'source': self.source.id,
311 'targets': [
312 {
313 'target': target.id,
314 'csubst': csubst.to_dict(),
315 }
316 for target, csubst in self._targets
317 ],
318 }
319
[docs]
320 @staticmethod
321 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
322 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']]
323 return KCFG.Split(nodes[dct['source']], tuple(_targets))
324
[docs]
325 def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
326 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
327
328 @property
329 def covers(self) -> tuple[KCFG.Cover, ...]:
330 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets)
331
[docs]
332 def replace_source(self, node: KCFG.Node) -> KCFG.Split:
333 assert node.id == self.source.id
334 return KCFG.Split(node, self._targets)
335
[docs]
336 def replace_target(self, node: KCFG.Node) -> KCFG.Split:
337 assert node.id in self.target_ids
338 new_targets = [
339 (node, csubst) if node.id == target_node.id else (target_node, csubst)
340 for target_node, csubst in self._targets
341 ]
342 return KCFG.Split(self.source, tuple(new_targets))
343
[docs]
344 @final
345 @dataclass(frozen=True)
346 class NDBranch(MultiEdge):
347 source: KCFG.Node
348 _targets: tuple[KCFG.Node, ...]
349 rules: tuple[str, ...]
350
351 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
352 object.__setattr__(self, 'source', source)
353 object.__setattr__(self, '_targets', tuple(_targets))
354 object.__setattr__(self, 'rules', rules)
355
356 @property
357 def targets(self) -> tuple[KCFG.Node, ...]:
358 return self._targets
359
[docs]
360 def to_dict(self) -> dict[str, Any]:
361 return {
362 'source': self.source.id,
363 'targets': [target.id for target in self.targets],
364 'rules': list(self.rules),
365 }
366
[docs]
367 @staticmethod
368 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
369 return KCFG.NDBranch(
370 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules'])
371 )
372
[docs]
373 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
374 return KCFG.NDBranch(self.source, (target,), ())
375
376 @property
377 def edges(self) -> tuple[KCFG.Edge, ...]:
378 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets)
379
[docs]
380 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
381 assert node.id == self.source.id
382 return KCFG.NDBranch(node, self._targets, self.rules)
383
[docs]
384 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
385 assert node.id in self.target_ids
386 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets]
387 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
388
 # Id that the next call to ``create_node`` will assign.
 _node_id: int
 # Node id -> node.
 _nodes: MutableMapping[int, KCFG.Node]

 # Ids of nodes added/replaced and of nodes removed; maintained by the node mutators below.
 _created_nodes: set[int]
 _deleted_nodes: set[int]

 # Successors of each kind, keyed by the id of their source node.
 _edges: dict[int, Edge]
 _covers: dict[int, Cover]
 _splits: dict[int, Split]
 _ndbranches: dict[int, NDBranch]
 # Alias name -> node id.
 _aliases: dict[str, int]
 # Re-entrant lock acquired/released by ``__enter__``/``__exit__``.
 _lock: RLock

 # Backing store; created in ``__init__`` when a ``cfg_dir`` is passed.
 _kcfg_store: KCFGStore | None
403
404 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
405 self._node_id = 1
406 if optimize_memory:
407 from .store import OptimizedNodeStore
408
409 self._nodes = OptimizedNodeStore()
410 else:
411 self._nodes = {}
412 self._created_nodes = set()
413 self._deleted_nodes = set()
414 self._edges = {}
415 self._covers = {}
416 self._splits = {}
417 self._ndbranches = {}
418 self._aliases = {}
419 self._lock = RLock()
420 if cfg_dir is not None:
421 self._kcfg_store = KCFGStore(cfg_dir)
422
423 def __contains__(self, item: object) -> bool:
424 if type(item) is KCFG.Node:
425 return self.contains_node(item)
426 if type(item) is KCFG.Edge:
427 return self.contains_edge(item)
428 if type(item) is KCFG.Cover:
429 return self.contains_cover(item)
430 if type(item) is KCFG.Split:
431 return self.contains_split(item)
432 if type(item) is KCFG.NDBranch:
433 return self.contains_ndbranch(item)
434 return False
435
 def __enter__(self) -> KCFG:
     """Acquire the instance lock and return ``self``."""
     self._lock.acquire()
     return self
439
440 def __exit__(
441 self,
442 exc_type: type[BaseException] | None,
443 exc_value: BaseException | None,
444 traceback: TracebackType | None,
445 ) -> bool:
446 self._lock.release()
447 if exc_type is None:
448 return True
449 return False
450
 @property
 def nodes(self) -> list[KCFG.Node]:
     """All nodes, as a new list."""
     return list(self._nodes.values())
454
 @property
 def root(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_root`` holds."""
     return [node for node in self.nodes if self.is_root(node.id)]
458
 @property
 def vacuous(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_vacuous`` holds."""
     return [node for node in self.nodes if self.is_vacuous(node.id)]
462
 @property
 def stuck(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_stuck`` holds."""
     return [node for node in self.nodes if self.is_stuck(node.id)]
466
 @property
 def leaves(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_leaf`` holds."""
     return [node for node in self.nodes if self.is_leaf(node.id)]
470
 @property
 def covered(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_covered`` holds."""
     return [node for node in self.nodes if self.is_covered(node.id)]
474
 @property
 def uncovered(self) -> list[KCFG.Node]:
     """Nodes for which ``self.is_covered`` does not hold."""
     return [node for node in self.nodes if not self.is_covered(node.id)]
478
[docs]
479 @staticmethod
480 def from_claim(
481 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
482 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
483 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
484 claim_body = claim.body
485 claim_body = defn.instantiate_cell_vars(claim_body)
486 claim_body = rename_generated_vars(claim_body)
487
488 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires))
489 init_node = cfg.create_node(claim_lhs)
490
491 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint(
492 bool_to_ml_pred(andBool([claim.requires, claim.ensures]))
493 )
494 target_node = cfg.create_node(claim_rhs)
495
496 return cfg, init_node.id, target_node.id
497
[docs]
498 @staticmethod
499 def path_length(_path: Iterable[KCFG.Successor]) -> int:
500 _path = tuple(_path)
501 if len(_path) == 0:
502 return 0
503 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover:
504 return KCFG.path_length(_path[1:])
505 elif type(_path[0]) is KCFG.NDBranch:
506 return 1 + KCFG.path_length(_path[1:])
507 elif type(_path[0]) is KCFG.Edge:
508 return _path[0].depth + KCFG.path_length(_path[1:])
509 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
510
[docs]
 def extend(
     self,
     extend_result: KCFGExtendResult,
     node: KCFG.Node,
     logs: dict[int, tuple[LogEntry, ...]],
 ) -> None:
     """Apply ``extend_result`` to the graph at ``node``.

     Depending on the kind of result, ``node`` is marked vacuous or stuck, or new nodes are created
     and connected to ``node`` by a cover, an edge, a split or a non-deterministic branch.
     For ``Step`` and ``NDBranch`` results, ``logs`` is updated in place with the logs of the new nodes.

     NOTE(review): the result classes matched below are not defined in the visible part of the file;
     presumably they are module-level classes declared further down. In particular, the bare name
     ``NDBranch`` inside this method does not refer to ``KCFG.NDBranch`` (class-scope names are not
     visible from method bodies).

     :raises AssertionError: If ``extend_result`` matches none of the cases.
     """
     match extend_result:
         case Vacuous():
             self.add_vacuous(node.id)

         case Stuck():
             self.add_stuck(node.id)

         case Abstract(cterm):
             # New node for ``cterm``, with a cover going from ``node`` to it.
             new_node = self.create_node(cterm)
             self.create_cover(node.id, new_node.id)

         case Step(cterm, depth, next_node_logs, rule_labels, _):
             next_node = self.create_node(cterm)
             logs[next_node.id] = next_node_logs
             self.create_edge(node.id, next_node.id, depth, rules=rule_labels)

         case Branch(constraints, _):
             self.split_on_constraints(node.id, constraints)

         case NDBranch(cterms, next_node_logs, rule_labels):
             next_ids = [self.create_node(cterm).id for cterm in cterms]
             # Every new node is associated with the same log entries.
             for i in next_ids:
                 logs[i] = next_node_logs
             self.create_ndbranch(node.id, next_ids, rules=rule_labels)

         case _:
             raise AssertionError()
544
[docs]
545 def to_dict(self) -> dict[str, Any]:
546 nodes = [node.to_dict() for node in self.nodes]
547 edges = [edge.to_dict() for edge in self.edges()]
548 covers = [cover.to_dict() for cover in self.covers()]
549 splits = [split.to_dict() for split in self.splits()]
550 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
551
552 aliases = dict(sorted(self._aliases.items()))
553
554 res = {
555 'next': self._node_id,
556 'nodes': nodes,
557 'edges': edges,
558 'covers': covers,
559 'splits': splits,
560 'ndbranches': ndbranches,
561 'aliases': aliases,
562 }
563 return {k: v for k, v in res.items() if v}
564
[docs]
565 @staticmethod
566 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
567 cfg = KCFG(optimize_memory=optimize_memory)
568
569 for node_dict in dct.get('nodes') or []:
570 node = KCFG.Node.from_dict(node_dict)
571 cfg.add_node(node)
572
573 max_id = max([node.id for node in cfg.nodes], default=0)
574 cfg._node_id = dct.get('next', max_id + 1)
575
576 for edge_dict in dct.get('edges') or []:
577 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes)
578 cfg.add_successor(edge)
579
580 for cover_dict in dct.get('covers') or []:
581 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
582 cfg.add_successor(cover)
583
584 for split_dict in dct.get('splits') or []:
585 split = KCFG.Split.from_dict(split_dict, cfg._nodes)
586 cfg.add_successor(split)
587
588 for ndbranch_dict in dct.get('ndbranches') or []:
589 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes)
590 cfg.add_successor(ndbranch)
591
592 for alias, node_id in dct.get('aliases', {}).items():
593 cfg.add_alias(alias=alias, node_id=node_id)
594
595 return cfg
596
[docs]
597 def aliases(self, node_id: NodeIdLike) -> list[str]:
598 node_id = self._resolve(node_id)
599 return [alias for alias, value in self._aliases.items() if node_id == value]
600
[docs]
 def to_json(self) -> str:
     """Serialize the result of ``to_dict`` to a JSON string with sorted keys."""
     return json.dumps(self.to_dict(), sort_keys=True)
603
[docs]
 @staticmethod
 def from_json(s: str, optimize_memory: bool = True) -> KCFG:
     """Parse the JSON string ``s`` and rebuild the KCFG via ``from_dict``."""
     return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
607
[docs]
 def to_rules(self, priority: int = 20) -> list[KRuleLike]:
     """Convert every edge to a rule labelled ``BASIC-BLOCK`` with the given ``priority``."""
     return [e.to_rule('BASIC-BLOCK', priority=priority) for e in self.edges()]
610
[docs]
611 def to_module(
612 self,
613 module_name: str | None = None,
614 imports: Iterable[KImport] = (),
615 priority: int = 20,
616 att: KAtt = EMPTY_ATT,
617 ) -> KFlatModule:
618 module_name = 'KCFG' if module_name is None else module_name
619 return KFlatModule(module_name, self.to_rules(priority=priority), imports=imports, att=att)
620
621 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
622 if type(id_like) is int:
623 if id_like in self._nodes:
624 return id_like
625
626 return None
627
628 if type(id_like) is not str:
629 raise TypeError(f'Expected int or str for id_like, got: {id_like}')
630
631 if id_like.startswith('@'):
632 if id_like[1:] in self._aliases:
633 return self._aliases[id_like[1:]]
634 raise ValueError(f'Unknown alias: {id_like}')
635
636 return None
637
638 def _resolve(self, id_like: NodeIdLike) -> int:
639 match = self._resolve_or_none(id_like)
640 if not match:
641 raise ValueError(f'Unknown node: {id_like}')
642 return match
643
[docs]
 def node(self, node_id: NodeIdLike) -> KCFG.Node:
     """Return the node identified by ``node_id``; ``_resolve`` raises ``ValueError`` if there is none."""
     node_id = self._resolve(node_id)
     return self._nodes[node_id]
647
[docs]
648 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
649 resolved_id = self._resolve_or_none(node_id)
650 if resolved_id is None:
651 return None
652 return self._nodes[resolved_id]
653
[docs]
 def contains_node(self, node: KCFG.Node) -> bool:
     """Report whether a node with ``node``'s id exists (the lookup is by id only)."""
     return bool(self.get_node(node.id))
656
[docs]
657 def add_node(self, node: KCFG.Node) -> None:
658 if node.id in self._nodes:
659 raise ValueError(f'Node with id already exists: {node.id}')
660 self._nodes[node.id] = node
661 self._created_nodes.add(node.id)
662
[docs]
663 def create_node(self, cterm: CTerm) -> KCFG.Node:
664 node = KCFG.Node(self._node_id, cterm)
665 self._node_id += 1
666 self._nodes[node.id] = node
667 self._created_nodes.add(node.id)
668 return node
669
[docs]
670 def remove_node(self, node_id: NodeIdLike) -> None:
671 node_id = self._resolve(node_id)
672
673 node = self._nodes.pop(node_id)
674 self._created_nodes.discard(node_id)
675 self._deleted_nodes.add(node.id)
676
677 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids}
678 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids}
679
680 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids}
681 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids}
682
683 for alias in [alias for alias, id in self._aliases.items() if id == node_id]:
684 self.remove_alias(alias)
685
686 def _update_refs(self, node_id: int) -> None:
687 node = self.node(node_id)
688 for succ in self.successors(node_id):
689 new_succ = succ.replace_source(node)
690 if type(new_succ) is KCFG.Edge:
691 self._edges[new_succ.source.id] = new_succ
692 if type(new_succ) is KCFG.Cover:
693 self._covers[new_succ.source.id] = new_succ
694 if type(new_succ) is KCFG.Split:
695 self._splits[new_succ.source.id] = new_succ
696 if type(new_succ) is KCFG.NDBranch:
697 self._ndbranches[new_succ.source.id] = new_succ
698
699 for pred in self.predecessors(node_id):
700 new_pred = pred.replace_target(node)
701 if type(new_pred) is KCFG.Edge:
702 self._edges[new_pred.source.id] = new_pred
703 if type(new_pred) is KCFG.Cover:
704 self._covers[new_pred.source.id] = new_pred
705 if type(new_pred) is KCFG.Split:
706 self._splits[new_pred.source.id] = new_pred
707 if type(new_pred) is KCFG.NDBranch:
708 self._ndbranches[new_pred.source.id] = new_pred
709
[docs]
710 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
711 node = self.node(node_id)
712 new_node = node.remove_attr(attr)
713 self.replace_node(new_node)
714
[docs]
715 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
716 node = self.node(node_id)
717 new_node = node.discard_attr(attr)
718 self.replace_node(new_node)
719
[docs]
720 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
721 node = self.node(node_id)
722 new_node = node.add_attr(attr)
723 self.replace_node(new_node)
724
[docs]
725 def let_node(
726 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
727 ) -> None:
728 node = self.node(node_id)
729 new_node = node.let(cterm=cterm, attrs=attrs)
730 self.replace_node(new_node)
731
[docs]
 def replace_node(self, node: KCFG.Node) -> None:
     """Store ``node`` under its id, record the id as created and refresh the successors referring to it."""
     self._nodes[node.id] = node
     self._created_nodes.add(node.id)
     # Must run after the node is stored: ``_update_refs`` reads the node back by id.
     self._update_refs(node.id)
736
[docs]
737 def successors(self, source_id: NodeIdLike) -> list[Successor]:
738 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id)
739 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id)
740 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id)
741 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id)
742 return list(out_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
743
[docs]
744 def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
745 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id)
746 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id)
747 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id)
748 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id)
749 return list(in_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
750
751 def _check_no_successors(self, source_id: NodeIdLike) -> None:
752 if len(self.successors(source_id)) > 0:
753 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}')
754
755 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
756 for target_id in target_ids:
757 path = self.shortest_path_between(target_id, source_id)
758 if path is not None and KCFG.path_length(path) == 0:
759 raise ValueError(
760 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
761 )
762
[docs]
763 def add_successor(self, succ: KCFG.Successor) -> None:
764 self._check_no_successors(succ.source.id)
765 self._check_no_zero_loops(succ.source.id, succ.target_ids)
766 if type(succ) is KCFG.Edge:
767 self._edges[succ.source.id] = succ
768 elif type(succ) is KCFG.Cover:
769 self._covers[succ.source.id] = succ
770 else:
771 if len(succ.target_ids) <= 1:
772 raise ValueError(
773 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
774 )
775 if type(succ) is KCFG.Split:
776 self._splits[succ.source.id] = succ
777 elif type(succ) is KCFG.NDBranch:
778 self._ndbranches[succ.source.id] = succ
779
[docs]
780 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
781 source_id = self._resolve(source_id)
782 target_id = self._resolve(target_id)
783 edge = self._edges.get(source_id, None)
784 return edge if edge is not None and edge.target.id == target_id else None
785
[docs]
786 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
787 source_id = self._resolve(source_id) if source_id is not None else None
788 target_id = self._resolve(target_id) if target_id is not None else None
789 return [
790 edge
791 for edge in self._edges.values()
792 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id)
793 ]
794
[docs]
795 def contains_edge(self, edge: Edge) -> bool:
796 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id):
797 return edge == other
798 return False
799
[docs]
800 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
801 if depth <= 0:
802 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
803 source = self.node(source_id)
804 target = self.node(target_id)
805 edge = KCFG.Edge(source, target, depth, tuple(rules))
806 self.add_successor(edge)
807 return edge
808
[docs]
809 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
810 source_id = self._resolve(source_id)
811 target_id = self._resolve(target_id)
812 edge = self.edge(source_id, target_id)
813 if not edge:
814 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}')
815 self._edges.pop(source_id)
816
[docs]
817 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
818 source_id = self._resolve(source_id)
819 target_id = self._resolve(target_id)
820 cover = self._covers.get(source_id, None)
821 return cover if cover is not None and cover.target.id == target_id else None
822
[docs]
823 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
824 source_id = self._resolve(source_id) if source_id is not None else None
825 target_id = self._resolve(target_id) if target_id is not None else None
826 return [
827 cover
828 for cover in self._covers.values()
829 if (source_id is None or source_id == cover.source.id)
830 and (target_id is None or target_id == cover.target.id)
831 ]
832
[docs]
833 def contains_cover(self, cover: Cover) -> bool:
834 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id):
835 return cover == other
836 return False
837
[docs]
838 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
839 source = self.node(source_id)
840 target = self.node(target_id)
841 if csubst is None:
842 csubst = target.cterm.match_with_constraint(source.cterm)
843 if csubst is None:
844 raise ValueError(f'No matching between: {source.id} and {target.id}')
845 cover = KCFG.Cover(source, target, csubst=csubst)
846 self.add_successor(cover)
847 return cover
848
[docs]
849 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
850 source_id = self._resolve(source_id)
851 target_id = self._resolve(target_id)
852 cover = self.cover(source_id, target_id)
853 if not cover:
854 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}')
855 self._covers.pop(source_id)
856
[docs]
857 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
858 return cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id)) + cast(
859 'List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id)
860 )
861
[docs]
 def add_vacuous(self, node_id: NodeIdLike) -> None:
     """Attach the ``VACUOUS`` attribute to the node."""
     self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
864
[docs]
 def remove_vacuous(self, node_id: NodeIdLike) -> None:
     """Remove the ``VACUOUS`` attribute from the node; fails (via ``remove_attr``) if it is absent."""
     self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
867
[docs]
 def discard_vacuous(self, node_id: NodeIdLike) -> None:
     """Remove the ``VACUOUS`` attribute from the node if it is present."""
     self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
870
[docs]
 def add_stuck(self, node_id: NodeIdLike) -> None:
     """Attach the ``STUCK`` attribute to the node."""
     self.add_attr(node_id, KCFGNodeAttr.STUCK)
873
[docs]
 def remove_stuck(self, node_id: NodeIdLike) -> None:
     """Remove the ``STUCK`` attribute from the node; fails (via ``remove_attr``) if it is absent."""
     self.remove_attr(node_id, KCFGNodeAttr.STUCK)
876
[docs]
 def discard_stuck(self, node_id: NodeIdLike) -> None:
     """Remove the ``STUCK`` attribute from the node if it is present."""
     self.discard_attr(node_id, KCFGNodeAttr.STUCK)
879
[docs]
880 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
881 source_id = self._resolve(source_id) if source_id is not None else None
882 target_id = self._resolve(target_id) if target_id is not None else None
883 return [
884 s
885 for s in self._splits.values()
886 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids)
887 ]
888
[docs]
 def contains_split(self, split: Split) -> bool:
     """Report whether a split equal to ``split`` is stored in the graph."""
     return split in self._splits.values()
891
[docs]
def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
    """Build a split out of a node and register it as a successor.

    Input:
        - source_id: identifier of the node being split.
        - splits: pairs of (target node identifier, constrained substitution).

    Output: the newly created `KCFG.Split`, after it was passed to `add_successor`.
    """
    source = self.node(self._resolve(source_id))
    # Materialise first so that a one-shot iterable is fully read before node lookups.
    pairs = list(splits)
    targets = tuple((self.node(target_id), csubst) for target_id, csubst in pairs)
    new_split = KCFG.Split(source, targets)
    self.add_successor(new_split)
    return new_split
897
[docs]
def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
    """List the stored non-deterministic branches, optionally filtered by endpoints.

    Input:
        - source_id: if given, keep only branches whose source node has this (resolved) id.
        - target_id: if given, keep only branches that have this (resolved) id among their targets.

    Output: the matching branches, in the iteration order of `self._ndbranches`.
    """
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for branch in self._ndbranches.values():
        if wanted_source is not None and wanted_source != branch.source.id:
            continue
        if wanted_target is not None and wanted_target not in branch.target_ids:
            continue
        matching.append(branch)
    return matching
906
[docs]
def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
    """Tell whether the given non-deterministic branch is stored in this KCFG.

    `self._ndbranches` maps source node ids to branches, so membership has to be
    tested against its values (testing the mapping itself would compare the
    branch against node ids and never match).
    """
    return ndbranch in self._ndbranches.values()
909
[docs]
def create_ndbranch(
    self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
) -> KCFG.NDBranch:
    """Build a non-deterministic branch out of a node and register it as a successor.

    Input:
        - source_id: identifier of the branching node.
        - ndbranches: identifiers of the branch target nodes.
        - rules: rule labels stored on the branch (as a tuple).

    Output: the newly created `KCFG.NDBranch`, after it was passed to `add_successor`.
    """
    source = self.node(self._resolve(source_id))
    # Materialise first so that a one-shot iterable is fully read before node lookups.
    target_ids = list(ndbranches)
    targets = tuple(self.node(target_id) for target_id in target_ids)
    new_branch = KCFG.NDBranch(source, targets, tuple(rules))
    self.add_successor(new_branch)
    return new_branch
917
[docs]
def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
    """Split a node into one branch per constraint.

    Input:
        - source_id: identifier of the node to split.
        - constraints: the branching constraints; any iterable is accepted,
          including a one-shot iterator.

    Effect: for every constraint a node is created whose `CTerm` is the source
    `CTerm` with that constraint added, and a split from the source to all of
    these nodes is recorded through `create_split`.

    Output: the ids of the created nodes, in the order of `constraints`.
    """
    # The constraints are traversed twice below; materialise them once so that
    # an iterator/generator argument is not exhausted after the first pass.
    constraints = list(constraints)
    source = self.node(source_id)
    branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints]
    # Match the source against every branch node; `not_none` rejects a failed match.
    csubsts = [not_none(source.cterm.match_with_constraint(self.node(nid).cterm)) for nid in branch_node_ids]
    # Add each conjunct of the branching constraint to the corresponding substitution.
    csubsts = [
        reduce(CSubst.add_constraint, flatten_label('#And', constraint), csubst)
        for (csubst, constraint) in zip(csubsts, constraints, strict=True)
    ]
    self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
    return branch_node_ids
928
[docs]
def lift_edge(self, b_id: NodeIdLike) -> None:
    """Merge the edge entering a node with the edge leaving it.

    Input:
        - b_id: the identifier of the middle node `B` in `A --> B --> C`.

    Effect: `A --M steps--> B --N steps--> C` is replaced by `A --(M + N) steps--> C`,
    with the rule lists concatenated in the same order. Node `B` is removed.

    Output: None

    Raises: whatever `single` raises when `B` does not have exactly one incoming and
    exactly one outgoing edge (earlier documentation names `AssertionError`; the
    exact type is not visible from this method).
    """
    # Both edges must be fetched before `B` is removed.
    incoming = single(self.edges(target_id=b_id))
    outgoing = single(self.edges(source_id=b_id))
    # Removing `B` removes the original structure.
    self.remove_node(b_id)
    total_depth = incoming.depth + outgoing.depth
    all_rules = incoming.rules + outgoing.rules
    self.create_edge(incoming.source.id, outgoing.target.id, total_depth, all_rules)
949
[docs]
def lift_edges(self) -> bool:
    """Apply `lift_edge` to every node that currently sits between two edges.

    The candidate nodes (those with at least one outgoing and at least one
    incoming edge) are collected once, up front, and then lifted in ascending
    id order; no re-scan is performed afterwards. The original author's
    rationale is that one edge lift can neither enable nor disable another.

    Input: None

    Effect: every candidate node is removed and its two edges are merged.

    Output:
        - bool: whether at least one edge lift was performed.
    """
    candidates = []
    for node in self.nodes:
        if len(self.edges(source_id=node.id)) == 0:
            continue
        if len(self.edges(target_id=node.id)) == 0:
            continue
        candidates.append(node.id)
    candidates.sort()
    for node_id in candidates:
        self.lift_edge(node_id)
    return len(candidates) > 0
975
[docs]
def lift_split_edge(self, b_id: NodeIdLike) -> None:
    """Move a split in front of the edge that directly precedes it.

    Input:
        - b_id: the identifier of the middle node `B` in `A --> B --> [C_1, ..., C_N]`.

    Effect:
        `A --M steps--> B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
        `A --[cond_1, ..., cond_N]--> [A #And cond_1 --M steps--> C_1, ..., A #And cond_N --M steps--> C_N]`.
        Node `B` is removed.

    Output: None

    Raises:
        - `AssertionError`, if the split's source variables are not all free variables of `A`,
          or its target variables are not all among its source variables.
        - whatever `single` raises when `B` lacks a unique incoming edge or a unique outgoing split.
    """
    # Fetch the edge `A -> B` and the split leaving `B` before anything is removed.
    edge_into_b = single(self.edges(target_id=b_id))
    a = edge_into_b.source
    split_from_b = single(self.splits(source_id=b_id))
    branches = split_from_b.splits
    ci = list(branches.keys())
    csubsts = list(branches.values())

    # Soundness: lifting must not introduce fresh variables.
    assert len(split_from_b.source_vars.difference(a.free_vars)) == 0
    assert len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0

    # One new CTerm per branch: the configuration of `A` under the extra branch constraints.
    new_cterms_with_constraints = []
    for csubst in csubsts:
        lifted_cterm = CTerm(a.cterm.config, a.cterm.constraints + csubst.constraints)
        new_cterms_with_constraints.append((lifted_cterm, csubst.constraint))

    # Substitutions from `A` to each new CTerm; the match is expected to exist by construction.
    new_csubsts = []
    for cterm, constraint in new_cterms_with_constraints:
        matched = not_none(a.cterm.match_with_constraint(cterm))
        new_csubsts.append(matched.add_constraint(constraint))

    # Removing `B` removes the original edge and split.
    self.remove_node(b_id)

    # Nodes `A #And cond_I`, then the edges `A #And cond_I --M steps--> C_I`.
    ai: list[NodeIdLike] = [self.create_node(cterm).id for (cterm, _) in new_cterms_with_constraints]
    for a_i, c_i in zip(ai, ci, strict=True):
        self.create_edge(a_i, c_i, edge_into_b.depth, edge_into_b.rules)

    # Finally the split `A --[cond_1, ..., cond_N]--> [A #And cond_1, ..., A #And cond_N]`.
    self.create_split(a.id, zip(ai, new_csubsts, strict=True))
1022
[docs]
def lift_split_split(self, b_id: NodeIdLike) -> None:
    """Fuse a split with the split that directly precedes it.

    Input:
        - b_id: the identifier of the node `B` of the structure
          `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]`

    Effect:
        `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
        `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`.
        Node `B` is removed. The new targets `C_I` are appended after the remaining targets of `A`.

    Output: None

    Raises:
        - `AssertionError`, if the inner split's source variables are not all free variables of `A`,
          or its target variables are not all among its source variables.
        - whatever `single` raises when `B` is not the target of exactly one split
          and the source of exactly one split.
    """
    # The split entering `B` and the split leaving `B`.
    split_from_a = single(self.splits(target_id=b_id))
    split_from_b = single(self.splits(source_id=b_id))
    splits_from_a = split_from_a.splits
    splits_from_b = split_from_b.splits
    a = split_from_a.source

    # Soundness: lifting must not introduce fresh variables.
    assert len(split_from_b.source_vars.difference(a.free_vars)) == 0
    assert len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0

    # Take `B` out of the targets of `A`, keeping its substitution for the cumulative constraints.
    csubst_b = splits_from_a.pop(self._resolve(b_id))

    # Substitutions from `A` to every `C_I`, constrained by `cond_I` and then `cond_B`.
    additional_csubsts = []
    for c_id, csubst in splits_from_b.items():
        matched = not_none(a.cterm.match_with_constraint(self.node(c_id).cterm))
        additional_csubsts.append(matched.add_constraint(csubst.constraint).add_constraint(csubst_b.constraint))

    # Targets of the fused split: what is left of `A`'s targets, followed by the `C_I`.
    new_target_ids = list(splits_from_a.keys()) + list(splits_from_b.keys())
    new_target_csubsts = list(splits_from_a.values()) + additional_csubsts

    # Removing `B` removes both original splits.
    self.remove_node(b_id)
    self.create_split(a.id, zip(new_target_ids, new_target_csubsts, strict=True))
1070
[docs]
def lift_splits(self) -> bool:
    """Perform all possible split liftings.

    Splits are first lifted over preceding edges (`lift_split_edge`) until no
    candidate is left, then over preceding splits (`lift_split_split`) until no
    candidate is left.

    Input: None

    Effect: the liftable splits are moved towards their predecessors.

    Output:
        - bool: whether at least one split lift was performed.
    """

    def _is_liftable(node_id: int, finder: Callable) -> bool:
        # The node must be the source of a split ...
        node_splits = self.splits(source_id=node_id)
        if node_splits == []:
            return False
        # ... and the target of whatever `finder` looks for (an edge or a split).
        sources = finder(target_id=node_id)
        if sources == []:
            return False
        source = single(sources).source
        if not source:
            return False
        split = single(node_splits)
        if not split:
            return False
        # Soundness: the lift must not introduce fresh variables.
        if len(split.source_vars.difference(source.free_vars)) != 0:
            return False
        return len(split.target_vars.difference(split.source_vars)) == 0

    def _lift_until_stable(finder: Callable, lifter: Callable) -> bool:
        lifted_any = False
        while True:
            batch = sorted([node.id for node in self.nodes if _is_liftable(node.id, finder)])
            if len(batch) == 0:
                break
            for node_id in batch:
                lifter(node_id)
                lifted_any = True
        return lifted_any

    # Both passes always run; the result reports whether either of them did anything.
    lifted_over_edges = _lift_until_stable(self.edges, self.lift_split_edge)
    lifted_over_splits = _lift_until_stable(self.splits, self.lift_split_split)
    return lifted_over_splits or lifted_over_edges
1108
[docs]
def minimize(self) -> None:
    """Minimize the KCFG by alternating edge lifts and split lifts until neither applies.

    Every round runs `lift_edges` once and then `lift_splits` once; the loop
    stops after the first round in which both report that nothing was lifted.

    Input: None

    Effect: the KCFG is transformed until no further lifting transformation is performed.

    Output: None
    """
    while True:
        edges_lifted = self.lift_edges()
        splits_lifted = self.lift_splits()
        if not (splits_lifted or edges_lifted):
            break
1125
[docs]
def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
    """Register a new alias for a node.

    Raises:
        - `ValueError`, if the alias contains the character `@`.
        - `ValueError`, if the alias is already registered.
    """
    if '@' in alias:
        raise ValueError('Alias may not contain "@"')
    if alias in self._aliases:
        raise ValueError(f'Duplicate alias: {alias}')
    # The alias is stored against the resolved node id.
    self._aliases[alias] = self._resolve(node_id)
1133
[docs]
def remove_alias(self, alias: str) -> None:
    """Unregister an alias.

    Raises:
        - `ValueError`, if the alias is not registered.
    """
    if alias in self._aliases:
        del self._aliases[alias]
        return
    raise ValueError(f'Alias does not exist: {alias}')
1138
[docs]
def is_root(self, node_id: NodeIdLike) -> bool:
    """Tell whether the node has no predecessors."""
    incoming = self.predecessors(self._resolve(node_id))
    return len(incoming) == 0
1142
[docs]
def is_vacuous(self, node_id: NodeIdLike) -> bool:
    """Tell whether the node carries the `KCFGNodeAttr.VACUOUS` attribute."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.VACUOUS in node_attrs
1145
[docs]
def is_stuck(self, node_id: NodeIdLike) -> bool:
    """Tell whether the node carries the `KCFGNodeAttr.STUCK` attribute."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.STUCK in node_attrs
1148
[docs]
def is_split(self, node_id: NodeIdLike) -> bool:
    """Tell whether the (resolved) node id is a key of `self._splits`, i.e. the source of a split."""
    return self._resolve(node_id) in self._splits
1152
[docs]
def is_ndbranch(self, node_id: NodeIdLike) -> bool:
    """Tell whether the (resolved) node id is a key of `self._ndbranches`, i.e. the source of an ndbranch."""
    return self._resolve(node_id) in self._ndbranches
1156
[docs]
def is_leaf(self, node_id: NodeIdLike) -> bool:
    """Tell whether the node has no successors."""
    outgoing = self.successors(node_id)
    return len(outgoing) == 0
1159
[docs]
def is_covered(self, node_id: NodeIdLike) -> bool:
    """Tell whether the (resolved) node id is a key of `self._covers`, i.e. the source of a cover."""
    return self._resolve(node_id) in self._covers
1163
[docs]
def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
    """Remove every node reachable from the given node, except the ones to keep.

    Input:
        - node_id: identifier of the node where the reachability search starts
          (the node itself is part of the reachable set).
        - keep_nodes: identifiers of nodes that must not be removed.

    Output: the ids of the removed nodes, in the iteration order of `reachable_nodes`.
    """
    reachable = self.reachable_nodes(node_id)
    protected_ids = {self._resolve(nid) for nid in keep_nodes}
    pruned_nodes: list[int] = []
    for node in reachable:
        if node.id in protected_ids:
            continue
        self.remove_node(node.id)
        pruned_nodes.append(node.id)
    return pruned_nodes
1173
[docs]
def shortest_path_between(
    self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
) -> tuple[Successor, ...] | None:
    """Return a path of minimal `KCFG.path_length` between two nodes.

    Output: the first path of minimal length among those returned by
    `paths_between`, or `None` when there is no path at all.
    """
    candidates = self.paths_between(source_node_id, target_node_id)
    if len(candidates) == 0:
        return None
    # `min` returns the first minimal element, matching a stable sort followed by `[0]`.
    return min(candidates, key=KCFG.path_length)
1181
[docs]
def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
    """Return the length of the shortest path connecting two nodes, in either direction.

    Output: the smaller `KCFG.path_length` of the shortest path from the first
    node to the second and the shortest path from the second to the first, or
    `None` when neither path exists.
    """
    forward = self.shortest_path_between(node_1_id, node_2_id)
    backward = self.shortest_path_between(node_2_id, node_1_id)
    lengths = [KCFG.path_length(path) for path in (forward, backward) if path is not None]
    if not lengths:
        return None
    return min(lengths)
1193
[docs]
def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
    """Tell whether the two nodes are connected (in either direction) by a path of length 0."""
    # A missing path yields `None`, which does not compare equal to 0.
    return self.shortest_distance_between(node_1_id, node_2_id) == 0
1197
[docs]
def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
    """Enumerate the paths leading from one node to another.

    A path is a tuple of successors. A multi-target successor (split or
    ndbranch) never appears as such in a path: it is replaced by a copy
    restricted to the single target the path goes through
    (`with_single_target`).

    Input:
        - source_id: identifier of the start node.
        - target_id: identifier of the end node.

    Output: a list of paths; `[()]` when both identifiers resolve to the same
    node, `[]` when the start node has no successor.

    Raises:
        - `AssertionError`, if the start node has more than one successor.
    """
    start_id = self._resolve(source_id)
    goal_id = self._resolve(target_id)

    # A node reaches itself through the empty path.
    if start_id == goal_id:
        return [()]

    first_steps = list(self.successors(start_id))
    assert len(first_steps) <= 1
    if len(first_steps) == 0:
        return []

    def _already_visited(nid: int, prefix: list[KCFG.Successor]) -> bool:
        # A node is on the path if it is the source of any step ...
        if any(nid == step.source.id for step in prefix):
            return True
        if len(prefix) == 0:
            return False
        # ... or a target of the last step.
        last = prefix[-1]
        if isinstance(last, KCFG.EdgeLike) and last.target.id == nid:
            return True
        return isinstance(last, KCFG.MultiEdge) and nid in last.target_ids

    found: list[tuple[KCFG.Successor, ...]] = []
    pending: list[list[KCFG.Successor]] = [[first_steps[0]]]

    while pending:
        path = pending.pop()
        tip = path[-1]
        next_steps: list[KCFG.Successor] = []

        if isinstance(tip, KCFG.EdgeLike):
            if tip.target.id == goal_id:
                found.append(tuple(path))
                continue
            next_steps = list(self.successors(tip.target.id))

        elif isinstance(tip, KCFG.MultiEdge):
            tip_targets = list(tip.targets)
            if len(tip_targets) == 1:
                only_target = tip_targets[0]
                if only_target.id == goal_id:
                    found.append(tuple(path))
                    continue
                next_steps = list(self.successors(only_target.id))
            elif len(tip_targets) > 1:
                # Replace the multi-target step by one single-target copy per target.
                path = path[0:-1]
                next_steps = [tip.with_single_target(tip_target) for tip_target in tip_targets]

        for step in next_steps:
            if isinstance(step, KCFG.EdgeLike) and not _already_visited(step.target.id, path):
                pending.append(path + [step])
            elif isinstance(step, KCFG.MultiEdge):
                step_targets = list(step.targets)
                # Multi-target steps are always queued; they are expanded when popped.
                if len(step_targets) > 1 or (
                    len(step_targets) == 1 and not _already_visited(step_targets[0].id, path)
                ):
                    pending.append(path + [step])

    return found
1260
[docs]
def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
    """Collect every node reachable from the given node, the node itself included.

    Input:
        - source_id: identifier of the node where the search starts.
        - reverse: when true, follow predecessors (towards sources) instead of
          successors (towards targets).

    Output: the set of visited nodes.
    """
    seen: set[KCFG.Node] = set()
    stack: list[KCFG.Node] = [self.node(source_id)]

    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)

        if reverse:
            stack.extend(pred.source for pred in self.predecessors(target_id=current.id))
        else:
            for succ in self.successors(source_id=current.id):
                stack.extend(succ.targets)

    return seen
1279
[docs]
def write_cfg_data(self) -> None:
    """Write this KCFG to its attached store and reset the change tracking.

    The store receives `self.to_dict()` together with the collections of
    deleted and created node ids; both collections are cleared afterwards.

    Raises:
        - `AssertionError`, if no store is attached (`self._kcfg_store` is `None`).
    """
    store = self._kcfg_store
    assert store is not None
    store.write_cfg_data(self.to_dict(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes)
    for tracked in (self._deleted_nodes, self._created_nodes):
        tracked.clear()
1287
[docs]
@staticmethod
def read_cfg_data(cfg_dir: Path) -> KCFG:
    """Load a KCFG from a store directory and keep the store attached to it.

    Input:
        - cfg_dir: the directory handed to `KCFGStore`.

    Output: the KCFG built by `KCFG.from_dict` from the stored data, with
    `_kcfg_store` set to the store it was read from.
    """
    kcfg_store = KCFGStore(cfg_dir)
    stored_data = kcfg_store.read_cfg_data()
    loaded = KCFG.from_dict(stored_data)
    loaded._kcfg_store = kcfg_store
    return loaded
1294
[docs]
@staticmethod
def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
    """Load a single node from a store directory.

    Input:
        - cfg_dir: the directory handed to `KCFGStore`.
        - node_id: the id of the node to read.

    Output: the node built by `KCFG.Node.from_dict` from the stored node data.
    """
    node_data = KCFGStore(cfg_dir).read_node_data(node_id)
    return KCFG.Node.from_dict(node_data)
1299
1300
[docs]
class KCFGExtendResult(ABC):
    """Common base class of the KCFG extension result types defined below it."""
1302
1303
[docs]
@final
@dataclass(frozen=True)
class Vacuous(KCFGExtendResult):
    """Extension result without any payload; by its name, it reports a vacuous node."""
1307
1308
[docs]
@final
@dataclass(frozen=True)
class Stuck(KCFGExtendResult):
    """Extension result without any payload; by its name, it reports a stuck node."""
1312
1313
[docs]
@final
@dataclass(frozen=True)
class Abstract(KCFGExtendResult):
    """Extension result carrying a single `CTerm`.

    NOTE(review): presumably the abstracted form of the node being extended;
    confirm against the code that consumes this result.
    """

    cterm: CTerm
1318
1319
[docs]
@final
@dataclass(frozen=True)
class Step(KCFGExtendResult):
    """Extension result carrying a `CTerm`, a depth, log entries, rule labels and a `cut` flag.

    NOTE(review): presumably `cterm` is the state reached after `depth` steps
    and `rule_labels` names the applied rules; confirm against the consumer.
    """

    cterm: CTerm
    depth: int
    logs: tuple[LogEntry, ...]
    rule_labels: list[str]
    # Defaults to False when not given.
    cut: bool = False
1328
1329
[docs]
@final
@dataclass(frozen=True)
class Branch(KCFGExtendResult):
    """Extension result carrying branching constraints and a `heuristic` flag.

    The hand-written constructor accepts any iterable of constraints and stores
    it as a tuple; `heuristic` is keyword-only and defaults to `False`.
    """

    constraints: tuple[KInner, ...]
    heuristic: bool

    def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False):
        # The dataclass is frozen, so the fields are set through `object.__setattr__`.
        for attr_name, attr_value in (('constraints', tuple(constraints)), ('heuristic', heuristic)):
            object.__setattr__(self, attr_name, attr_value)
1339
1340
[docs]
@final
@dataclass(frozen=True)
class NDBranch(KCFGExtendResult):
    """Extension result carrying several `CTerm`s, log entries and rule labels.

    The hand-written constructor accepts arbitrary iterables and stores each of
    them as a tuple.

    NOTE(review): presumably one `CTerm` per non-deterministic branch target;
    confirm against the code that consumes this result.
    """

    cterms: tuple[CTerm, ...]
    logs: tuple[LogEntry, ...]
    rule_labels: tuple[str, ...]

    def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry], rule_labels: Iterable[str]):
        # The dataclass is frozen, so the fields are set through `object.__setattr__`.
        object.__setattr__(self, 'cterms', tuple(cterms))
        object.__setattr__(self, 'logs', tuple(logs))
        object.__setattr__(self, 'rule_labels', tuple(rule_labels))