Source code for pyk.kcfg.show

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from graphviz import Digraph
  7
  8from ..kast.inner import KApply, KRewrite, top_down
  9from ..kast.manip import (
 10    flatten_label,
 11    inline_cell_maps,
 12    minimize_rule,
 13    minimize_term,
 14    ml_pred_to_bool,
 15    push_down_rewrites,
 16    remove_generated_cells,
 17    sort_ac_collections,
 18)
 19from ..kast.outer import KRule
 20from ..prelude.k import DOTS
 21from ..prelude.ml import mlAnd
 22from ..utils import add_indent, ensure_dir_path
 23from .kcfg import KCFG
 24
 25if TYPE_CHECKING:
 26    from collections.abc import Iterable
 27    from pathlib import Path
 28    from typing import Final
 29
 30    from ..kast import KInner
 31    from ..kast.outer import KFlatModule, KSentence
 32    from ..ktool.kprint import KPrint
 33    from .kcfg import NodeIdLike
 34
 35_LOGGER: Final = logging.getLogger(__name__)
 36
 37
[docs] 38class NodePrinter: 39 kprint: KPrint 40 full_printer: bool 41 minimize: bool 42 43 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False): 44 self.kprint = kprint 45 self.full_printer = full_printer 46 self.minimize = minimize 47
[docs] 48 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 49 attrs = self.node_attrs(kcfg, node) 50 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else '' 51 node_strs = [f'{node.id}{attr_str}'] 52 if self.full_printer: 53 kast = node.cterm.kast 54 if self.minimize: 55 kast = minimize_term(kast) 56 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n')) 57 return node_strs
58
[docs] 59 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 60 attrs = [] 61 if kcfg.is_root(node.id): 62 attrs.append('root') 63 if kcfg.is_stuck(node.id): 64 attrs.append('stuck') 65 if kcfg.is_vacuous(node.id): 66 attrs.append('vacuous') 67 if kcfg.is_leaf(node.id): 68 attrs.append('leaf') 69 if kcfg.is_split(node.id): 70 attrs.append('split') 71 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))]) 72 return attrs
73 74
[docs] 75class KCFGShow: 76 kprint: KPrint 77 node_printer: NodePrinter 78 79 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None): 80 self.kprint = kprint 81 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint) 82
[docs] 83 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 84 return self.node_printer.print_node(kcfg, node)
85
[docs] 86 @staticmethod 87 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner: 88 def _hide_cells(_k: KInner) -> KInner: 89 if type(_k) == KApply and _k.label.name in omit_cells: 90 return DOTS 91 return _k 92 93 if omit_cells: 94 return top_down(_hide_cells, term) 95 return term
96
[docs] 97 @staticmethod 98 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner: 99 config = inline_cell_maps(config) 100 config = sort_ac_collections(config) 101 config = KCFGShow.hide_cells(config, omit_cells) 102 return config
103
[docs] 104 @staticmethod 105 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]: 106 _segments = [] 107 used_ids = [] 108 for id, seg_lines in segments: 109 suffix = '' 110 counter = 0 111 while f'{id}{suffix}' in used_ids: 112 suffix = f'_{counter}' 113 counter += 1 114 new_id = f'{id}{suffix}' 115 used_ids.append(new_id) 116 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines])) 117 return _segments
118
[docs] 119 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]: 120 """Return a pretty version of the KCFG in segments. 121 122 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]). 123 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown'). 124 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge. 125 """ 126 127 processed_nodes: list[KCFG.Node] = [] 128 ret_lines: list[tuple[str, list[str]]] = [] 129 130 def _print_node(node: KCFG.Node) -> list[str]: 131 return self.node_short_info(kcfg, node) 132 133 def _print_edge(edge: KCFG.Edge) -> list[str]: 134 if edge.depth == 1: 135 return ['(' + str(edge.depth) + ' step)'] 136 else: 137 return ['(' + str(edge.depth) + ' steps)'] 138 139 def _print_cover(cover: KCFG.Cover) -> Iterable[str]: 140 subst_strs = [f'{k} <- {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.items()] 141 subst_str = '' 142 if len(subst_strs) == 0: 143 subst_str = '.Subst' 144 if len(subst_strs) == 1: 145 subst_str = subst_strs[0] 146 if len(subst_strs) > 1 and minimize: 147 subst_str = 'OMITTED SUBST' 148 if len(subst_strs) > 1 and not minimize: 149 subst_str = '{\n ' + '\n '.join(subst_strs) + '\n}' 150 constraint_str = self.kprint.pretty_print(ml_pred_to_bool(cover.csubst.constraint, unsafe=True)) 151 if len(constraint_str) > 78: 152 constraint_str = 'OMITTED CONSTRAINT' 153 return [ 154 f'constraint: {constraint_str}', 155 f'subst: {subst_str}', 156 ] 157 158 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]: 159 csubst = split.splits[target_id] 160 ret_split_lines: list[str] = [] 161 substs = csubst.subst.minimize().items() 162 constraints = [ml_pred_to_bool(c, unsafe=True) for c in csubst.constraints] 163 if len(constraints) == 1: 164 first_line, *rest_lines = self.kprint.pretty_print(constraints[0]).split('\n') 165 ret_split_lines.append(f'constraint: {first_line}') 166 ret_split_lines.extend(f' {line}' for line in rest_lines) 167 elif len(constraints) > 1: 168 ret_split_lines.append('constraints:') 169 for constraint in constraints: 170 first_line, *rest_lines = self.kprint.pretty_print(constraint).split('\n') 171 ret_split_lines.append(f' {first_line}') 172 ret_split_lines.extend(f' {line}' for line in rest_lines) 173 if len(substs) == 1: 174 vname, term = list(substs)[0] 175 ret_split_lines.append(f'subst: {vname} <- {self.kprint.pretty_print(term)}') 176 elif len(substs) > 1: 177 ret_split_lines.append('substs:') 178 ret_split_lines.extend(f' {vname} <- {self.kprint.pretty_print(term)}' for vname, term in substs) 179 return ret_split_lines 180 181 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None: 182 processed = curr_node in processed_nodes 183 processed_nodes.append(curr_node) 184 successors = list(kcfg.successors(curr_node.id)) 185 186 curr_node_strs = _print_node(curr_node) 187 188 ret_node_lines = [] 189 suffix = [] 190 elbow = '├─' 191 node_indent = '│ ' 192 if kcfg.is_root(curr_node.id): 193 elbow = '┌─' 194 elif processed or not successors: 195 elbow = '└─' 196 node_indent = ' ' 197 if curr_node in prior_on_trace: 198 suffix = ['(looped back)', ''] 199 elif processed and not kcfg.is_leaf(curr_node.id): 200 suffix = ['(continues as previously)', ''] 201 else: 202 suffix = [''] 203 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0]) 204 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:])) 205 ret_node_lines.extend(add_indent(indent + ' ', suffix)) 206 ret_lines.append((f'node_{curr_node.id}', ret_node_lines)) 207 208 if processed or not successors: 209 return 210 successor = successors[0] 211 212 if isinstance(successor, KCFG.MultiEdge): 213 ret_lines.append(('unknown', [f'{indent}┃'])) 214 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch' 215 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split' 216 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})'])) 217 218 for target in successor.targets[:-1]: 219 if type(successor) is KCFG.Split: 220 ret_edge_lines = _print_split_edge(successor, target.id) 221 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent( 222 indent + '┃ ┃ ', ret_edge_lines[1:] 223 ) 224 elif type(successor) is KCFG.NDBranch: 225 ret_edge_lines = [indent + '┣━━┓ '] 226 else: 227 raise AssertionError() 228 ret_edge_lines.append(indent + '┃ │') 229 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 230 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node]) 231 target = successor.targets[-1] 232 if type(successor) is KCFG.Split: 233 ret_edge_lines = _print_split_edge(successor, target.id) 234 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent( 235 indent + ' ┃ ', ret_edge_lines[1:] 236 ) 237 elif type(successor) is KCFG.NDBranch: 238 ret_edge_lines = [indent + '┗━━┓ '] 239 else: 240 raise AssertionError() 241 ret_edge_lines.append(indent + ' │') 242 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 243 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node]) 244 245 elif isinstance(successor, KCFG.EdgeLike): 246 ret_lines.append(('unknown', [f'{indent}│'])) 247 248 if type(successor) is KCFG.Edge: 249 ret_edge_lines = [] 250 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor))) 251 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 252 253 elif type(successor) is KCFG.Cover: 254 ret_edge_lines = [] 255 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor))) 256 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 257 258 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node]) 259 260 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]: 261 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes) 262 init_nodes = [] 263 init_leaf_nodes = [] 264 remaining_nodes = [] 265 for node in sorted_init_nodes: 266 if kcfg.is_root(node.id): 267 if kcfg.is_leaf(node.id): 268 init_leaf_nodes.append(node) 269 else: 270 init_nodes.append(node) 271 else: 272 remaining_nodes.append(node) 273 return (init_nodes + init_leaf_nodes, remaining_nodes) 274 275 init, _ = _sorted_init_nodes() 276 while init: 277 ret_lines.append(('unknown', [''])) 278 _print_subgraph('', init[0], []) 279 init, _ = _sorted_init_nodes() 280 _, remaining = _sorted_init_nodes() 281 if remaining: 282 ret_lines.append(('unknown', ['', 'Remaining Nodes:'])) 283 for node in remaining: 284 ret_node_lines = [''] + _print_node(node) 285 ret_lines.append((f'node_{node.id}', ret_node_lines)) 286 287 return KCFGShow.make_unique_segments(ret_lines)
288
[docs] 289 def pretty( 290 self, 291 kcfg: KCFG, 292 minimize: bool = True, 293 ) -> Iterable[str]: 294 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
295
[docs] 296 def to_module( 297 self, 298 cfg: KCFG, 299 module_name: str | None = None, 300 omit_cells: Iterable[str] = (), 301 parseable_output: bool = True, 302 ) -> KFlatModule: 303 def _process_sentence(sent: KSentence) -> KSentence: 304 if type(sent) is KRule: 305 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells)) 306 if parseable_output: 307 sent = sent.let(body=remove_generated_cells(sent.body)) 308 sent = minimize_rule(sent) 309 return sent 310 311 module = cfg.to_module(module_name) 312 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
313
[docs] 314 def show( 315 self, 316 cfg: KCFG, 317 nodes: Iterable[NodeIdLike] = (), 318 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (), 319 to_module: bool = False, 320 minimize: bool = True, 321 sort_collections: bool = False, 322 omit_cells: Iterable[str] = (), 323 module_name: str | None = None, 324 ) -> list[str]: 325 res_lines: list[str] = [] 326 res_lines += self.pretty(cfg, minimize=minimize) 327 328 nodes_printed = False 329 330 for node_id in nodes: 331 nodes_printed = True 332 kast = cfg.node(node_id).cterm.kast 333 kast = KCFGShow.hide_cells(kast, omit_cells) 334 if minimize: 335 kast = minimize_term(kast) 336 res_lines.append('') 337 res_lines.append('') 338 res_lines.append(f'Node {node_id}:') 339 res_lines.append('') 340 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections)) 341 res_lines.append('') 342 343 for node_id_1, node_id_2 in node_deltas: 344 nodes_printed = True 345 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells) 346 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells) 347 config_delta = push_down_rewrites(KRewrite(config_1, config_2)) 348 if minimize: 349 config_delta = minimize_term(config_delta) 350 res_lines.append('') 351 res_lines.append('') 352 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:') 353 res_lines.append('') 354 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections)) 355 res_lines.append('') 356 357 if not (nodes_printed): 358 res_lines.append('') 359 res_lines.append('') 360 res_lines.append('') 361 362 if to_module: 363 module = self.to_module(cfg, module_name, omit_cells=omit_cells) 364 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections)) 365 366 return res_lines
367
[docs] 368 def dot(self, kcfg: KCFG) -> Digraph: 369 def _short_label(label: str) -> str: 370 return '\n'.join( 371 [ 372 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...') 373 for label_line in label.split('\n') 374 ] 375 ) 376 377 graph = Digraph() 378 379 for node in kcfg.nodes: 380 label = '\n'.join(self.node_short_info(kcfg, node)) 381 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node)) 382 attrs = {'class': class_attrs} if class_attrs else {} 383 graph.node(name=node.id, label=label, **attrs) 384 385 for edge in kcfg.edges(): 386 depth = edge.depth 387 label = f'{depth} steps' 388 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ') 389 390 for cover in kcfg.covers(): 391 label = ', '.join( 392 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items() 393 ) 394 label = _short_label(label) 395 attrs = {'class': 'abstraction', 'style': 'dashed'} 396 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs) 397 398 for split in kcfg.splits(): 399 for target_id, csubst in split.splits.items(): 400 label = '\n#And'.join( 401 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints 402 ) 403 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ') 404 405 for ndbranch in kcfg.ndbranches(): 406 for target in ndbranch.target_ids: 407 label = '1 step' 408 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ') 409 410 return graph
411
[docs] 412 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None: 413 ensure_dir_path(dump_dir) 414 415 cfg_file = dump_dir / f'{cfgid}.json' 416 cfg_file.write_text(cfg.to_json()) 417 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}') 418 419 if dot: 420 cfg_dot = self.dot(cfg) 421 dot_file = dump_dir / f'{cfgid}.dot' 422 dot_file.write_text(cfg_dot.source) 423 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}') 424 425 nodes_dir = dump_dir / 'nodes' 426 ensure_dir_path(nodes_dir) 427 for node in cfg.nodes: 428 node_file = nodes_dir / f'config_{node.id}.txt' 429 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt' 430 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt' 431 432 config = node.cterm.config 433 if not node_file.exists(): 434 node_file.write_text(self.kprint.pretty_print(config)) 435 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}') 436 config = minimize_term(config) 437 if not node_minimized_file.exists(): 438 node_minimized_file.write_text(self.kprint.pretty_print(config)) 439 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}') 440 if not node_constraint_file.exists(): 441 constraint = mlAnd(node.cterm.constraints) 442 node_constraint_file.write_text(self.kprint.pretty_print(constraint)) 443 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}') 444 445 edges_dir = dump_dir / 'edges' 446 ensure_dir_path(edges_dir) 447 for edge in cfg.edges(): 448 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt' 449 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt' 450 451 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config)) 452 if not edge_file.exists(): 453 edge_file.write_text(self.kprint.pretty_print(config)) 454 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}') 455 config = minimize_term(config) 456 if not edge_minimized_file.exists(): 457 edge_minimized_file.write_text(self.kprint.pretty_print(config)) 458 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}') 459 460 covers_dir = dump_dir / 'covers' 461 ensure_dir_path(covers_dir) 462 for cover in cfg.covers(): 463 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt' 464 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt' 465 466 subst_equalities = flatten_label('#And', cover.csubst.subst.ml_pred) 467 468 if not cover_file.exists(): 469 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities)) 470 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}') 471 if not cover_constraint_file.exists(): 472 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint)) 473 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')