1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from graphviz import Digraph
7
8from ..kast.inner import KApply, KRewrite, top_down
9from ..kast.manip import (
10 flatten_label,
11 inline_cell_maps,
12 minimize_rule,
13 minimize_term,
14 ml_pred_to_bool,
15 push_down_rewrites,
16 remove_generated_cells,
17 sort_ac_collections,
18)
19from ..kast.outer import KRule
20from ..prelude.k import DOTS
21from ..prelude.ml import mlAnd
22from ..utils import add_indent, ensure_dir_path
23from .kcfg import KCFG
24
25if TYPE_CHECKING:
26 from collections.abc import Iterable
27 from pathlib import Path
28 from typing import Final
29
30 from ..kast import KInner
31 from ..kast.outer import KFlatModule, KSentence
32 from ..ktool.kprint import KPrint
33 from .kcfg import NodeIdLike
34
35_LOGGER: Final = logging.getLogger(__name__)
36
37
[docs]
38class NodePrinter:
39 kprint: KPrint
40 full_printer: bool
41 minimize: bool
42
43 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False):
44 self.kprint = kprint
45 self.full_printer = full_printer
46 self.minimize = minimize
47
[docs]
48 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
49 attrs = self.node_attrs(kcfg, node)
50 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else ''
51 node_strs = [f'{node.id}{attr_str}']
52 if self.full_printer:
53 kast = node.cterm.kast
54 if self.minimize:
55 kast = minimize_term(kast)
56 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n'))
57 return node_strs
58
[docs]
59 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
60 attrs = []
61 if kcfg.is_root(node.id):
62 attrs.append('root')
63 if kcfg.is_stuck(node.id):
64 attrs.append('stuck')
65 if kcfg.is_vacuous(node.id):
66 attrs.append('vacuous')
67 if kcfg.is_leaf(node.id):
68 attrs.append('leaf')
69 if kcfg.is_split(node.id):
70 attrs.append('split')
71 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))])
72 return attrs
73
74
[docs]
75class KCFGShow:
76 kprint: KPrint
77 node_printer: NodePrinter
78
79 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None):
80 self.kprint = kprint
81 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint)
82
[docs]
83 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
84 return self.node_printer.print_node(kcfg, node)
85
[docs]
86 @staticmethod
87 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
88 def _hide_cells(_k: KInner) -> KInner:
89 if type(_k) == KApply and _k.label.name in omit_cells:
90 return DOTS
91 return _k
92
93 if omit_cells:
94 return top_down(_hide_cells, term)
95 return term
96
[docs]
97 @staticmethod
98 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
99 config = inline_cell_maps(config)
100 config = sort_ac_collections(config)
101 config = KCFGShow.hide_cells(config, omit_cells)
102 return config
103
[docs]
104 @staticmethod
105 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
106 _segments = []
107 used_ids = []
108 for id, seg_lines in segments:
109 suffix = ''
110 counter = 0
111 while f'{id}{suffix}' in used_ids:
112 suffix = f'_{counter}'
113 counter += 1
114 new_id = f'{id}{suffix}'
115 used_ids.append(new_id)
116 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines]))
117 return _segments
118
[docs]
119 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
120 """Return a pretty version of the KCFG in segments.
121
122 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]).
123 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown').
124 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge.
125 """
126
127 processed_nodes: list[KCFG.Node] = []
128 ret_lines: list[tuple[str, list[str]]] = []
129
130 def _print_node(node: KCFG.Node) -> list[str]:
131 return self.node_short_info(kcfg, node)
132
133 def _print_edge(edge: KCFG.Edge) -> list[str]:
134 if edge.depth == 1:
135 return ['(' + str(edge.depth) + ' step)']
136 else:
137 return ['(' + str(edge.depth) + ' steps)']
138
139 def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
140 subst_strs = [f'{k} <- {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.items()]
141 subst_str = ''
142 if len(subst_strs) == 0:
143 subst_str = '.Subst'
144 if len(subst_strs) == 1:
145 subst_str = subst_strs[0]
146 if len(subst_strs) > 1 and minimize:
147 subst_str = 'OMITTED SUBST'
148 if len(subst_strs) > 1 and not minimize:
149 subst_str = '{\n ' + '\n '.join(subst_strs) + '\n}'
150 constraint_str = self.kprint.pretty_print(ml_pred_to_bool(cover.csubst.constraint, unsafe=True))
151 if len(constraint_str) > 78:
152 constraint_str = 'OMITTED CONSTRAINT'
153 return [
154 f'constraint: {constraint_str}',
155 f'subst: {subst_str}',
156 ]
157
158 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
159 csubst = split.splits[target_id]
160 ret_split_lines: list[str] = []
161 substs = csubst.subst.minimize().items()
162 constraints = [ml_pred_to_bool(c, unsafe=True) for c in csubst.constraints]
163 if len(constraints) == 1:
164 first_line, *rest_lines = self.kprint.pretty_print(constraints[0]).split('\n')
165 ret_split_lines.append(f'constraint: {first_line}')
166 ret_split_lines.extend(f' {line}' for line in rest_lines)
167 elif len(constraints) > 1:
168 ret_split_lines.append('constraints:')
169 for constraint in constraints:
170 first_line, *rest_lines = self.kprint.pretty_print(constraint).split('\n')
171 ret_split_lines.append(f' {first_line}')
172 ret_split_lines.extend(f' {line}' for line in rest_lines)
173 if len(substs) == 1:
174 vname, term = list(substs)[0]
175 ret_split_lines.append(f'subst: {vname} <- {self.kprint.pretty_print(term)}')
176 elif len(substs) > 1:
177 ret_split_lines.append('substs:')
178 ret_split_lines.extend(f' {vname} <- {self.kprint.pretty_print(term)}' for vname, term in substs)
179 return ret_split_lines
180
181 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
182 processed = curr_node in processed_nodes
183 processed_nodes.append(curr_node)
184 successors = list(kcfg.successors(curr_node.id))
185
186 curr_node_strs = _print_node(curr_node)
187
188 ret_node_lines = []
189 suffix = []
190 elbow = '├─'
191 node_indent = '│ '
192 if kcfg.is_root(curr_node.id):
193 elbow = '┌─'
194 elif processed or not successors:
195 elbow = '└─'
196 node_indent = ' '
197 if curr_node in prior_on_trace:
198 suffix = ['(looped back)', '']
199 elif processed and not kcfg.is_leaf(curr_node.id):
200 suffix = ['(continues as previously)', '']
201 else:
202 suffix = ['']
203 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
204 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
205 ret_node_lines.extend(add_indent(indent + ' ', suffix))
206 ret_lines.append((f'node_{curr_node.id}', ret_node_lines))
207
208 if processed or not successors:
209 return
210 successor = successors[0]
211
212 if isinstance(successor, KCFG.MultiEdge):
213 ret_lines.append(('unknown', [f'{indent}┃']))
214 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
215 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
216 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))
217
218 for target in successor.targets[:-1]:
219 if type(successor) is KCFG.Split:
220 ret_edge_lines = _print_split_edge(successor, target.id)
221 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent(
222 indent + '┃ ┃ ', ret_edge_lines[1:]
223 )
224 elif type(successor) is KCFG.NDBranch:
225 ret_edge_lines = [indent + '┣━━┓ ']
226 else:
227 raise AssertionError()
228 ret_edge_lines.append(indent + '┃ │')
229 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
230 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node])
231 target = successor.targets[-1]
232 if type(successor) is KCFG.Split:
233 ret_edge_lines = _print_split_edge(successor, target.id)
234 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent(
235 indent + ' ┃ ', ret_edge_lines[1:]
236 )
237 elif type(successor) is KCFG.NDBranch:
238 ret_edge_lines = [indent + '┗━━┓ ']
239 else:
240 raise AssertionError()
241 ret_edge_lines.append(indent + ' │')
242 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
243 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node])
244
245 elif isinstance(successor, KCFG.EdgeLike):
246 ret_lines.append(('unknown', [f'{indent}│']))
247
248 if type(successor) is KCFG.Edge:
249 ret_edge_lines = []
250 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor)))
251 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
252
253 elif type(successor) is KCFG.Cover:
254 ret_edge_lines = []
255 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor)))
256 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))
257
258 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])
259
260 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
261 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
262 init_nodes = []
263 init_leaf_nodes = []
264 remaining_nodes = []
265 for node in sorted_init_nodes:
266 if kcfg.is_root(node.id):
267 if kcfg.is_leaf(node.id):
268 init_leaf_nodes.append(node)
269 else:
270 init_nodes.append(node)
271 else:
272 remaining_nodes.append(node)
273 return (init_nodes + init_leaf_nodes, remaining_nodes)
274
275 init, _ = _sorted_init_nodes()
276 while init:
277 ret_lines.append(('unknown', ['']))
278 _print_subgraph('', init[0], [])
279 init, _ = _sorted_init_nodes()
280 _, remaining = _sorted_init_nodes()
281 if remaining:
282 ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
283 for node in remaining:
284 ret_node_lines = [''] + _print_node(node)
285 ret_lines.append((f'node_{node.id}', ret_node_lines))
286
287 return KCFGShow.make_unique_segments(ret_lines)
288
[docs]
289 def pretty(
290 self,
291 kcfg: KCFG,
292 minimize: bool = True,
293 ) -> Iterable[str]:
294 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
295
[docs]
296 def to_module(
297 self,
298 cfg: KCFG,
299 module_name: str | None = None,
300 omit_cells: Iterable[str] = (),
301 parseable_output: bool = True,
302 ) -> KFlatModule:
303 def _process_sentence(sent: KSentence) -> KSentence:
304 if type(sent) is KRule:
305 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells))
306 if parseable_output:
307 sent = sent.let(body=remove_generated_cells(sent.body))
308 sent = minimize_rule(sent)
309 return sent
310
311 module = cfg.to_module(module_name)
312 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
313
[docs]
314 def show(
315 self,
316 cfg: KCFG,
317 nodes: Iterable[NodeIdLike] = (),
318 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
319 to_module: bool = False,
320 minimize: bool = True,
321 sort_collections: bool = False,
322 omit_cells: Iterable[str] = (),
323 module_name: str | None = None,
324 ) -> list[str]:
325 res_lines: list[str] = []
326 res_lines += self.pretty(cfg, minimize=minimize)
327
328 nodes_printed = False
329
330 for node_id in nodes:
331 nodes_printed = True
332 kast = cfg.node(node_id).cterm.kast
333 kast = KCFGShow.hide_cells(kast, omit_cells)
334 if minimize:
335 kast = minimize_term(kast)
336 res_lines.append('')
337 res_lines.append('')
338 res_lines.append(f'Node {node_id}:')
339 res_lines.append('')
340 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections))
341 res_lines.append('')
342
343 for node_id_1, node_id_2 in node_deltas:
344 nodes_printed = True
345 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells)
346 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells)
347 config_delta = push_down_rewrites(KRewrite(config_1, config_2))
348 if minimize:
349 config_delta = minimize_term(config_delta)
350 res_lines.append('')
351 res_lines.append('')
352 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:')
353 res_lines.append('')
354 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections))
355 res_lines.append('')
356
357 if not (nodes_printed):
358 res_lines.append('')
359 res_lines.append('')
360 res_lines.append('')
361
362 if to_module:
363 module = self.to_module(cfg, module_name, omit_cells=omit_cells)
364 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections))
365
366 return res_lines
367
[docs]
368 def dot(self, kcfg: KCFG) -> Digraph:
369 def _short_label(label: str) -> str:
370 return '\n'.join(
371 [
372 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
373 for label_line in label.split('\n')
374 ]
375 )
376
377 graph = Digraph()
378
379 for node in kcfg.nodes:
380 label = '\n'.join(self.node_short_info(kcfg, node))
381 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
382 attrs = {'class': class_attrs} if class_attrs else {}
383 graph.node(name=node.id, label=label, **attrs)
384
385 for edge in kcfg.edges():
386 depth = edge.depth
387 label = f'{depth} steps'
388 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ')
389
390 for cover in kcfg.covers():
391 label = ', '.join(
392 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items()
393 )
394 label = _short_label(label)
395 attrs = {'class': 'abstraction', 'style': 'dashed'}
396 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs)
397
398 for split in kcfg.splits():
399 for target_id, csubst in split.splits.items():
400 label = '\n#And'.join(
401 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints
402 )
403 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ')
404
405 for ndbranch in kcfg.ndbranches():
406 for target in ndbranch.target_ids:
407 label = '1 step'
408 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ')
409
410 return graph
411
[docs]
412 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
413 ensure_dir_path(dump_dir)
414
415 cfg_file = dump_dir / f'{cfgid}.json'
416 cfg_file.write_text(cfg.to_json())
417 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')
418
419 if dot:
420 cfg_dot = self.dot(cfg)
421 dot_file = dump_dir / f'{cfgid}.dot'
422 dot_file.write_text(cfg_dot.source)
423 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')
424
425 nodes_dir = dump_dir / 'nodes'
426 ensure_dir_path(nodes_dir)
427 for node in cfg.nodes:
428 node_file = nodes_dir / f'config_{node.id}.txt'
429 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
430 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'
431
432 config = node.cterm.config
433 if not node_file.exists():
434 node_file.write_text(self.kprint.pretty_print(config))
435 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
436 config = minimize_term(config)
437 if not node_minimized_file.exists():
438 node_minimized_file.write_text(self.kprint.pretty_print(config))
439 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
440 if not node_constraint_file.exists():
441 constraint = mlAnd(node.cterm.constraints)
442 node_constraint_file.write_text(self.kprint.pretty_print(constraint))
443 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')
444
445 edges_dir = dump_dir / 'edges'
446 ensure_dir_path(edges_dir)
447 for edge in cfg.edges():
448 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
449 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'
450
451 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
452 if not edge_file.exists():
453 edge_file.write_text(self.kprint.pretty_print(config))
454 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
455 config = minimize_term(config)
456 if not edge_minimized_file.exists():
457 edge_minimized_file.write_text(self.kprint.pretty_print(config))
458 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')
459
460 covers_dir = dump_dir / 'covers'
461 ensure_dir_path(covers_dir)
462 for cover in cfg.covers():
463 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
464 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'
465
466 subst_equalities = flatten_label('#And', cover.csubst.subst.ml_pred)
467
468 if not cover_file.exists():
469 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities))
470 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
471 if not cover_constraint_file.exists():
472 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint))
473 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')