Source code for nnf.amc

"""An implementation of
`algebraic model counting <https://arxiv.org/abs/1211.4475>`_."""

import functools
import operator

import typing as t

from nnf import NNF, And, Var, Or, Internal, true, false
from nnf.util import Name, memoize

neg_inf = float('-inf')

T = t.TypeVar('T')

__all__ = (
    "eval",
    "reduce",
    "SAT",
    "NUM_SAT",
    "WMC",
    "PROB",
    "GRAD",
    "MPE",
    "maxplus_reduce",
)


[docs]def eval( node: NNF, add: t.Callable[[T, T], T], mul: t.Callable[[T, T], T], add_neut: T, mul_neut: T, labeling: t.Callable[[Var], T], ) -> T: """Execute an AMC technique, given a semiring and a labeling function. :param node: The sentence to calculate the value of. :param add: The ⊕ operator, to combine :class:`nnf.Or` nodes. :param mul: The ⊗ operator, to combine :class:`nnf.And` nodes. :param add_neut: e^⊕, the neutral element of the ⊕ operator. :param mul_neut: e^⊗, the neutral element of the ⊗ operator. :param labeling: The labeling function, to assign a value to each variable node. """ @memoize def do_eval(node: NNF) -> T: if node == true: return mul_neut elif node == false: return add_neut elif isinstance(node, Var): return labeling(node) assert isinstance(node, Internal) if len(node.children) == 1: return do_eval(*node.children) if isinstance(node, Or): return functools.reduce( add, (do_eval(child) for child in node.children), add_neut ) assert isinstance(node, And) return functools.reduce( mul, (do_eval(child) for child in node.children), mul_neut ) return do_eval(node)
def _prob_label(probs: t.Dict[Name, float]) -> t.Callable[[Var], float]: """Generate a labeling function for probabilities from a dictionary.""" def label(leaf: Var) -> float: if leaf.true: return probs[leaf.name] else: return 1.0 - probs[leaf.name] return label
[docs]def SAT(node: NNF) -> bool: """Determine whether a DNNF sentence is satisfiable.""" return eval(node, operator.or_, operator.and_, False, True, lambda leaf: True)
[docs]def NUM_SAT(node: NNF) -> int: """Determine the number of models that satisfy a sd-DNNF sentence.""" # General × # Non-idempotent + # Non-neutral + # = sd-DNNF return eval(node, operator.add, operator.mul, 0, 1, lambda leaf: 1)
[docs]def WMC(node: NNF, weights: t.Callable[[Var], float]) -> float: """Model counting of sd-DNNF sentences, weighted by variables. :param node: The sentence to measure. :param weights: A dictionary mapping variable nodes to weights. """ # General × # Non-idempotent + # Non-neutral + # = sd-DNNF return eval(node, operator.add, operator.mul, 0.0, 1.0, weights)
[docs]def PROB(node: NNF, probs: t.Dict[Name, float]) -> float: """Model counting of d-DNNF sentences, weighted by probabilities. :param node: The sentence to measure. :param probs: A dictionary mapping variable names to probabilities. """ # General × # Non-idempotent + # Neutral + # = d-DNNF return eval(node, operator.add, operator.mul, 0.0, 1.0, _prob_label(probs))
GradProb = t.Tuple[float, float]
[docs]def GRAD( node: NNF, probs: t.Dict[Name, float], k: t.Optional[Name] = None ) -> GradProb: """Calculate a gradient of a d-DNNF sentence being true depending on the value of a variable, given probabilities for all variables. :param node: The sentence. :param probs: A dictionary mapping variable names to probabilities. :param k: The name of the variable to check relative to. :return: A tuple of two floats (probability, gradient). """ # General × # Neutral + # Non-idempotent + # = d-DNNF def add(a: GradProb, b: GradProb) -> GradProb: return a[0] + b[0], a[1] + b[1] def mul(a: GradProb, b: GradProb) -> GradProb: return a[0] * b[0], a[0] * b[1] + a[1] * b[0] def label(var: Var) -> GradProb: if var.true: if var.name == k: return probs[var.name], 1 else: return probs[var.name], 0 else: if var.name == k: return 1 - probs[var.name], -1 else: return 1 - probs[var.name], 0 return eval(node, add, mul, (0.0, 0.0), (1.0, 0.0), label)
[docs]def MPE(node: NNF, probs: t.Dict[Name, float]) -> float: # General × # Non-neutral + # Idempotent + # = s-DNNF return eval(node, max, operator.mul, 0.0, 1.0, _prob_label(probs))
[docs]def reduce( node: NNF, add_key: t.Optional[t.Callable[[T], t.Any]], mul: t.Callable[[T, T], T], add_neut: T, mul_neut: T, labeling: t.Callable[[Var], T], ) -> NNF: """Execute AMC reduction on a sentence. In AMC reduction, the ⊕ operator must be ``max`` on some total order, and the branches of the sentence that don't contribute to the maximum value are removed. This leaves a simpler sentence with only the models with a maximum value. :param node: The sentence. :param add_key: A function given to ``max``'s ``key`` argument to determine the total order of the ⊕ operator. Pass ``None`` to use the default ordering. :param mul: See :func:`eval`. :param add_neut: See :func:`eval`. :param mul_neut: See :func:`eval`. :param labeling: See :func:`eval`. :return: The transformed sentence. """ if add_key is not None: add_key_ = add_key else: def add_key_(n: T) -> t.Any: return n def add(a: T, b: T) -> T: return max((a, b), key=add_key_) @memoize def eval_(node: NNF) -> T: return eval(node, add, mul, add_neut, mul_neut, labeling) @memoize def reduce_(node: NNF) -> NNF: if isinstance(node, Or): best = add_neut candidates = [] # type: t.List[NNF] for child in node.children: value = eval_(child) if value > best: # type: ignore best = value candidates = [child] elif value == best: candidates.append(child) return Or(reduce_(candidate) for candidate in candidates) elif isinstance(node, And): return And(reduce_(child) for child in node.children) else: return node return reduce_(node)
[docs]def maxplus_reduce(node: NNF, labels: t.Dict[Var, float]) -> NNF: """Execute AMC reduction using the maxplus algebra. :param node: The sentence. :param labels: A dictionary mapping variable nodes to numbers. """ def labeling(v: Var) -> float: return labels[v] return reduce(node, None, operator.add, neg_inf, 0, labeling)