Source code for pyk.kcfg.kcfg

   1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from functools import reduce
   9from threading import RLock
  10from typing import TYPE_CHECKING, Final, List, Union, cast, final
  11
  12from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  13from ..kast import EMPTY_ATT
  14from ..kast.inner import KApply
  15from ..kast.manip import (
  16    bool_to_ml_pred,
  17    extract_lhs,
  18    extract_rhs,
  19    flatten_label,
  20    inline_cell_maps,
  21    rename_generated_vars,
  22    sort_ac_collections,
  23)
  24from ..kast.outer import KFlatModule
  25from ..prelude.kbool import andBool
  26from ..utils import ensure_dir_path, not_none, single
  27
  28if TYPE_CHECKING:
  29    from collections.abc import Callable, Iterable, Mapping, MutableMapping
  30    from pathlib import Path
  31    from types import TracebackType
  32    from typing import Any
  33
  34    from pyk.kore.rpc import LogEntry
  35
  36    from ..kast import KAtt
  37    from ..kast.inner import KInner
  38    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  39
  40
  41NodeIdLike = int | str
  42
  43_LOGGER: Final = logging.getLogger(__name__)
  44
  45
[docs] 46@dataclass(frozen=True) 47class NodeAttr: 48 value: str
49 50
[docs] 51class KCFGNodeAttr(NodeAttr): 52 VACUOUS = NodeAttr('vacuous') 53 STUCK = NodeAttr('stuck')
54 55
[docs] 56class KCFGStore: 57 store_path: Path 58 59 def __init__(self, store_path: Path) -> None: 60 self.store_path = store_path 61 ensure_dir_path(store_path) 62 ensure_dir_path(self.kcfg_node_dir) 63 64 @property 65 def kcfg_json_path(self) -> Path: 66 return self.store_path / 'kcfg.json' 67 68 @property 69 def kcfg_node_dir(self) -> Path: 70 return self.store_path / 'nodes' 71
[docs] 72 def kcfg_node_path(self, node_id: int) -> Path: 73 return self.kcfg_node_dir / f'{node_id}.json'
74
[docs] 75 def write_cfg_data( 76 self, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = () 77 ) -> None: 78 node_dict = {node_dct['id']: node_dct for node_dct in dct['nodes']} 79 vacuous_nodes = [ 80 node_id for node_id in node_dict.keys() if KCFGNodeAttr.VACUOUS.value in node_dict[node_id]['attrs'] 81 ] 82 stuck_nodes = [ 83 node_id for node_id in node_dict.keys() if KCFGNodeAttr.STUCK.value in node_dict[node_id]['attrs'] 84 ] 85 dct['vacuous'] = vacuous_nodes 86 dct['stuck'] = stuck_nodes 87 for node_id in deleted_nodes: 88 self.kcfg_node_path(node_id).unlink(missing_ok=True) 89 for node_id in created_nodes: 90 del node_dict[node_id]['attrs'] 91 self.kcfg_node_path(node_id).write_text(json.dumps(node_dict[node_id])) 92 dct['nodes'] = list(node_dict.keys()) 93 self.kcfg_json_path.write_text(json.dumps(dct))
94
[docs] 95 def read_cfg_data(self) -> dict[str, Any]: 96 dct = json.loads(self.kcfg_json_path.read_text()) 97 nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []] 98 dct['nodes'] = nodes 99 100 new_nodes = [] 101 for node in dct['nodes']: 102 attrs = [] 103 if node['id'] in dct['vacuous']: 104 attrs.append(KCFGNodeAttr.VACUOUS.value) 105 if node['id'] in dct['stuck']: 106 attrs.append(KCFGNodeAttr.STUCK.value) 107 new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs}) 108 109 dct['nodes'] = new_nodes 110 111 del dct['vacuous'] 112 del dct['stuck'] 113 114 return dct
115
[docs] 116 def read_node_data(self, node_id: int) -> dict[str, Any]: 117 return json.loads(self.kcfg_node_path(node_id).read_text())
118 119
[docs] 120class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs] 121 @final 122 @dataclass(frozen=True, order=True) 123 class Node: 124 id: int 125 cterm: CTerm 126 attrs: frozenset[NodeAttr] 127 128 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None: 129 object.__setattr__(self, 'id', id) 130 object.__setattr__(self, 'cterm', cterm) 131 object.__setattr__(self, 'attrs', frozenset(attrs)) 132
[docs] 133 def to_dict(self) -> dict[str, Any]: 134 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
135
[docs] 136 @staticmethod 137 def from_dict(dct: dict[str, Any]) -> KCFG.Node: 138 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
139
[docs] 140 def add_attr(self, attr: NodeAttr) -> KCFG.Node: 141 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
142
[docs] 143 def remove_attr(self, attr: NodeAttr) -> KCFG.Node: 144 if attr not in self.attrs: 145 raise ValueError(f'Node {self.id} does not have attribute {attr.value}') 146 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
147
[docs] 148 def discard_attr(self, attr: NodeAttr) -> KCFG.Node: 149 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
150
[docs] 151 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node: 152 new_cterm = cterm if cterm is not None else self.cterm 153 new_attrs = attrs if attrs is not None else self.attrs 154 return KCFG.Node(self.id, new_cterm, new_attrs)
155 156 @property 157 def free_vars(self) -> frozenset[str]: 158 return frozenset(self.cterm.free_vars)
159
[docs] 160 class Successor(ABC): 161 source: KCFG.Node 162 163 def __lt__(self, other: Any) -> bool: 164 if not isinstance(other, KCFG.Successor): 165 return NotImplemented 166 return self.source < other.source 167 168 @property 169 def source_vars(self) -> frozenset[str]: 170 return frozenset(self.source.free_vars) 171 172 @property 173 @abstractmethod 174 def targets(self) -> tuple[KCFG.Node, ...]: ... 175 176 @property 177 def target_ids(self) -> list[int]: 178 return sorted(target.id for target in self.targets) 179 180 @property 181 def target_vars(self) -> frozenset[str]: 182 return frozenset(set.union(set(), *(target.free_vars for target in self.targets))) 183
[docs] 184 @abstractmethod 185 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
186
[docs] 187 @abstractmethod 188 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
189
[docs] 190 @abstractmethod 191 def to_dict(self) -> dict[str, Any]: ...
192
[docs] 193 @staticmethod 194 @abstractmethod 195 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
196
[docs] 197 class EdgeLike(Successor): 198 source: KCFG.Node 199 target: KCFG.Node 200 201 @property 202 def targets(self) -> tuple[KCFG.Node, ...]: 203 return (self.target,)
204
[docs] 205 @final 206 @dataclass(frozen=True) 207 class Edge(EdgeLike): 208 source: KCFG.Node 209 target: KCFG.Node 210 depth: int 211 rules: tuple[str, ...] 212
[docs] 213 def to_dict(self) -> dict[str, Any]: 214 return { 215 'source': self.source.id, 216 'target': self.target.id, 217 'depth': self.depth, 218 'rules': list(self.rules), 219 }
220
[docs] 221 @staticmethod 222 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge: 223 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
224
[docs] 225 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike: 226 def is_ceil_condition(kast: KInner) -> bool: 227 return type(kast) is KApply and kast.label.name == '#Ceil' 228 229 def _simplify_config(config: KInner) -> KInner: 230 return sort_ac_collections(inline_cell_maps(config)) 231 232 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}' 233 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)] 234 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints) 235 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)] 236 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints) 237 rule: KRuleLike 238 if claim: 239 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm) 240 else: 241 rule, _ = cterm_build_rule(sentence_id, init_cterm, target_cterm, priority=priority) 242 return rule
243
[docs] 244 def replace_source(self, node: KCFG.Node) -> KCFG.Edge: 245 assert node.id == self.source.id 246 return KCFG.Edge(node, self.target, self.depth, self.rules)
247
[docs] 248 def replace_target(self, node: KCFG.Node) -> KCFG.Edge: 249 assert node.id == self.target.id 250 return KCFG.Edge(self.source, node, self.depth, self.rules)
251
[docs] 252 @final 253 @dataclass(frozen=True) 254 class Cover(EdgeLike): 255 source: KCFG.Node 256 target: KCFG.Node 257 csubst: CSubst 258
[docs] 259 def to_dict(self) -> dict[str, Any]: 260 return { 261 'source': self.source.id, 262 'target': self.target.id, 263 'csubst': self.csubst.to_dict(), 264 }
265
[docs] 266 @staticmethod 267 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover: 268 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
269
[docs] 270 def replace_source(self, node: KCFG.Node) -> KCFG.Cover: 271 assert node.id == self.source.id 272 return KCFG.Cover(node, self.target, self.csubst)
273
[docs] 274 def replace_target(self, node: KCFG.Node) -> KCFG.Cover: 275 assert node.id == self.target.id 276 return KCFG.Cover(self.source, node, self.csubst)
277
[docs] 278 @dataclass(frozen=True) 279 class MultiEdge(Successor): 280 source: KCFG.Node 281 282 def __lt__(self, other: Any) -> bool: 283 if not type(other) is type(self): 284 return NotImplemented 285 return (self.source, self.target_ids) < (other.source, other.target_ids) 286
[docs] 287 @abstractmethod 288 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
289
[docs] 290 @final 291 @dataclass(frozen=True) 292 class Split(MultiEdge): 293 source: KCFG.Node 294 _targets: tuple[tuple[KCFG.Node, CSubst], ...] 295 296 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None: 297 object.__setattr__(self, 'source', source) 298 object.__setattr__(self, '_targets', tuple(_targets)) 299 300 @property 301 def targets(self) -> tuple[KCFG.Node, ...]: 302 return tuple(target for target, _ in self._targets) 303 304 @property 305 def splits(self) -> dict[int, CSubst]: 306 return {target.id: csubst for target, csubst in self._targets} 307
[docs] 308 def to_dict(self) -> dict[str, Any]: 309 return { 310 'source': self.source.id, 311 'targets': [ 312 { 313 'target': target.id, 314 'csubst': csubst.to_dict(), 315 } 316 for target, csubst in self._targets 317 ], 318 }
319
[docs] 320 @staticmethod 321 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split: 322 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']] 323 return KCFG.Split(nodes[dct['source']], tuple(_targets))
324
[docs] 325 def with_single_target(self, target: KCFG.Node) -> KCFG.Split: 326 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
327 328 @property 329 def covers(self) -> tuple[KCFG.Cover, ...]: 330 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets) 331
[docs] 332 def replace_source(self, node: KCFG.Node) -> KCFG.Split: 333 assert node.id == self.source.id 334 return KCFG.Split(node, self._targets)
335
[docs] 336 def replace_target(self, node: KCFG.Node) -> KCFG.Split: 337 assert node.id in self.target_ids 338 new_targets = [ 339 (node, csubst) if node.id == target_node.id else (target_node, csubst) 340 for target_node, csubst in self._targets 341 ] 342 return KCFG.Split(self.source, tuple(new_targets))
343
[docs] 344 @final 345 @dataclass(frozen=True) 346 class NDBranch(MultiEdge): 347 source: KCFG.Node 348 _targets: tuple[KCFG.Node, ...] 349 rules: tuple[str, ...] 350 351 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None: 352 object.__setattr__(self, 'source', source) 353 object.__setattr__(self, '_targets', tuple(_targets)) 354 object.__setattr__(self, 'rules', rules) 355 356 @property 357 def targets(self) -> tuple[KCFG.Node, ...]: 358 return self._targets 359
[docs] 360 def to_dict(self) -> dict[str, Any]: 361 return { 362 'source': self.source.id, 363 'targets': [target.id for target in self.targets], 364 'rules': list(self.rules), 365 }
366
[docs] 367 @staticmethod 368 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch: 369 return KCFG.NDBranch( 370 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules']) 371 )
372
[docs] 373 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch: 374 return KCFG.NDBranch(self.source, (target,), ())
375 376 @property 377 def edges(self) -> tuple[KCFG.Edge, ...]: 378 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets) 379
[docs] 380 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch: 381 assert node.id == self.source.id 382 return KCFG.NDBranch(node, self._targets, self.rules)
383
[docs] 384 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch: 385 assert node.id in self.target_ids 386 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets] 387 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
388 389 _node_id: int 390 _nodes: MutableMapping[int, KCFG.Node] 391 392 _created_nodes: set[int] 393 _deleted_nodes: set[int] 394 395 _edges: dict[int, Edge] 396 _covers: dict[int, Cover] 397 _splits: dict[int, Split] 398 _ndbranches: dict[int, NDBranch] 399 _aliases: dict[str, int] 400 _lock: RLock 401 402 _kcfg_store: KCFGStore | None 403 404 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None: 405 self._node_id = 1 406 if optimize_memory: 407 from .store import OptimizedNodeStore 408 409 self._nodes = OptimizedNodeStore() 410 else: 411 self._nodes = {} 412 self._created_nodes = set() 413 self._deleted_nodes = set() 414 self._edges = {} 415 self._covers = {} 416 self._splits = {} 417 self._ndbranches = {} 418 self._aliases = {} 419 self._lock = RLock() 420 if cfg_dir is not None: 421 self._kcfg_store = KCFGStore(cfg_dir) 422 423 def __contains__(self, item: object) -> bool: 424 if type(item) is KCFG.Node: 425 return self.contains_node(item) 426 if type(item) is KCFG.Edge: 427 return self.contains_edge(item) 428 if type(item) is KCFG.Cover: 429 return self.contains_cover(item) 430 if type(item) is KCFG.Split: 431 return self.contains_split(item) 432 if type(item) is KCFG.NDBranch: 433 return self.contains_ndbranch(item) 434 return False 435 436 def __enter__(self) -> KCFG: 437 self._lock.acquire() 438 return self 439 440 def __exit__( 441 self, 442 exc_type: type[BaseException] | None, 443 exc_value: BaseException | None, 444 traceback: TracebackType | None, 445 ) -> bool: 446 self._lock.release() 447 if exc_type is None: 448 return True 449 return False 450 451 @property 452 def nodes(self) -> list[KCFG.Node]: 453 return list(self._nodes.values()) 454 455 @property 456 def root(self) -> list[KCFG.Node]: 457 return [node for node in self.nodes if self.is_root(node.id)] 458 459 @property 460 def vacuous(self) -> list[KCFG.Node]: 461 return [node for node in self.nodes if self.is_vacuous(node.id)] 462 463 @property 464 def stuck(self) -> list[KCFG.Node]: 465 return [node for node in self.nodes if self.is_stuck(node.id)] 466 467 @property 468 def leaves(self) -> list[KCFG.Node]: 469 return [node for node in self.nodes if self.is_leaf(node.id)] 470 471 @property 472 def covered(self) -> list[KCFG.Node]: 473 return [node for node in self.nodes if self.is_covered(node.id)] 474 475 @property 476 def uncovered(self) -> list[KCFG.Node]: 477 return [node for node in self.nodes if not self.is_covered(node.id)] 478
[docs] 479 @staticmethod 480 def from_claim( 481 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True 482 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]: 483 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory) 484 claim_body = claim.body 485 claim_body = defn.instantiate_cell_vars(claim_body) 486 claim_body = rename_generated_vars(claim_body) 487 488 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires)) 489 init_node = cfg.create_node(claim_lhs) 490 491 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint( 492 bool_to_ml_pred(andBool([claim.requires, claim.ensures])) 493 ) 494 target_node = cfg.create_node(claim_rhs) 495 496 return cfg, init_node.id, target_node.id
497
[docs] 498 @staticmethod 499 def path_length(_path: Iterable[KCFG.Successor]) -> int: 500 _path = tuple(_path) 501 if len(_path) == 0: 502 return 0 503 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover: 504 return KCFG.path_length(_path[1:]) 505 elif type(_path[0]) is KCFG.NDBranch: 506 return 1 + KCFG.path_length(_path[1:]) 507 elif type(_path[0]) is KCFG.Edge: 508 return _path[0].depth + KCFG.path_length(_path[1:]) 509 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
510
[docs] 511 def extend( 512 self, 513 extend_result: KCFGExtendResult, 514 node: KCFG.Node, 515 logs: dict[int, tuple[LogEntry, ...]], 516 ) -> None: 517 match extend_result: 518 case Vacuous(): 519 self.add_vacuous(node.id) 520 521 case Stuck(): 522 self.add_stuck(node.id) 523 524 case Abstract(cterm): 525 new_node = self.create_node(cterm) 526 self.create_cover(node.id, new_node.id) 527 528 case Step(cterm, depth, next_node_logs, rule_labels, _): 529 next_node = self.create_node(cterm) 530 logs[next_node.id] = next_node_logs 531 self.create_edge(node.id, next_node.id, depth, rules=rule_labels) 532 533 case Branch(constraints, _): 534 self.split_on_constraints(node.id, constraints) 535 536 case NDBranch(cterms, next_node_logs, rule_labels): 537 next_ids = [self.create_node(cterm).id for cterm in cterms] 538 for i in next_ids: 539 logs[i] = next_node_logs 540 self.create_ndbranch(node.id, next_ids, rules=rule_labels) 541 542 case _: 543 raise AssertionError()
544
[docs] 545 def to_dict(self) -> dict[str, Any]: 546 nodes = [node.to_dict() for node in self.nodes] 547 edges = [edge.to_dict() for edge in self.edges()] 548 covers = [cover.to_dict() for cover in self.covers()] 549 splits = [split.to_dict() for split in self.splits()] 550 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()] 551 552 aliases = dict(sorted(self._aliases.items())) 553 554 res = { 555 'next': self._node_id, 556 'nodes': nodes, 557 'edges': edges, 558 'covers': covers, 559 'splits': splits, 560 'ndbranches': ndbranches, 561 'aliases': aliases, 562 } 563 return {k: v for k, v in res.items() if v}
564
[docs] 565 @staticmethod 566 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG: 567 cfg = KCFG(optimize_memory=optimize_memory) 568 569 for node_dict in dct.get('nodes') or []: 570 node = KCFG.Node.from_dict(node_dict) 571 cfg.add_node(node) 572 573 max_id = max([node.id for node in cfg.nodes], default=0) 574 cfg._node_id = dct.get('next', max_id + 1) 575 576 for edge_dict in dct.get('edges') or []: 577 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes) 578 cfg.add_successor(edge) 579 580 for cover_dict in dct.get('covers') or []: 581 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes) 582 cfg.add_successor(cover) 583 584 for split_dict in dct.get('splits') or []: 585 split = KCFG.Split.from_dict(split_dict, cfg._nodes) 586 cfg.add_successor(split) 587 588 for ndbranch_dict in dct.get('ndbranches') or []: 589 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes) 590 cfg.add_successor(ndbranch) 591 592 for alias, node_id in dct.get('aliases', {}).items(): 593 cfg.add_alias(alias=alias, node_id=node_id) 594 595 return cfg
596
[docs] 597 def aliases(self, node_id: NodeIdLike) -> list[str]: 598 node_id = self._resolve(node_id) 599 return [alias for alias, value in self._aliases.items() if node_id == value]
600
[docs] 601 def to_json(self) -> str: 602 return json.dumps(self.to_dict(), sort_keys=True)
603
[docs] 604 @staticmethod 605 def from_json(s: str, optimize_memory: bool = True) -> KCFG: 606 return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
607
[docs] 608 def to_rules(self, priority: int = 20) -> list[KRuleLike]: 609 return [e.to_rule('BASIC-BLOCK', priority=priority) for e in self.edges()]
610
[docs] 611 def to_module( 612 self, 613 module_name: str | None = None, 614 imports: Iterable[KImport] = (), 615 priority: int = 20, 616 att: KAtt = EMPTY_ATT, 617 ) -> KFlatModule: 618 module_name = 'KCFG' if module_name is None else module_name 619 return KFlatModule(module_name, self.to_rules(priority=priority), imports=imports, att=att)
620 621 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None: 622 if type(id_like) is int: 623 if id_like in self._nodes: 624 return id_like 625 626 return None 627 628 if type(id_like) is not str: 629 raise TypeError(f'Expected int or str for id_like, got: {id_like}') 630 631 if id_like.startswith('@'): 632 if id_like[1:] in self._aliases: 633 return self._aliases[id_like[1:]] 634 raise ValueError(f'Unknown alias: {id_like}') 635 636 return None 637 638 def _resolve(self, id_like: NodeIdLike) -> int: 639 match = self._resolve_or_none(id_like) 640 if not match: 641 raise ValueError(f'Unknown node: {id_like}') 642 return match 643
[docs] 644 def node(self, node_id: NodeIdLike) -> KCFG.Node: 645 node_id = self._resolve(node_id) 646 return self._nodes[node_id]
647
[docs] 648 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None: 649 resolved_id = self._resolve_or_none(node_id) 650 if resolved_id is None: 651 return None 652 return self._nodes[resolved_id]
653
[docs] 654 def contains_node(self, node: KCFG.Node) -> bool: 655 return bool(self.get_node(node.id))
656
[docs] 657 def add_node(self, node: KCFG.Node) -> None: 658 if node.id in self._nodes: 659 raise ValueError(f'Node with id already exists: {node.id}') 660 self._nodes[node.id] = node 661 self._created_nodes.add(node.id)
662
[docs] 663 def create_node(self, cterm: CTerm) -> KCFG.Node: 664 node = KCFG.Node(self._node_id, cterm) 665 self._node_id += 1 666 self._nodes[node.id] = node 667 self._created_nodes.add(node.id) 668 return node
669
[docs] 670 def remove_node(self, node_id: NodeIdLike) -> None: 671 node_id = self._resolve(node_id) 672 673 node = self._nodes.pop(node_id) 674 self._created_nodes.discard(node_id) 675 self._deleted_nodes.add(node.id) 676 677 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids} 678 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids} 679 680 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids} 681 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids} 682 683 for alias in [alias for alias, id in self._aliases.items() if id == node_id]: 684 self.remove_alias(alias)
685 686 def _update_refs(self, node_id: int) -> None: 687 node = self.node(node_id) 688 for succ in self.successors(node_id): 689 new_succ = succ.replace_source(node) 690 if type(new_succ) is KCFG.Edge: 691 self._edges[new_succ.source.id] = new_succ 692 if type(new_succ) is KCFG.Cover: 693 self._covers[new_succ.source.id] = new_succ 694 if type(new_succ) is KCFG.Split: 695 self._splits[new_succ.source.id] = new_succ 696 if type(new_succ) is KCFG.NDBranch: 697 self._ndbranches[new_succ.source.id] = new_succ 698 699 for pred in self.predecessors(node_id): 700 new_pred = pred.replace_target(node) 701 if type(new_pred) is KCFG.Edge: 702 self._edges[new_pred.source.id] = new_pred 703 if type(new_pred) is KCFG.Cover: 704 self._covers[new_pred.source.id] = new_pred 705 if type(new_pred) is KCFG.Split: 706 self._splits[new_pred.source.id] = new_pred 707 if type(new_pred) is KCFG.NDBranch: 708 self._ndbranches[new_pred.source.id] = new_pred 709
[docs] 710 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 711 node = self.node(node_id) 712 new_node = node.remove_attr(attr) 713 self.replace_node(new_node)
714
[docs] 715 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 716 node = self.node(node_id) 717 new_node = node.discard_attr(attr) 718 self.replace_node(new_node)
719
[docs] 720 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 721 node = self.node(node_id) 722 new_node = node.add_attr(attr) 723 self.replace_node(new_node)
724
[docs] 725 def let_node( 726 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None 727 ) -> None: 728 node = self.node(node_id) 729 new_node = node.let(cterm=cterm, attrs=attrs) 730 self.replace_node(new_node)
731
[docs] 732 def replace_node(self, node: KCFG.Node) -> None: 733 self._nodes[node.id] = node 734 self._created_nodes.add(node.id) 735 self._update_refs(node.id)
736
[docs] 737 def successors(self, source_id: NodeIdLike) -> list[Successor]: 738 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id) 739 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id) 740 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id) 741 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id) 742 return list(out_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
743
[docs] 744 def predecessors(self, target_id: NodeIdLike) -> list[Successor]: 745 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id) 746 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id) 747 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id) 748 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id) 749 return list(in_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
750 751 def _check_no_successors(self, source_id: NodeIdLike) -> None: 752 if len(self.successors(source_id)) > 0: 753 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}') 754 755 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None: 756 for target_id in target_ids: 757 path = self.shortest_path_between(target_id, source_id) 758 if path is not None and KCFG.path_length(path) == 0: 759 raise ValueError( 760 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}' 761 ) 762
[docs] 763 def add_successor(self, succ: KCFG.Successor) -> None: 764 self._check_no_successors(succ.source.id) 765 self._check_no_zero_loops(succ.source.id, succ.target_ids) 766 if type(succ) is KCFG.Edge: 767 self._edges[succ.source.id] = succ 768 elif type(succ) is KCFG.Cover: 769 self._covers[succ.source.id] = succ 770 else: 771 if len(succ.target_ids) <= 1: 772 raise ValueError( 773 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}' 774 ) 775 if type(succ) is KCFG.Split: 776 self._splits[succ.source.id] = succ 777 elif type(succ) is KCFG.NDBranch: 778 self._ndbranches[succ.source.id] = succ
779
[docs] 780 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None: 781 source_id = self._resolve(source_id) 782 target_id = self._resolve(target_id) 783 edge = self._edges.get(source_id, None) 784 return edge if edge is not None and edge.target.id == target_id else None
785
[docs] 786 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]: 787 source_id = self._resolve(source_id) if source_id is not None else None 788 target_id = self._resolve(target_id) if target_id is not None else None 789 return [ 790 edge 791 for edge in self._edges.values() 792 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id) 793 ]
794
[docs] 795 def contains_edge(self, edge: Edge) -> bool: 796 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id): 797 return edge == other 798 return False
799
[docs] 800 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge: 801 if depth <= 0: 802 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}') 803 source = self.node(source_id) 804 target = self.node(target_id) 805 edge = KCFG.Edge(source, target, depth, tuple(rules)) 806 self.add_successor(edge) 807 return edge
808
[docs] 809 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 810 source_id = self._resolve(source_id) 811 target_id = self._resolve(target_id) 812 edge = self.edge(source_id, target_id) 813 if not edge: 814 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}') 815 self._edges.pop(source_id)
816
[docs] 817 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None: 818 source_id = self._resolve(source_id) 819 target_id = self._resolve(target_id) 820 cover = self._covers.get(source_id, None) 821 return cover if cover is not None and cover.target.id == target_id else None
822
[docs] 823 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]: 824 source_id = self._resolve(source_id) if source_id is not None else None 825 target_id = self._resolve(target_id) if target_id is not None else None 826 return [ 827 cover 828 for cover in self._covers.values() 829 if (source_id is None or source_id == cover.source.id) 830 and (target_id is None or target_id == cover.target.id) 831 ]
832
[docs] 833 def contains_cover(self, cover: Cover) -> bool: 834 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id): 835 return cover == other 836 return False
837
[docs] 838 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover: 839 source = self.node(source_id) 840 target = self.node(target_id) 841 if csubst is None: 842 csubst = target.cterm.match_with_constraint(source.cterm) 843 if csubst is None: 844 raise ValueError(f'No matching between: {source.id} and {target.id}') 845 cover = KCFG.Cover(source, target, csubst=csubst) 846 self.add_successor(cover) 847 return cover
848
[docs] 849 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 850 source_id = self._resolve(source_id) 851 target_id = self._resolve(target_id) 852 cover = self.cover(source_id, target_id) 853 if not cover: 854 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}') 855 self._covers.pop(source_id)
856
[docs] 857 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]: 858 return cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id)) + cast( 859 'List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id) 860 )
861
[docs] 862 def add_vacuous(self, node_id: NodeIdLike) -> None: 863 self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
864
[docs] 865 def remove_vacuous(self, node_id: NodeIdLike) -> None: 866 self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
867
[docs] 868 def discard_vacuous(self, node_id: NodeIdLike) -> None: 869 self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
870
[docs] 871 def add_stuck(self, node_id: NodeIdLike) -> None: 872 self.add_attr(node_id, KCFGNodeAttr.STUCK)
873
[docs] 874 def remove_stuck(self, node_id: NodeIdLike) -> None: 875 self.remove_attr(node_id, KCFGNodeAttr.STUCK)
876
[docs] 877 def discard_stuck(self, node_id: NodeIdLike) -> None: 878 self.discard_attr(node_id, KCFGNodeAttr.STUCK)
879
[docs] 880 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]: 881 source_id = self._resolve(source_id) if source_id is not None else None 882 target_id = self._resolve(target_id) if target_id is not None else None 883 return [ 884 s 885 for s in self._splits.values() 886 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids) 887 ]
888
[docs] 889 def contains_split(self, split: Split) -> bool: 890 return split in self._splits.values()
891
[docs] 892 def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split: 893 source_id = self._resolve(source_id) 894 split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits))) 895 self.add_successor(split) 896 return split
897
[docs] 898 def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]: 899 source_id = self._resolve(source_id) if source_id is not None else None 900 target_id = self._resolve(target_id) if target_id is not None else None 901 return [ 902 b 903 for b in self._ndbranches.values() 904 if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids) 905 ]
906
[docs] 907 def contains_ndbranch(self, ndbranch: NDBranch) -> bool: 908 return ndbranch in self._ndbranches
909
[docs] 910 def create_ndbranch( 911 self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = () 912 ) -> KCFG.NDBranch: 913 source_id = self._resolve(source_id) 914 ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules)) 915 self.add_successor(ndbranch) 916 return ndbranch
917
[docs] 918 def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]: 919 source = self.node(source_id) 920 branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints] 921 csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids] 922 csubsts = [ 923 reduce(CSubst.add_constraint, flatten_label('#And', constraint), csubst) 924 for (csubst, constraint) in zip(csubsts, constraints, strict=True) 925 ] 926 self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True)) 927 return branch_node_ids
928
[docs] 929 def lift_edge(self, b_id: NodeIdLike) -> None: 930 """Lift an edge up another edge directly preceding it. 931 932 Input: 933 - b_id: the identifier of the central node `B` of a sequence of edges `A --> B --> C`. 934 935 Effect: `A --M steps--> B --N steps--> C` becomes `A --(M + N) steps--> C`. Node `B` is removed. 936 937 Output: None 938 939 Raises: An `AssertionError` if the edges in question are not in place. 940 """ 941 942 # Obtain edges `A -> B`, `B -> C` 943 a_to_b = single(self.edges(target_id=b_id)) 944 b_to_c = single(self.edges(source_id=b_id)) 945 # Remove the node `B`, effectively removing the entire initial structure 946 self.remove_node(b_id) 947 # Create edge `A -> C` 948 self.create_edge(a_to_b.source.id, b_to_c.target.id, a_to_b.depth + b_to_c.depth, a_to_b.rules + b_to_c.rules)
949
[docs] 950 def lift_edges(self) -> bool: 951 """Perform all possible edge lifts across the KCFG. 952 953 Given the KCFG design, it is not possible for one edge lift to either disallow another or 954 allow another that was not previously possible. Therefore, this function is guaranteed to 955 lift all possible edges without having to loop. 956 957 Input: None 958 959 Effect: The KCFG is transformed to an equivalent in which no further edge lifts are possible. 960 961 Output: 962 - bool: An indicator of whether or not at least one edge lift was performed. 963 """ 964 965 edges_to_lift = sorted( 966 [ 967 node.id 968 for node in self.nodes 969 if len(self.edges(source_id=node.id)) > 0 and len(self.edges(target_id=node.id)) > 0 970 ] 971 ) 972 for node_id in edges_to_lift: 973 self.lift_edge(node_id) 974 return len(edges_to_lift) > 0
975
[docs] 976 def lift_split_edge(self, b_id: NodeIdLike) -> None: 977 """Lift a split up an edge directly preceding it. 978 979 Input: 980 - b_id: the identifier of the central node `B` of the structure `A --> B --> [C_1, ..., C_N]`. 981 982 Effect: 983 `A --M steps--> B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes 984 `A --[cond_1, ..., cond_N]--> [A #And cond_1 --M steps--> C_1, ..., A #And cond_N --M steps--> C_N]`. 985 Node `B` is removed. 986 987 Output: None 988 989 Raises: 990 - `AssertionError`, if the structure in question is not in place. 991 - `AssertionError`, if any of the `cond_i` contain variables not present in `A`. 992 """ 993 994 # Obtain edge `A -> B`, split `[cond_I, C_I | I = 1..N ]` 995 a_to_b = single(self.edges(target_id=b_id)) 996 a = a_to_b.source 997 split_from_b = single(self.splits(source_id=b_id)) 998 ci, csubsts = list(split_from_b.splits.keys()), list(split_from_b.splits.values()) 999 # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables) 1000 assert ( 1001 len(split_from_b.source_vars.difference(a.free_vars)) == 0 1002 and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0 1003 ) 1004 # Create CTerms and CSubsts corresponding to the new targets of the split 1005 new_cterms_with_constraints = [ 1006 (CTerm(a.cterm.config, a.cterm.constraints + csubst.constraints), csubst.constraint) for csubst in csubsts 1007 ] 1008 # Generate substitutions for new targets, which all exist by construction 1009 new_csubsts = [ 1010 not_none(a.cterm.match_with_constraint(cterm)).add_constraint(constraint) 1011 for (cterm, constraint) in new_cterms_with_constraints 1012 ] 1013 # Remove the node `B`, effectively removing the entire initial structure 1014 self.remove_node(b_id) 1015 # Create the nodes `[ A #And cond_I | I = 1..N ]`. 1016 ai: list[NodeIdLike] = [self.create_node(cterm).id for (cterm, _) in new_cterms_with_constraints] 1017 # Create the edges `[A #And cond_1 --M steps--> C_I | I = 1..N ]` 1018 for i in range(len(ai)): 1019 self.create_edge(ai[i], ci[i], a_to_b.depth, a_to_b.rules) 1020 # Create the split `A --[cond_1, ..., cond_N]--> [A #And cond_1, ..., A #And cond_N] 1021 self.create_split(a.id, zip(ai, new_csubsts, strict=True))
1022
[docs] 1023 def lift_split_split(self, b_id: NodeIdLike) -> None: 1024 """Lift a split up a split directly preceding it, joining them into a single split. 1025 1026 Input: 1027 - b_id: the identifier of the node `B` of the structure 1028 `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` 1029 1030 Effect: 1031 `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes 1032 `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`. 1033 Node `B` is removed. 1034 1035 Output: None 1036 1037 Raises: 1038 - `AssertionError`, if the structure in question is not in place. 1039 """ 1040 # Obtain splits `A --[..., cond_B, ...]--> [..., B, ...]` and 1041 # `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]-> [C_1, ..., C_N]` 1042 split_from_a, split_from_b = single(self.splits(target_id=b_id)), single(self.splits(source_id=b_id)) 1043 splits_from_a, splits_from_b = split_from_a.splits, split_from_b.splits 1044 a = split_from_a.source 1045 ci = list(splits_from_b.keys()) 1046 # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables) 1047 assert ( 1048 len(split_from_b.source_vars.difference(a.free_vars)) == 0 1049 and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0 1050 ) 1051 # Get the substitution for `B`, at the same time removing 'B' from the targets of `A`. 1052 csubst_b = splits_from_a.pop(self._resolve(b_id)) 1053 # Generate substitutions for additional targets `C_I`, which all exist by construction; 1054 # the constraints are cumulative, resulting in `cond_B #And cond_I` 1055 additional_csubsts = [ 1056 not_none(a.cterm.match_with_constraint(self.node(ci).cterm)) 1057 .add_constraint(csubst.constraint) 1058 .add_constraint(csubst_b.constraint) 1059 for ci, csubst in splits_from_b.items() 1060 ] 1061 # Create the targets of the new split 1062 ci = list(splits_from_b.keys()) 1063 new_splits = zip( 1064 list(splits_from_a.keys()) + ci, list(splits_from_a.values()) + additional_csubsts, strict=True 1065 ) 1066 # Remove the node `B`, thereby removing the two splits as well 1067 self.remove_node(b_id) 1068 # Create the new split `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]` 1069 self.create_split(a.id, new_splits)
1070
[docs] 1071 def lift_splits(self) -> bool: 1072 """Perform all possible split liftings. 1073 1074 Input: None 1075 1076 Effect: The KCFG is transformed to an equivalent in which no further split lifts are possible. 1077 1078 Output: 1079 - bool: An indicator of whether or not at least one split lift was performed. 1080 """ 1081 1082 def _lift_split(finder: Callable, lifter: Callable) -> bool: 1083 result = False 1084 while True: 1085 splits_to_lift = sorted( 1086 [ 1087 node.id 1088 for node in self.nodes 1089 if (splits := self.splits(source_id=node.id)) != [] 1090 and (sources := finder(target_id=node.id)) != [] 1091 and (source := single(sources).source) 1092 and (split := single(splits)) 1093 and len(split.source_vars.difference(source.free_vars)) == 0 1094 and len(split.target_vars.difference(split.source_vars)) == 0 1095 ] 1096 ) 1097 for id in splits_to_lift: 1098 lifter(id) 1099 result = True 1100 if len(splits_to_lift) == 0: 1101 break 1102 return result 1103 1104 def _fold_lift(result: bool, finder_lifter: tuple[Callable, Callable]) -> bool: 1105 return _lift_split(finder_lifter[0], finder_lifter[1]) or result 1106 1107 return reduce(_fold_lift, [(self.edges, self.lift_split_edge), (self.splits, self.lift_split_split)], False)
1108
[docs] 1109 def minimize(self) -> None: 1110 """Minimize KCFG by repeatedly performing the lifting transformations. 1111 1112 The loop is designed so that each transformation is performed once in each iteration. 1113 1114 Input: None 1115 1116 Effect: The KCFG is transformed to an equivalent in which no further lifting transformations are possible. 1117 1118 Output: None 1119 """ 1120 1121 repeat = True 1122 while repeat: 1123 repeat = self.lift_edges() 1124 repeat = self.lift_splits() or repeat
1125
[docs] 1126 def add_alias(self, alias: str, node_id: NodeIdLike) -> None: 1127 if '@' in alias: 1128 raise ValueError('Alias may not contain "@"') 1129 if alias in self._aliases: 1130 raise ValueError(f'Duplicate alias: {alias}') 1131 node_id = self._resolve(node_id) 1132 self._aliases[alias] = node_id
1133
[docs] 1134 def remove_alias(self, alias: str) -> None: 1135 if alias not in self._aliases: 1136 raise ValueError(f'Alias does not exist: {alias}') 1137 self._aliases.pop(alias)
1138
[docs] 1139 def is_root(self, node_id: NodeIdLike) -> bool: 1140 node_id = self._resolve(node_id) 1141 return len(self.predecessors(node_id)) == 0
1142
[docs] 1143 def is_vacuous(self, node_id: NodeIdLike) -> bool: 1144 return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs
1145
[docs] 1146 def is_stuck(self, node_id: NodeIdLike) -> bool: 1147 return KCFGNodeAttr.STUCK in self.node(node_id).attrs
1148
[docs] 1149 def is_split(self, node_id: NodeIdLike) -> bool: 1150 node_id = self._resolve(node_id) 1151 return node_id in self._splits
1152
[docs] 1153 def is_ndbranch(self, node_id: NodeIdLike) -> bool: 1154 node_id = self._resolve(node_id) 1155 return node_id in self._ndbranches
1156
[docs] 1157 def is_leaf(self, node_id: NodeIdLike) -> bool: 1158 return len(self.successors(node_id)) == 0
1159
[docs] 1160 def is_covered(self, node_id: NodeIdLike) -> bool: 1161 node_id = self._resolve(node_id) 1162 return node_id in self._covers
1163
[docs] 1164 def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]: 1165 nodes = self.reachable_nodes(node_id) 1166 keep_nodes = [self._resolve(nid) for nid in keep_nodes] 1167 pruned_nodes: list[int] = [] 1168 for node in nodes: 1169 if node.id not in keep_nodes: 1170 self.remove_node(node.id) 1171 pruned_nodes.append(node.id) 1172 return pruned_nodes
1173
[docs] 1174 def shortest_path_between( 1175 self, source_node_id: NodeIdLike, target_node_id: NodeIdLike 1176 ) -> tuple[Successor, ...] | None: 1177 paths = self.paths_between(source_node_id, target_node_id) 1178 if len(paths) == 0: 1179 return None 1180 return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0]
1181
[docs] 1182 def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None: 1183 path_1 = self.shortest_path_between(node_1_id, node_2_id) 1184 path_2 = self.shortest_path_between(node_2_id, node_1_id) 1185 distance: int | None = None 1186 if path_1 is not None: 1187 distance = KCFG.path_length(path_1) 1188 if path_2 is not None: 1189 distance_2 = KCFG.path_length(path_2) 1190 if distance is None or distance_2 < distance: 1191 distance = distance_2 1192 return distance
1193
[docs] 1194 def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool: 1195 shortest_distance = self.shortest_distance_between(node_1_id, node_2_id) 1196 return shortest_distance is not None and shortest_distance == 0
1197
[docs] 1198 def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]: 1199 source_id = self._resolve(source_id) 1200 target_id = self._resolve(target_id) 1201 1202 if source_id == target_id: 1203 return [()] 1204 1205 source_successors = list(self.successors(source_id)) 1206 assert len(source_successors) <= 1 1207 if len(source_successors) == 0: 1208 return [] 1209 1210 paths: list[tuple[KCFG.Successor, ...]] = [] 1211 worklist: list[list[KCFG.Successor]] = [[source_successors[0]]] 1212 1213 def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool: 1214 for succ in _path: 1215 if _nid == succ.source.id: 1216 return True 1217 if len(_path) > 0: 1218 if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid: 1219 return True 1220 elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids: 1221 return True 1222 return False 1223 1224 while worklist: 1225 curr_path = worklist.pop() 1226 curr_successor = curr_path[-1] 1227 successors: list[KCFG.Successor] = [] 1228 1229 if isinstance(curr_successor, KCFG.EdgeLike): 1230 if curr_successor.target.id == target_id: 1231 paths.append(tuple(curr_path)) 1232 continue 1233 else: 1234 successors = list(self.successors(curr_successor.target.id)) 1235 1236 elif isinstance(curr_successor, KCFG.MultiEdge): 1237 if len(list(curr_successor.targets)) == 1: 1238 target = list(curr_successor.targets)[0] 1239 if target.id == target_id: 1240 paths.append(tuple(curr_path)) 1241 continue 1242 else: 1243 successors = list(self.successors(target.id)) 1244 if len(list(curr_successor.targets)) > 1: 1245 curr_path = curr_path[0:-1] 1246 successors = [curr_successor.with_single_target(target) for target in curr_successor.targets] 1247 1248 for successor in successors: 1249 if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path): 1250 worklist.append(curr_path + [successor]) 1251 elif isinstance(successor, KCFG.MultiEdge): 1252 if len(list(successor.targets)) == 1: 1253 target = list(successor.targets)[0] 1254 if not _in_path(target.id, curr_path): 1255 worklist.append(curr_path + [successor]) 1256 elif len(list(successor.targets)) > 1: 1257 worklist.append(curr_path + [successor]) 1258 1259 return paths
1260
[docs] 1261 def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]: 1262 visited: set[KCFG.Node] = set() 1263 worklist: list[KCFG.Node] = [self.node(source_id)] 1264 1265 while worklist: 1266 node = worklist.pop() 1267 1268 if node in visited: 1269 continue 1270 1271 visited.add(node) 1272 1273 if not reverse: 1274 worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets) 1275 else: 1276 worklist.extend(succ.source for succ in self.predecessors(target_id=node.id)) 1277 1278 return visited
1279
[docs] 1280 def write_cfg_data(self) -> None: 1281 assert self._kcfg_store is not None 1282 self._kcfg_store.write_cfg_data( 1283 self.to_dict(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes 1284 ) 1285 self._deleted_nodes.clear() 1286 self._created_nodes.clear()
1287
[docs] 1288 @staticmethod 1289 def read_cfg_data(cfg_dir: Path) -> KCFG: 1290 store = KCFGStore(cfg_dir) 1291 cfg = KCFG.from_dict(store.read_cfg_data()) 1292 cfg._kcfg_store = store 1293 return cfg
1294
[docs] 1295 @staticmethod 1296 def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node: 1297 store = KCFGStore(cfg_dir) 1298 return KCFG.Node.from_dict(store.read_node_data(node_id))
1299 1300
[docs] 1301class KCFGExtendResult(ABC): ...
1302 1303
[docs] 1304@final 1305@dataclass(frozen=True) 1306class Vacuous(KCFGExtendResult): ...
1307 1308
[docs] 1309@final 1310@dataclass(frozen=True) 1311class Stuck(KCFGExtendResult): ...
1312 1313
[docs] 1314@final 1315@dataclass(frozen=True) 1316class Abstract(KCFGExtendResult): 1317 cterm: CTerm
1318 1319
[docs] 1320@final 1321@dataclass(frozen=True) 1322class Step(KCFGExtendResult): 1323 cterm: CTerm 1324 depth: int 1325 logs: tuple[LogEntry, ...] 1326 rule_labels: list[str] 1327 cut: bool = field(default=False)
1328 1329
[docs] 1330@final 1331@dataclass(frozen=True) 1332class Branch(KCFGExtendResult): 1333 constraints: tuple[KInner, ...] 1334 heuristic: bool 1335 1336 def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False): 1337 object.__setattr__(self, 'constraints', tuple(constraints)) 1338 object.__setattr__(self, 'heuristic', heuristic)
1339 1340
[docs] 1341@final 1342@dataclass(frozen=True) 1343class NDBranch(KCFGExtendResult): 1344 cterms: tuple[CTerm, ...] 1345 logs: tuple[LogEntry, ...] 1346 rule_labels: tuple[str, ...] 1347 1348 def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]): 1349 object.__setattr__(self, 'cterms', tuple(cterms)) 1350 object.__setattr__(self, 'logs', tuple(logs)) 1351 object.__setattr__(self, 'rule_labels', tuple(rule_labels))