1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from functools import reduce
   9from threading import RLock
  10from typing import TYPE_CHECKING, Final, List, Union, cast, final
  11
  12from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  13from ..kast import EMPTY_ATT
  14from ..kast.inner import KApply
  15from ..kast.manip import (
  16    bool_to_ml_pred,
  17    extract_lhs,
  18    extract_rhs,
  19    flatten_label,
  20    inline_cell_maps,
  21    rename_generated_vars,
  22    sort_ac_collections,
  23)
  24from ..kast.outer import KFlatModule
  25from ..prelude.kbool import andBool
  26from ..utils import ensure_dir_path, not_none, single
  27
  28if TYPE_CHECKING:
  29    from collections.abc import Callable, Iterable, Mapping, MutableMapping
  30    from pathlib import Path
  31    from types import TracebackType
  32    from typing import Any
  33
  34    from pyk.kore.rpc import LogEntry
  35
  36    from ..kast import KAtt
  37    from ..kast.inner import KInner
  38    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  39
  40
  41NodeIdLike = int | str
  42
  43_LOGGER: Final = logging.getLogger(__name__)
  44
  45
[docs]
  46@dataclass(frozen=True)
  47class NodeAttr:
  48    value: str 
  49
  50
[docs]
  51class KCFGNodeAttr(NodeAttr):
  52    VACUOUS = NodeAttr('vacuous')
  53    STUCK = NodeAttr('stuck') 
  54
  55
[docs]
  56class KCFGStore:
  57    store_path: Path
  58
  59    def __init__(self, store_path: Path) -> None:
  60        self.store_path = store_path
  61        ensure_dir_path(store_path)
  62        ensure_dir_path(self.kcfg_node_dir)
  63
  64    @property
  65    def kcfg_json_path(self) -> Path:
  66        return self.store_path / 'kcfg.json'
  67
  68    @property
  69    def kcfg_node_dir(self) -> Path:
  70        return self.store_path / 'nodes'
  71
[docs]
  72    def kcfg_node_path(self, node_id: int) -> Path:
  73        return self.kcfg_node_dir / f'{node_id}.json' 
  74
[docs]
  75    def write_cfg_data(
  76        self, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
  77    ) -> None:
  78        node_dict = {node_dct['id']: node_dct for node_dct in dct['nodes']}
  79        vacuous_nodes = [
  80            node_id for node_id in node_dict.keys() if KCFGNodeAttr.VACUOUS.value in node_dict[node_id]['attrs']
  81        ]
  82        stuck_nodes = [
  83            node_id for node_id in node_dict.keys() if KCFGNodeAttr.STUCK.value in node_dict[node_id]['attrs']
  84        ]
  85        dct['vacuous'] = vacuous_nodes
  86        dct['stuck'] = stuck_nodes
  87        for node_id in deleted_nodes:
  88            self.kcfg_node_path(node_id).unlink(missing_ok=True)
  89        for node_id in created_nodes:
  90            del node_dict[node_id]['attrs']
  91            self.kcfg_node_path(node_id).write_text(json.dumps(node_dict[node_id]))
  92        dct['nodes'] = list(node_dict.keys())
  93        self.kcfg_json_path.write_text(json.dumps(dct)) 
  94
[docs]
  95    def read_cfg_data(self) -> dict[str, Any]:
  96        dct = json.loads(self.kcfg_json_path.read_text())
  97        nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []]
  98        dct['nodes'] = nodes
  99
 100        new_nodes = []
 101        for node in dct['nodes']:
 102            attrs = []
 103            if node['id'] in dct['vacuous']:
 104                attrs.append(KCFGNodeAttr.VACUOUS.value)
 105            if node['id'] in dct['stuck']:
 106                attrs.append(KCFGNodeAttr.STUCK.value)
 107            new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})
 108
 109        dct['nodes'] = new_nodes
 110
 111        del dct['vacuous']
 112        del dct['stuck']
 113
 114        return dct 
 115
[docs]
 116    def read_node_data(self, node_id: int) -> dict[str, Any]:
 117        return json.loads(self.kcfg_node_path(node_id).read_text()) 
 
 118
 119
[docs]
 120class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs]
 121    @final
 122    @dataclass(frozen=True, order=True)
 123    class Node:
 124        id: int
 125        cterm: CTerm
 126        attrs: frozenset[NodeAttr]
 127
 128        def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
 129            object.__setattr__(self, 'id', id)
 130            object.__setattr__(self, 'cterm', cterm)
 131            object.__setattr__(self, 'attrs', frozenset(attrs))
 132
[docs]
 133        def to_dict(self) -> dict[str, Any]:
 134            return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]} 
 135
[docs]
 136        @staticmethod
 137        def from_dict(dct: dict[str, Any]) -> KCFG.Node:
 138            return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']]) 
 139
[docs]
 140        def add_attr(self, attr: NodeAttr) -> KCFG.Node:
 141            return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr]) 
 142
[docs]
 143        def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
 144            if attr not in self.attrs:
 145                raise ValueError(f'Node {self.id} does not have attribute {attr.value}')
 146            return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr])) 
 147
[docs]
 148        def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
 149            return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr])) 
 150
[docs]
 151        def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
 152            new_cterm = cterm if cterm is not None else self.cterm
 153            new_attrs = attrs if attrs is not None else self.attrs
 154            return KCFG.Node(self.id, new_cterm, new_attrs) 
 155
 156        @property
 157        def free_vars(self) -> frozenset[str]:
 158            return frozenset(self.cterm.free_vars) 
 159
[docs]
 160    class Successor(ABC):
 161        source: KCFG.Node
 162
 163        def __lt__(self, other: Any) -> bool:
 164            if not isinstance(other, KCFG.Successor):
 165                return NotImplemented
 166            return self.source < other.source
 167
 168        @property
 169        def source_vars(self) -> frozenset[str]:
 170            return frozenset(self.source.free_vars)
 171
 172        @property
 173        @abstractmethod
 174        def targets(self) -> tuple[KCFG.Node, ...]: ...
 175
 176        @property
 177        def target_ids(self) -> list[int]:
 178            return sorted(target.id for target in self.targets)
 179
 180        @property
 181        def target_vars(self) -> frozenset[str]:
 182            return frozenset(set.union(set(), *(target.free_vars for target in self.targets)))
 183
[docs]
 184        @abstractmethod
 185        def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ... 
 186
[docs]
 187        @abstractmethod
 188        def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ... 
 189
[docs]
 190        @abstractmethod
 191        def to_dict(self) -> dict[str, Any]: ... 
 192
[docs]
 193        @staticmethod
 194        @abstractmethod
 195        def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ... 
 
 196
[docs]
 197    class EdgeLike(Successor):
 198        source: KCFG.Node
 199        target: KCFG.Node
 200
 201        @property
 202        def targets(self) -> tuple[KCFG.Node, ...]:
 203            return (self.target,) 
 204
[docs]
 205    @final
 206    @dataclass(frozen=True)
 207    class Edge(EdgeLike):
 208        source: KCFG.Node
 209        target: KCFG.Node
 210        depth: int
 211        rules: tuple[str, ...]
 212
[docs]
 213        def to_dict(self) -> dict[str, Any]:
 214            return {
 215                'source': self.source.id,
 216                'target': self.target.id,
 217                'depth': self.depth,
 218                'rules': list(self.rules),
 219            } 
 220
[docs]
 221        @staticmethod
 222        def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
 223            return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules'])) 
 224
[docs]
 225        def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
 226            def is_ceil_condition(kast: KInner) -> bool:
 227                return type(kast) is KApply and kast.label.name == '#Ceil'
 228
 229            def _simplify_config(config: KInner) -> KInner:
 230                return sort_ac_collections(inline_cell_maps(config))
 231
 232            sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
 233            init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)]
 234            init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints)
 235            target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)]
 236            target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints)
 237            rule: KRuleLike
 238            if claim:
 239                rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm)
 240            else:
 241                rule, _ = cterm_build_rule(sentence_id, init_cterm, target_cterm, priority=priority)
 242            return rule 
 243
[docs]
 244        def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
 245            assert node.id == self.source.id
 246            return KCFG.Edge(node, self.target, self.depth, self.rules) 
 247
[docs]
 248        def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
 249            assert node.id == self.target.id
 250            return KCFG.Edge(self.source, node, self.depth, self.rules) 
 
 251
[docs]
 252    @final
 253    @dataclass(frozen=True)
 254    class Cover(EdgeLike):
 255        source: KCFG.Node
 256        target: KCFG.Node
 257        csubst: CSubst
 258
[docs]
 259        def to_dict(self) -> dict[str, Any]:
 260            return {
 261                'source': self.source.id,
 262                'target': self.target.id,
 263                'csubst': self.csubst.to_dict(),
 264            } 
 265
[docs]
 266        @staticmethod
 267        def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
 268            return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst'])) 
 269
[docs]
 270        def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
 271            assert node.id == self.source.id
 272            return KCFG.Cover(node, self.target, self.csubst) 
 273
[docs]
 274        def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
 275            assert node.id == self.target.id
 276            return KCFG.Cover(self.source, node, self.csubst) 
 
 277
[docs]
 278    @dataclass(frozen=True)
 279    class MultiEdge(Successor):
 280        source: KCFG.Node
 281
 282        def __lt__(self, other: Any) -> bool:
 283            if not type(other) is type(self):
 284                return NotImplemented
 285            return (self.source, self.target_ids) < (other.source, other.target_ids)
 286
[docs]
 287        @abstractmethod
 288        def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ... 
 
 289
[docs]
 290    @final
 291    @dataclass(frozen=True)
 292    class Split(MultiEdge):
 293        source: KCFG.Node
 294        _targets: tuple[tuple[KCFG.Node, CSubst], ...]
 295
 296        def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
 297            object.__setattr__(self, 'source', source)
 298            object.__setattr__(self, '_targets', tuple(_targets))
 299
 300        @property
 301        def targets(self) -> tuple[KCFG.Node, ...]:
 302            return tuple(target for target, _ in self._targets)
 303
 304        @property
 305        def splits(self) -> dict[int, CSubst]:
 306            return {target.id: csubst for target, csubst in self._targets}
 307
[docs]
 308        def to_dict(self) -> dict[str, Any]:
 309            return {
 310                'source': self.source.id,
 311                'targets': [
 312                    {
 313                        'target': target.id,
 314                        'csubst': csubst.to_dict(),
 315                    }
 316                    for target, csubst in self._targets
 317                ],
 318            } 
 319
[docs]
 320        @staticmethod
 321        def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
 322            _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']]
 323            return KCFG.Split(nodes[dct['source']], tuple(_targets)) 
 324
[docs]
 325        def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
 326            return KCFG.Split(self.source, ((target, self.splits[target.id]),)) 
 327
 328        @property
 329        def covers(self) -> tuple[KCFG.Cover, ...]:
 330            return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets)
 331
[docs]
 332        def replace_source(self, node: KCFG.Node) -> KCFG.Split:
 333            assert node.id == self.source.id
 334            return KCFG.Split(node, self._targets) 
 335
[docs]
 336        def replace_target(self, node: KCFG.Node) -> KCFG.Split:
 337            assert node.id in self.target_ids
 338            new_targets = [
 339                (node, csubst) if node.id == target_node.id else (target_node, csubst)
 340                for target_node, csubst in self._targets
 341            ]
 342            return KCFG.Split(self.source, tuple(new_targets)) 
 
 343
[docs]
 344    @final
 345    @dataclass(frozen=True)
 346    class NDBranch(MultiEdge):
 347        source: KCFG.Node
 348        _targets: tuple[KCFG.Node, ...]
 349        rules: tuple[str, ...]
 350
 351        def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
 352            object.__setattr__(self, 'source', source)
 353            object.__setattr__(self, '_targets', tuple(_targets))
 354            object.__setattr__(self, 'rules', rules)
 355
 356        @property
 357        def targets(self) -> tuple[KCFG.Node, ...]:
 358            return self._targets
 359
[docs]
 360        def to_dict(self) -> dict[str, Any]:
 361            return {
 362                'source': self.source.id,
 363                'targets': [target.id for target in self.targets],
 364                'rules': list(self.rules),
 365            } 
 366
[docs]
 367        @staticmethod
 368        def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
 369            return KCFG.NDBranch(
 370                nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules'])
 371            ) 
 372
[docs]
 373        def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
 374            return KCFG.NDBranch(self.source, (target,), ()) 
 375
 376        @property
 377        def edges(self) -> tuple[KCFG.Edge, ...]:
 378            return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets)
 379
[docs]
 380        def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
 381            assert node.id == self.source.id
 382            return KCFG.NDBranch(node, self._targets, self.rules) 
 383
[docs]
 384        def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
 385            assert node.id in self.target_ids
 386            new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets]
 387            return KCFG.NDBranch(self.source, tuple(new_targets), self.rules) 
 
 388
 389    _node_id: int
 390    _nodes: MutableMapping[int, KCFG.Node]
 391
 392    _created_nodes: set[int]
 393    _deleted_nodes: set[int]
 394
 395    _edges: dict[int, Edge]
 396    _covers: dict[int, Cover]
 397    _splits: dict[int, Split]
 398    _ndbranches: dict[int, NDBranch]
 399    _aliases: dict[str, int]
 400    _lock: RLock
 401
 402    _kcfg_store: KCFGStore | None
 403
 404    def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
 405        self._node_id = 1
 406        if optimize_memory:
 407            from .store import OptimizedNodeStore
 408
 409            self._nodes = OptimizedNodeStore()
 410        else:
 411            self._nodes = {}
 412        self._created_nodes = set()
 413        self._deleted_nodes = set()
 414        self._edges = {}
 415        self._covers = {}
 416        self._splits = {}
 417        self._ndbranches = {}
 418        self._aliases = {}
 419        self._lock = RLock()
 420        if cfg_dir is not None:
 421            self._kcfg_store = KCFGStore(cfg_dir)
 422
 423    def __contains__(self, item: object) -> bool:
 424        if type(item) is KCFG.Node:
 425            return self.contains_node(item)
 426        if type(item) is KCFG.Edge:
 427            return self.contains_edge(item)
 428        if type(item) is KCFG.Cover:
 429            return self.contains_cover(item)
 430        if type(item) is KCFG.Split:
 431            return self.contains_split(item)
 432        if type(item) is KCFG.NDBranch:
 433            return self.contains_ndbranch(item)
 434        return False
 435
 436    def __enter__(self) -> KCFG:
 437        self._lock.acquire()
 438        return self
 439
 440    def __exit__(
 441        self,
 442        exc_type: type[BaseException] | None,
 443        exc_value: BaseException | None,
 444        traceback: TracebackType | None,
 445    ) -> bool:
 446        self._lock.release()
 447        if exc_type is None:
 448            return True
 449        return False
 450
 451    @property
 452    def nodes(self) -> list[KCFG.Node]:
 453        return list(self._nodes.values())
 454
 455    @property
 456    def root(self) -> list[KCFG.Node]:
 457        return [node for node in self.nodes if self.is_root(node.id)]
 458
 459    @property
 460    def vacuous(self) -> list[KCFG.Node]:
 461        return [node for node in self.nodes if self.is_vacuous(node.id)]
 462
 463    @property
 464    def stuck(self) -> list[KCFG.Node]:
 465        return [node for node in self.nodes if self.is_stuck(node.id)]
 466
 467    @property
 468    def leaves(self) -> list[KCFG.Node]:
 469        return [node for node in self.nodes if self.is_leaf(node.id)]
 470
 471    @property
 472    def covered(self) -> list[KCFG.Node]:
 473        return [node for node in self.nodes if self.is_covered(node.id)]
 474
 475    @property
 476    def uncovered(self) -> list[KCFG.Node]:
 477        return [node for node in self.nodes if not self.is_covered(node.id)]
 478
[docs]
 479    @staticmethod
 480    def from_claim(
 481        defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
 482    ) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
 483        cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
 484        claim_body = claim.body
 485        claim_body = defn.instantiate_cell_vars(claim_body)
 486        claim_body = rename_generated_vars(claim_body)
 487
 488        claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires))
 489        init_node = cfg.create_node(claim_lhs)
 490
 491        claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint(
 492            bool_to_ml_pred(andBool([claim.requires, claim.ensures]))
 493        )
 494        target_node = cfg.create_node(claim_rhs)
 495
 496        return cfg, init_node.id, target_node.id 
 497
[docs]
 498    @staticmethod
 499    def path_length(_path: Iterable[KCFG.Successor]) -> int:
 500        _path = tuple(_path)
 501        if len(_path) == 0:
 502            return 0
 503        if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover:
 504            return KCFG.path_length(_path[1:])
 505        elif type(_path[0]) is KCFG.NDBranch:
 506            return 1 + KCFG.path_length(_path[1:])
 507        elif type(_path[0]) is KCFG.Edge:
 508            return _path[0].depth + KCFG.path_length(_path[1:])
 509        raise ValueError(f'Cannot handle Successor type: {type(_path[0])}') 
 510
[docs]
 511    def extend(
 512        self,
 513        extend_result: KCFGExtendResult,
 514        node: KCFG.Node,
 515        logs: dict[int, tuple[LogEntry, ...]],
 516    ) -> None:
 517        match extend_result:
 518            case Vacuous():
 519                self.add_vacuous(node.id)
 520
 521            case Stuck():
 522                self.add_stuck(node.id)
 523
 524            case Abstract(cterm):
 525                new_node = self.create_node(cterm)
 526                self.create_cover(node.id, new_node.id)
 527
 528            case Step(cterm, depth, next_node_logs, rule_labels, _):
 529                next_node = self.create_node(cterm)
 530                logs[next_node.id] = next_node_logs
 531                self.create_edge(node.id, next_node.id, depth, rules=rule_labels)
 532
 533            case Branch(constraints, _):
 534                self.split_on_constraints(node.id, constraints)
 535
 536            case NDBranch(cterms, next_node_logs, rule_labels):
 537                next_ids = [self.create_node(cterm).id for cterm in cterms]
 538                for i in next_ids:
 539                    logs[i] = next_node_logs
 540                self.create_ndbranch(node.id, next_ids, rules=rule_labels)
 541
 542            case _:
 543                raise AssertionError() 
 544
[docs]
 545    def to_dict(self) -> dict[str, Any]:
 546        nodes = [node.to_dict() for node in self.nodes]
 547        edges = [edge.to_dict() for edge in self.edges()]
 548        covers = [cover.to_dict() for cover in self.covers()]
 549        splits = [split.to_dict() for split in self.splits()]
 550        ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
 551
 552        aliases = dict(sorted(self._aliases.items()))
 553
 554        res = {
 555            'next': self._node_id,
 556            'nodes': nodes,
 557            'edges': edges,
 558            'covers': covers,
 559            'splits': splits,
 560            'ndbranches': ndbranches,
 561            'aliases': aliases,
 562        }
 563        return {k: v for k, v in res.items() if v} 
 564
[docs]
 565    @staticmethod
 566    def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
 567        cfg = KCFG(optimize_memory=optimize_memory)
 568
 569        for node_dict in dct.get('nodes') or []:
 570            node = KCFG.Node.from_dict(node_dict)
 571            cfg.add_node(node)
 572
 573        max_id = max([node.id for node in cfg.nodes], default=0)
 574        cfg._node_id = dct.get('next', max_id + 1)
 575
 576        for edge_dict in dct.get('edges') or []:
 577            edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes)
 578            cfg.add_successor(edge)
 579
 580        for cover_dict in dct.get('covers') or []:
 581            cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
 582            cfg.add_successor(cover)
 583
 584        for split_dict in dct.get('splits') or []:
 585            split = KCFG.Split.from_dict(split_dict, cfg._nodes)
 586            cfg.add_successor(split)
 587
 588        for ndbranch_dict in dct.get('ndbranches') or []:
 589            ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes)
 590            cfg.add_successor(ndbranch)
 591
 592        for alias, node_id in dct.get('aliases', {}).items():
 593            cfg.add_alias(alias=alias, node_id=node_id)
 594
 595        return cfg 
 596
[docs]
 597    def aliases(self, node_id: NodeIdLike) -> list[str]:
 598        node_id = self._resolve(node_id)
 599        return [alias for alias, value in self._aliases.items() if node_id == value] 
 600
[docs]
 601    def to_json(self) -> str:
 602        return json.dumps(self.to_dict(), sort_keys=True) 
 603
[docs]
 604    @staticmethod
 605    def from_json(s: str, optimize_memory: bool = True) -> KCFG:
 606        return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory) 
 607
[docs]
 608    def to_rules(self, priority: int = 20) -> list[KRuleLike]:
 609        return [e.to_rule('BASIC-BLOCK', priority=priority) for e in self.edges()] 
 610
[docs]
 611    def to_module(
 612        self,
 613        module_name: str | None = None,
 614        imports: Iterable[KImport] = (),
 615        priority: int = 20,
 616        att: KAtt = EMPTY_ATT,
 617    ) -> KFlatModule:
 618        module_name = 'KCFG' if module_name is None else module_name
 619        return KFlatModule(module_name, self.to_rules(priority=priority), imports=imports, att=att) 
 620
 621    def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
 622        if type(id_like) is int:
 623            if id_like in self._nodes:
 624                return id_like
 625
 626            return None
 627
 628        if type(id_like) is not str:
 629            raise TypeError(f'Expected int or str for id_like, got: {id_like}')
 630
 631        if id_like.startswith('@'):
 632            if id_like[1:] in self._aliases:
 633                return self._aliases[id_like[1:]]
 634            raise ValueError(f'Unknown alias: {id_like}')
 635
 636        return None
 637
 638    def _resolve(self, id_like: NodeIdLike) -> int:
 639        match = self._resolve_or_none(id_like)
 640        if not match:
 641            raise ValueError(f'Unknown node: {id_like}')
 642        return match
 643
[docs]
 644    def node(self, node_id: NodeIdLike) -> KCFG.Node:
 645        node_id = self._resolve(node_id)
 646        return self._nodes[node_id] 
 647
[docs]
 648    def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
 649        resolved_id = self._resolve_or_none(node_id)
 650        if resolved_id is None:
 651            return None
 652        return self._nodes[resolved_id] 
 653
[docs]
 654    def contains_node(self, node: KCFG.Node) -> bool:
 655        return bool(self.get_node(node.id)) 
 656
[docs]
 657    def add_node(self, node: KCFG.Node) -> None:
 658        if node.id in self._nodes:
 659            raise ValueError(f'Node with id already exists: {node.id}')
 660        self._nodes[node.id] = node
 661        self._created_nodes.add(node.id) 
 662
[docs]
 663    def create_node(self, cterm: CTerm) -> KCFG.Node:
 664        node = KCFG.Node(self._node_id, cterm)
 665        self._node_id += 1
 666        self._nodes[node.id] = node
 667        self._created_nodes.add(node.id)
 668        return node 
 669
[docs]
 670    def remove_node(self, node_id: NodeIdLike) -> None:
 671        node_id = self._resolve(node_id)
 672
 673        node = self._nodes.pop(node_id)
 674        self._created_nodes.discard(node_id)
 675        self._deleted_nodes.add(node.id)
 676
 677        self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids}
 678        self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids}
 679
 680        self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids}
 681        self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids}
 682
 683        for alias in [alias for alias, id in self._aliases.items() if id == node_id]:
 684            self.remove_alias(alias) 
 685
 686    def _update_refs(self, node_id: int) -> None:
 687        node = self.node(node_id)
 688        for succ in self.successors(node_id):
 689            new_succ = succ.replace_source(node)
 690            if type(new_succ) is KCFG.Edge:
 691                self._edges[new_succ.source.id] = new_succ
 692            if type(new_succ) is KCFG.Cover:
 693                self._covers[new_succ.source.id] = new_succ
 694            if type(new_succ) is KCFG.Split:
 695                self._splits[new_succ.source.id] = new_succ
 696            if type(new_succ) is KCFG.NDBranch:
 697                self._ndbranches[new_succ.source.id] = new_succ
 698
 699        for pred in self.predecessors(node_id):
 700            new_pred = pred.replace_target(node)
 701            if type(new_pred) is KCFG.Edge:
 702                self._edges[new_pred.source.id] = new_pred
 703            if type(new_pred) is KCFG.Cover:
 704                self._covers[new_pred.source.id] = new_pred
 705            if type(new_pred) is KCFG.Split:
 706                self._splits[new_pred.source.id] = new_pred
 707            if type(new_pred) is KCFG.NDBranch:
 708                self._ndbranches[new_pred.source.id] = new_pred
 709
[docs]
 710    def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
 711        node = self.node(node_id)
 712        new_node = node.remove_attr(attr)
 713        self.replace_node(new_node) 
 714
[docs]
 715    def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
 716        node = self.node(node_id)
 717        new_node = node.discard_attr(attr)
 718        self.replace_node(new_node) 
 719
[docs]
 720    def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
 721        node = self.node(node_id)
 722        new_node = node.add_attr(attr)
 723        self.replace_node(new_node) 
 724
[docs]
 725    def let_node(
 726        self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
 727    ) -> None:
 728        node = self.node(node_id)
 729        new_node = node.let(cterm=cterm, attrs=attrs)
 730        self.replace_node(new_node) 
 731
[docs]
 732    def replace_node(self, node: KCFG.Node) -> None:
 733        self._nodes[node.id] = node
 734        self._created_nodes.add(node.id)
 735        self._update_refs(node.id) 
 736
[docs]
 737    def successors(self, source_id: NodeIdLike) -> list[Successor]:
 738        out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id)
 739        out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id)
 740        out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id)
 741        out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id)
 742        return list(out_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches) 
 743
[docs]
 744    def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
 745        in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id)
 746        in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id)
 747        in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id)
 748        in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id)
 749        return list(in_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches) 
 750
 751    def _check_no_successors(self, source_id: NodeIdLike) -> None:
 752        if len(self.successors(source_id)) > 0:
 753            raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}')
 754
 755    def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
 756        for target_id in target_ids:
 757            path = self.shortest_path_between(target_id, source_id)
 758            if path is not None and KCFG.path_length(path) == 0:
 759                raise ValueError(
 760                    f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
 761                )
 762
[docs]
 763    def add_successor(self, succ: KCFG.Successor) -> None:
 764        self._check_no_successors(succ.source.id)
 765        self._check_no_zero_loops(succ.source.id, succ.target_ids)
 766        if type(succ) is KCFG.Edge:
 767            self._edges[succ.source.id] = succ
 768        elif type(succ) is KCFG.Cover:
 769            self._covers[succ.source.id] = succ
 770        else:
 771            if len(succ.target_ids) <= 1:
 772                raise ValueError(
 773                    f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
 774                )
 775            if type(succ) is KCFG.Split:
 776                self._splits[succ.source.id] = succ
 777            elif type(succ) is KCFG.NDBranch:
 778                self._ndbranches[succ.source.id] = succ 
 779
[docs]
 780    def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
 781        source_id = self._resolve(source_id)
 782        target_id = self._resolve(target_id)
 783        edge = self._edges.get(source_id, None)
 784        return edge if edge is not None and edge.target.id == target_id else None 
 785
[docs]
 786    def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
 787        source_id = self._resolve(source_id) if source_id is not None else None
 788        target_id = self._resolve(target_id) if target_id is not None else None
 789        return [
 790            edge
 791            for edge in self._edges.values()
 792            if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id)
 793        ] 
 794
[docs]
 795    def contains_edge(self, edge: Edge) -> bool:
 796        if other := self.edge(source_id=edge.source.id, target_id=edge.target.id):
 797            return edge == other
 798        return False 
 799
[docs]
 800    def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
 801        if depth <= 0:
 802            raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
 803        source = self.node(source_id)
 804        target = self.node(target_id)
 805        edge = KCFG.Edge(source, target, depth, tuple(rules))
 806        self.add_successor(edge)
 807        return edge 
 808
[docs]
 809    def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
 810        source_id = self._resolve(source_id)
 811        target_id = self._resolve(target_id)
 812        edge = self.edge(source_id, target_id)
 813        if not edge:
 814            raise ValueError(f'Edge does not exist: {source_id} -> {target_id}')
 815        self._edges.pop(source_id) 
 816
[docs]
 817    def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
 818        source_id = self._resolve(source_id)
 819        target_id = self._resolve(target_id)
 820        cover = self._covers.get(source_id, None)
 821        return cover if cover is not None and cover.target.id == target_id else None 
 822
[docs]
 823    def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
 824        source_id = self._resolve(source_id) if source_id is not None else None
 825        target_id = self._resolve(target_id) if target_id is not None else None
 826        return [
 827            cover
 828            for cover in self._covers.values()
 829            if (source_id is None or source_id == cover.source.id)
 830            and (target_id is None or target_id == cover.target.id)
 831        ] 
 832
[docs]
 833    def contains_cover(self, cover: Cover) -> bool:
 834        if other := self.cover(source_id=cover.source.id, target_id=cover.target.id):
 835            return cover == other
 836        return False 
 837
[docs]
 838    def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
 839        source = self.node(source_id)
 840        target = self.node(target_id)
 841        if csubst is None:
 842            csubst = target.cterm.match_with_constraint(source.cterm)
 843            if csubst is None:
 844                raise ValueError(f'No matching between: {source.id} and {target.id}')
 845        cover = KCFG.Cover(source, target, csubst=csubst)
 846        self.add_successor(cover)
 847        return cover 
 848
[docs]
 849    def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
 850        source_id = self._resolve(source_id)
 851        target_id = self._resolve(target_id)
 852        cover = self.cover(source_id, target_id)
 853        if not cover:
 854            raise ValueError(f'Cover does not exist: {source_id} -> {target_id}')
 855        self._covers.pop(source_id) 
 856
[docs]
 857    def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
 858        return cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id)) + cast(
 859            'List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id)
 860        ) 
 861
[docs]
 862    def add_vacuous(self, node_id: NodeIdLike) -> None:
 863        self.add_attr(node_id, KCFGNodeAttr.VACUOUS) 
 864
[docs]
 865    def remove_vacuous(self, node_id: NodeIdLike) -> None:
 866        self.remove_attr(node_id, KCFGNodeAttr.VACUOUS) 
 867
[docs]
 868    def discard_vacuous(self, node_id: NodeIdLike) -> None:
 869        self.discard_attr(node_id, KCFGNodeAttr.VACUOUS) 
 870
[docs]
 871    def add_stuck(self, node_id: NodeIdLike) -> None:
 872        self.add_attr(node_id, KCFGNodeAttr.STUCK) 
 873
[docs]
 874    def remove_stuck(self, node_id: NodeIdLike) -> None:
 875        self.remove_attr(node_id, KCFGNodeAttr.STUCK) 
 876
[docs]
 877    def discard_stuck(self, node_id: NodeIdLike) -> None:
 878        self.discard_attr(node_id, KCFGNodeAttr.STUCK) 
 879
[docs]
 880    def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
 881        source_id = self._resolve(source_id) if source_id is not None else None
 882        target_id = self._resolve(target_id) if target_id is not None else None
 883        return [
 884            s
 885            for s in self._splits.values()
 886            if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids)
 887        ] 
 888
[docs]
 889    def contains_split(self, split: Split) -> bool:
 890        return split in self._splits.values() 
 891
[docs]
 892    def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
 893        source_id = self._resolve(source_id)
 894        split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits)))
 895        self.add_successor(split)
 896        return split 
 897
[docs]
 898    def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
 899        source_id = self._resolve(source_id) if source_id is not None else None
 900        target_id = self._resolve(target_id) if target_id is not None else None
 901        return [
 902            b
 903            for b in self._ndbranches.values()
 904            if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids)
 905        ] 
 906
[docs]
 907    def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
 908        return ndbranch in self._ndbranches 
 909
[docs]
 910    def create_ndbranch(
 911        self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
 912    ) -> KCFG.NDBranch:
 913        source_id = self._resolve(source_id)
 914        ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules))
 915        self.add_successor(ndbranch)
 916        return ndbranch 
 917
[docs]
 918    def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
 919        source = self.node(source_id)
 920        branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints]
 921        csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids]
 922        csubsts = [
 923            reduce(CSubst.add_constraint, flatten_label('#And', constraint), csubst)
 924            for (csubst, constraint) in zip(csubsts, constraints, strict=True)
 925        ]
 926        self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
 927        return branch_node_ids 
 928
[docs]
 929    def lift_edge(self, b_id: NodeIdLike) -> None:
 930        """Lift an edge up another edge directly preceding it.
 931
 932        Input:
 933            -   b_id: the identifier of the central node `B` of a sequence of edges `A --> B --> C`.
 934
 935        Effect: `A --M steps--> B --N steps--> C` becomes `A --(M + N) steps--> C`. Node `B` is removed.
 936
 937        Output: None
 938
 939        Raises: An `AssertionError` if the edges in question are not in place.
 940        """
 941
 942        # Obtain edges `A -> B`, `B -> C`
 943        a_to_b = single(self.edges(target_id=b_id))
 944        b_to_c = single(self.edges(source_id=b_id))
 945        # Remove the node `B`, effectively removing the entire initial structure
 946        self.remove_node(b_id)
 947        # Create edge `A -> C`
 948        self.create_edge(a_to_b.source.id, b_to_c.target.id, a_to_b.depth + b_to_c.depth, a_to_b.rules + b_to_c.rules) 
 949
[docs]
 950    def lift_edges(self) -> bool:
 951        """Perform all possible edge lifts across the KCFG.
 952
 953        Given the KCFG design, it is not possible for one edge lift to either disallow another or
 954        allow another that was not previously possible. Therefore, this function is guaranteed to
 955        lift all possible edges without having to loop.
 956
 957        Input: None
 958
 959        Effect: The KCFG is transformed to an equivalent in which no further edge lifts are possible.
 960
 961        Output:
 962            -   bool: An indicator of whether or not at least one edge lift was performed.
 963        """
 964
 965        edges_to_lift = sorted(
 966            [
 967                node.id
 968                for node in self.nodes
 969                if len(self.edges(source_id=node.id)) > 0 and len(self.edges(target_id=node.id)) > 0
 970            ]
 971        )
 972        for node_id in edges_to_lift:
 973            self.lift_edge(node_id)
 974        return len(edges_to_lift) > 0 
 975
[docs]
 976    def lift_split_edge(self, b_id: NodeIdLike) -> None:
 977        """Lift a split up an edge directly preceding it.
 978
 979        Input:
 980            -   b_id: the identifier of the central node `B` of the structure `A --> B --> [C_1, ..., C_N]`.
 981
 982        Effect:
 983            `A --M steps--> B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
 984            `A --[cond_1, ..., cond_N]--> [A #And cond_1 --M steps--> C_1, ..., A #And cond_N --M steps--> C_N]`.
 985            Node `B` is removed.
 986
 987        Output: None
 988
 989        Raises:
 990            -   `AssertionError`, if the structure in question is not in place.
 991            -   `AssertionError`, if any of the `cond_i` contain variables not present in `A`.
 992        """
 993
 994        # Obtain edge `A -> B`, split `[cond_I, C_I | I = 1..N ]`
 995        a_to_b = single(self.edges(target_id=b_id))
 996        a = a_to_b.source
 997        split_from_b = single(self.splits(source_id=b_id))
 998        ci, csubsts = list(split_from_b.splits.keys()), list(split_from_b.splits.values())
 999        # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables)
1000        assert (
1001            len(split_from_b.source_vars.difference(a.free_vars)) == 0
1002            and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0
1003        )
1004        # Create CTerms and CSubsts corresponding to the new targets of the split
1005        new_cterms_with_constraints = [
1006            (CTerm(a.cterm.config, a.cterm.constraints + csubst.constraints), csubst.constraint) for csubst in csubsts
1007        ]
1008        # Generate substitutions for new targets, which all exist by construction
1009        new_csubsts = [
1010            not_none(a.cterm.match_with_constraint(cterm)).add_constraint(constraint)
1011            for (cterm, constraint) in new_cterms_with_constraints
1012        ]
1013        # Remove the node `B`, effectively removing the entire initial structure
1014        self.remove_node(b_id)
1015        # Create the nodes `[ A #And cond_I | I = 1..N ]`.
1016        ai: list[NodeIdLike] = [self.create_node(cterm).id for (cterm, _) in new_cterms_with_constraints]
1017        # Create the edges `[A #And cond_1 --M steps--> C_I | I = 1..N ]`
1018        for i in range(len(ai)):
1019            self.create_edge(ai[i], ci[i], a_to_b.depth, a_to_b.rules)
1020        # Create the split `A --[cond_1, ..., cond_N]--> [A #And cond_1, ..., A #And cond_N]
1021        self.create_split(a.id, zip(ai, new_csubsts, strict=True)) 
1022
[docs]
1023    def lift_split_split(self, b_id: NodeIdLike) -> None:
1024        """Lift a split up a split directly preceding it, joining them into a single split.
1025
1026        Input:
1027            -   b_id: the identifier of the node `B` of the structure
1028                `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]`
1029
1030        Effect:
1031            `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
1032            `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`.
1033            Node `B` is removed.
1034
1035        Output: None
1036
1037        Raises:
1038            -   `AssertionError`, if the structure in question is not in place.
1039        """
1040        # Obtain splits `A --[..., cond_B, ...]--> [..., B, ...]` and
1041        # `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]-> [C_1, ..., C_N]`
1042        split_from_a, split_from_b = single(self.splits(target_id=b_id)), single(self.splits(source_id=b_id))
1043        splits_from_a, splits_from_b = split_from_a.splits, split_from_b.splits
1044        a = split_from_a.source
1045        ci = list(splits_from_b.keys())
1046        # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables)
1047        assert (
1048            len(split_from_b.source_vars.difference(a.free_vars)) == 0
1049            and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0
1050        )
1051        # Get the substitution for `B`, at the same time removing 'B' from the targets of `A`.
1052        csubst_b = splits_from_a.pop(self._resolve(b_id))
1053        # Generate substitutions for additional targets `C_I`, which all exist by construction;
1054        # the constraints are cumulative, resulting in `cond_B #And cond_I`
1055        additional_csubsts = [
1056            not_none(a.cterm.match_with_constraint(self.node(ci).cterm))
1057            .add_constraint(csubst.constraint)
1058            .add_constraint(csubst_b.constraint)
1059            for ci, csubst in splits_from_b.items()
1060        ]
1061        # Create the targets of the new split
1062        ci = list(splits_from_b.keys())
1063        new_splits = zip(
1064            list(splits_from_a.keys()) + ci, list(splits_from_a.values()) + additional_csubsts, strict=True
1065        )
1066        # Remove the node `B`, thereby removing the two splits as well
1067        self.remove_node(b_id)
1068        # Create the new split `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`
1069        self.create_split(a.id, new_splits) 
1070
[docs]
1071    def lift_splits(self) -> bool:
1072        """Perform all possible split liftings.
1073
1074        Input: None
1075
1076        Effect: The KCFG is transformed to an equivalent in which no further split lifts are possible.
1077
1078        Output:
1079            -   bool: An indicator of whether or not at least one split lift was performed.
1080        """
1081
1082        def _lift_split(finder: Callable, lifter: Callable) -> bool:
1083            result = False
1084            while True:
1085                splits_to_lift = sorted(
1086                    [
1087                        node.id
1088                        for node in self.nodes
1089                        if (splits := self.splits(source_id=node.id)) != []
1090                        and (sources := finder(target_id=node.id)) != []
1091                        and (source := single(sources).source)
1092                        and (split := single(splits))
1093                        and len(split.source_vars.difference(source.free_vars)) == 0
1094                        and len(split.target_vars.difference(split.source_vars)) == 0
1095                    ]
1096                )
1097                for id in splits_to_lift:
1098                    lifter(id)
1099                    result = True
1100                if len(splits_to_lift) == 0:
1101                    break
1102            return result
1103
1104        def _fold_lift(result: bool, finder_lifter: tuple[Callable, Callable]) -> bool:
1105            return _lift_split(finder_lifter[0], finder_lifter[1]) or result
1106
1107        return reduce(_fold_lift, [(self.edges, self.lift_split_edge), (self.splits, self.lift_split_split)], False) 
1108
[docs]
1109    def minimize(self) -> None:
1110        """Minimize KCFG by repeatedly performing the lifting transformations.
1111
1112        The loop is designed so that each transformation is performed once in each iteration.
1113
1114        Input: None
1115
1116        Effect: The KCFG is transformed to an equivalent in which no further lifting transformations are possible.
1117
1118        Output: None
1119        """
1120
1121        repeat = True
1122        while repeat:
1123            repeat = self.lift_edges()
1124            repeat = self.lift_splits() or repeat 
1125
[docs]
1126    def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
1127        if '@' in alias:
1128            raise ValueError('Alias may not contain "@"')
1129        if alias in self._aliases:
1130            raise ValueError(f'Duplicate alias: {alias}')
1131        node_id = self._resolve(node_id)
1132        self._aliases[alias] = node_id 
1133
[docs]
1134    def remove_alias(self, alias: str) -> None:
1135        if alias not in self._aliases:
1136            raise ValueError(f'Alias does not exist: {alias}')
1137        self._aliases.pop(alias) 
1138
[docs]
1139    def is_root(self, node_id: NodeIdLike) -> bool:
1140        node_id = self._resolve(node_id)
1141        return len(self.predecessors(node_id)) == 0 
1142
[docs]
1143    def is_vacuous(self, node_id: NodeIdLike) -> bool:
1144        return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs 
1145
[docs]
1146    def is_stuck(self, node_id: NodeIdLike) -> bool:
1147        return KCFGNodeAttr.STUCK in self.node(node_id).attrs 
1148
[docs]
1149    def is_split(self, node_id: NodeIdLike) -> bool:
1150        node_id = self._resolve(node_id)
1151        return node_id in self._splits 
1152
[docs]
1153    def is_ndbranch(self, node_id: NodeIdLike) -> bool:
1154        node_id = self._resolve(node_id)
1155        return node_id in self._ndbranches 
1156
[docs]
1157    def is_leaf(self, node_id: NodeIdLike) -> bool:
1158        return len(self.successors(node_id)) == 0 
1159
[docs]
1160    def is_covered(self, node_id: NodeIdLike) -> bool:
1161        node_id = self._resolve(node_id)
1162        return node_id in self._covers 
1163
[docs]
1164    def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
1165        nodes = self.reachable_nodes(node_id)
1166        keep_nodes = [self._resolve(nid) for nid in keep_nodes]
1167        pruned_nodes: list[int] = []
1168        for node in nodes:
1169            if node.id not in keep_nodes:
1170                self.remove_node(node.id)
1171                pruned_nodes.append(node.id)
1172        return pruned_nodes 
1173
[docs]
1174    def shortest_path_between(
1175        self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
1176    ) -> tuple[Successor, ...] | None:
1177        paths = self.paths_between(source_node_id, target_node_id)
1178        if len(paths) == 0:
1179            return None
1180        return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0] 
1181
[docs]
1182    def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
1183        path_1 = self.shortest_path_between(node_1_id, node_2_id)
1184        path_2 = self.shortest_path_between(node_2_id, node_1_id)
1185        distance: int | None = None
1186        if path_1 is not None:
1187            distance = KCFG.path_length(path_1)
1188        if path_2 is not None:
1189            distance_2 = KCFG.path_length(path_2)
1190            if distance is None or distance_2 < distance:
1191                distance = distance_2
1192        return distance 
1193
[docs]
1194    def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
1195        shortest_distance = self.shortest_distance_between(node_1_id, node_2_id)
1196        return shortest_distance is not None and shortest_distance == 0 
1197
[docs]
1198    def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
1199        source_id = self._resolve(source_id)
1200        target_id = self._resolve(target_id)
1201
1202        if source_id == target_id:
1203            return [()]
1204
1205        source_successors = list(self.successors(source_id))
1206        assert len(source_successors) <= 1
1207        if len(source_successors) == 0:
1208            return []
1209
1210        paths: list[tuple[KCFG.Successor, ...]] = []
1211        worklist: list[list[KCFG.Successor]] = [[source_successors[0]]]
1212
1213        def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool:
1214            for succ in _path:
1215                if _nid == succ.source.id:
1216                    return True
1217            if len(_path) > 0:
1218                if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid:
1219                    return True
1220                elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids:
1221                    return True
1222            return False
1223
1224        while worklist:
1225            curr_path = worklist.pop()
1226            curr_successor = curr_path[-1]
1227            successors: list[KCFG.Successor] = []
1228
1229            if isinstance(curr_successor, KCFG.EdgeLike):
1230                if curr_successor.target.id == target_id:
1231                    paths.append(tuple(curr_path))
1232                    continue
1233                else:
1234                    successors = list(self.successors(curr_successor.target.id))
1235
1236            elif isinstance(curr_successor, KCFG.MultiEdge):
1237                if len(list(curr_successor.targets)) == 1:
1238                    target = list(curr_successor.targets)[0]
1239                    if target.id == target_id:
1240                        paths.append(tuple(curr_path))
1241                        continue
1242                    else:
1243                        successors = list(self.successors(target.id))
1244                if len(list(curr_successor.targets)) > 1:
1245                    curr_path = curr_path[0:-1]
1246                    successors = [curr_successor.with_single_target(target) for target in curr_successor.targets]
1247
1248            for successor in successors:
1249                if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path):
1250                    worklist.append(curr_path + [successor])
1251                elif isinstance(successor, KCFG.MultiEdge):
1252                    if len(list(successor.targets)) == 1:
1253                        target = list(successor.targets)[0]
1254                        if not _in_path(target.id, curr_path):
1255                            worklist.append(curr_path + [successor])
1256                    elif len(list(successor.targets)) > 1:
1257                        worklist.append(curr_path + [successor])
1258
1259        return paths 
1260
[docs]
1261    def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
1262        visited: set[KCFG.Node] = set()
1263        worklist: list[KCFG.Node] = [self.node(source_id)]
1264
1265        while worklist:
1266            node = worklist.pop()
1267
1268            if node in visited:
1269                continue
1270
1271            visited.add(node)
1272
1273            if not reverse:
1274                worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets)
1275            else:
1276                worklist.extend(succ.source for succ in self.predecessors(target_id=node.id))
1277
1278        return visited 
1279
[docs]
1280    def write_cfg_data(self) -> None:
1281        assert self._kcfg_store is not None
1282        self._kcfg_store.write_cfg_data(
1283            self.to_dict(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes
1284        )
1285        self._deleted_nodes.clear()
1286        self._created_nodes.clear() 
1287
[docs]
1288    @staticmethod
1289    def read_cfg_data(cfg_dir: Path) -> KCFG:
1290        store = KCFGStore(cfg_dir)
1291        cfg = KCFG.from_dict(store.read_cfg_data())
1292        cfg._kcfg_store = store
1293        return cfg 
1294
[docs]
1295    @staticmethod
1296    def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
1297        store = KCFGStore(cfg_dir)
1298        return KCFG.Node.from_dict(store.read_node_data(node_id)) 
 
1299
1300
[docs]
1301class KCFGExtendResult(ABC): ... 
1302
1303
[docs]
1304@final
1305@dataclass(frozen=True)
1306class Vacuous(KCFGExtendResult): ... 
1307
1308
[docs]
1309@final
1310@dataclass(frozen=True)
1311class Stuck(KCFGExtendResult): ... 
1312
1313
[docs]
1314@final
1315@dataclass(frozen=True)
1316class Abstract(KCFGExtendResult):
1317    cterm: CTerm 
1318
1319
[docs]
1320@final
1321@dataclass(frozen=True)
1322class Step(KCFGExtendResult):
1323    cterm: CTerm
1324    depth: int
1325    logs: tuple[LogEntry, ...]
1326    rule_labels: list[str]
1327    cut: bool = field(default=False) 
1328
1329
[docs]
1330@final
1331@dataclass(frozen=True)
1332class Branch(KCFGExtendResult):
1333    constraints: tuple[KInner, ...]
1334    heuristic: bool
1335
1336    def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False):
1337        object.__setattr__(self, 'constraints', tuple(constraints))
1338        object.__setattr__(self, 'heuristic', heuristic) 
1339
1340
[docs]
1341@final
1342@dataclass(frozen=True)
1343class NDBranch(KCFGExtendResult):
1344    cterms: tuple[CTerm, ...]
1345    logs: tuple[LogEntry, ...]
1346    rule_labels: tuple[str, ...]
1347
1348    def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]):
1349        object.__setattr__(self, 'cterms', tuple(cterms))
1350        object.__setattr__(self, 'logs', tuple(logs))
1351        object.__setattr__(self, 'rule_labels', tuple(rule_labels))