Created
February 6, 2025 05:20
-
-
Save cheery/dfb45b7267910f4f91753bfbe42363fc to your computer and use it in GitHub Desktop.
SLDNF resolution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from collections import deque | |
from dataclasses import dataclass | |
from contextvars import ContextVar | |
from immutables import Map, MapMutation | |
from typing import Union, Optional, Any | |
class Variable: | |
pass | |
@dataclass(eq=True, frozen=True) | |
class Structure: | |
functor : str | |
args : tuple[Any] | |
@property | |
def signature(self): | |
return self.functor, len(self.args) | |
def show(self, show_fn): | |
args = " ".join(map(show_fn, self.args)) | |
return f"{self.functor} {args}" | |
class Command: | |
pass | |
class Expr: | |
pass | |
Subs = Map[Variable, Any] | |
Signature = tuple[str, int] | |
Code = list[Command] | |
Module = dict[Signature, Code] | |
Env = list[Any] | |
@dataclass | |
class Cont: | |
cont : Optional[Cont] | |
env : Env | |
code : Code | |
pc : int | |
def copy(self): | |
return Cont(self.cont, self.env, self.code, self.pc) | |
@dataclass | |
class Frame: | |
subs : Subs | |
cont : Cont | |
@dataclass | |
class Stream: | |
module : Module | |
datasets : dict[str, set[tuple[Any]]] | |
frames : deque[Frame] | |
empty = Map() | |
def init_frame(env, code, pc=0, subs=empty): | |
return Frame(subs = subs, cont = Cont(None, env, code, pc)) | |
def run(stream): | |
while stream.frames: | |
frame = stream.frames.pop() | |
while frame and frame.cont: | |
cont = frame.cont | |
command = cont.code[cont.pc] | |
cont.pc += 1 | |
frame = command(stream, frame) | |
if frame: | |
yield frame.subs | |
@dataclass | |
class Ix(Expr): | |
index : int | |
def evaluate(self, env : Env): | |
return env[self.index] | |
@dataclass | |
class Xt(Expr): | |
functor : str | |
args : list[Expr] | |
def evaluate(self, env : Env): | |
args = [a.evaluate(env) for a in self.args] | |
return Structure(self.functor, args) | |
@dataclass | |
class Const(Expr): | |
value : Any | |
def evaluate(self, env : Env): | |
return self.value | |
def query_generator(dataset: set[tuple[Any]], pattern_args: tuple[Any], base_subs: Subs): | |
for fact in dataset: | |
if len(fact) != len(pattern_args): | |
continue | |
engine = Engine(base_subs.mutate()) | |
success = True | |
if engine.unify(pattern_args, fact): | |
yield engine.subs.finish() | |
@dataclass | |
class ResumeQuery(Command): | |
gen: Any | |
def __call__(self, ab: Stream, frame: Frame): | |
try: | |
next_subs = next(self.gen) | |
except StopIteration: | |
return fail(ab) | |
frame.cont.pc -= 1 | |
ab.frames.append(frame) | |
return Frame(next_subs, frame.cont.cont.copy()) | |
@dataclass | |
class Query(Command): | |
name: str | |
args: list[Expr] | |
def __call__(self, ab: Stream, frame: Frame): | |
env = frame.cont.env | |
pattern_args = tuple(a.evaluate(env) for a in self.args) | |
dataset = ab.datasets.get(self.name, set()) | |
base_subs = frame.subs | |
gen = query_generator(dataset, pattern_args, base_subs) | |
try: | |
first = next(gen) | |
except StopIteration: | |
return fail(ab) | |
alt_cont = Cont(frame.cont.copy(), [], [ResumeQuery(gen)], 0) | |
ab.frames.append(Frame(base_subs, alt_cont)) | |
frame.subs = first | |
return frame | |
@dataclass | |
class Invoke(Command): | |
name : str | |
args : list[Expr] | |
tail_call : bool = False | |
recursive : bool = False | |
def __call__(self, ab : Stream, frame : Frame): | |
env = frame.cont.env | |
args = [a.evaluate(env) for a in self.args] | |
code = ab.module[(self.name, len(args))] | |
if self.tail_call: | |
c = frame.cont | |
c.env = frame.args | |
c.code = code | |
c.pc = 0 | |
else: | |
frame.cont = Cont(frame.cont, args, code, pc=0) | |
if self.recursive: | |
ab.frames.appendleft(frame) | |
frame = ab.frames.pop() | |
return frame | |
@dataclass | |
class Goto(Command): | |
pc : int | |
def __call__(self, ab : Stream, frame : Frame): | |
frame.cont.pc += self.pc | |
return frame | |
@dataclass | |
class Choice(Command): | |
pc : int | |
def __call__(self, ab : Stream, frame : Frame): | |
pc = frame.cont.pc | |
cont = Cont(frame.cont.cont, frame.cont.env, | |
frame.cont.code, pc + self.pc) | |
ab.frames.append(Frame(frame.subs, cont)) | |
return frame | |
@dataclass | |
class Fresh(Command): | |
count : int | |
def __call__(self, ab : Stream, frame : Frame): | |
for _ in range(self.count): | |
frame.cont.env = [Variable()] + frame.cont.env | |
return frame | |
@dataclass | |
class Unify(Command): | |
x : Expr | |
y : Expr | |
def __call__(self, ab : Stream, frame : Frame): | |
x = self.x.evaluate(frame.cont.env) | |
y = self.y.evaluate(frame.cont.env) | |
engine = Engine(frame.subs.mutate()) | |
if engine.unify(x, y): | |
frame.subs = engine.subs.finish() | |
return frame | |
else: | |
return fail(ab) | |
@dataclass | |
class Proceed(Command): | |
def __call__(self, ab : Stream, frame : Frame): | |
cont = frame.cont.cont | |
if cont is None: | |
frame.cont = None | |
else: | |
frame.cont = cont.copy() | |
return frame | |
@dataclass | |
class Fail(Command): | |
def __call__(self, ab : Stream, frame : Frame): | |
return fail(ab) | |
@dataclass | |
class Neg(Command): | |
code : Code | |
def __call__(self, ab : Stream, frame : Frame): | |
bc = init(ab.module, ab.datasets, frame.env, self.code, subs=frame.subs) | |
for _ in run(bc): | |
return fail(ab) | |
return frame | |
def init(module, datasets, env, code, pc=0, subs=empty): | |
return Stream(module, datasets, deque([init_frame(env, code, pc, subs)])) | |
def fail(ab : Stream): | |
try: | |
return ab.frames.pop() | |
except IndexError: | |
return None | |
@dataclass | |
class Engine: | |
subs : MapMutation[Variable, Any] | |
def bind(self, a, x): | |
if self.occurs(a, x): | |
return False | |
self.subs[a] = x | |
return True | |
def deref(self, a): | |
while isinstance(a, Variable) and a in self.subs: | |
a = self.subs[a] | |
return a | |
def occurs(self, a, x): | |
x = self.deref(x) | |
if a is x: | |
return True | |
elif isinstance(x, Structure): | |
return any(self.occurs(a, y) for y in x.args) | |
else: | |
return False | |
def unify(self, x, y): | |
x = self.deref(x) | |
y = self.deref(y) | |
if x is y: | |
return True | |
elif isinstance(x, Variable): | |
return self.bind(x,y) | |
elif isinstance(y, Variable): | |
return self.bind(y,x) | |
elif isinstance(x, Structure) and isinstance(y, Structure): | |
if x.functor == y.functor and len(x.args) == len(y.args): | |
return all(self.unify(x,y) for x,y in zip(x.args, y.args)) | |
else: | |
return False | |
elif isinstance(x, tuple) and isinstance(y, tuple): | |
if len(x) == len(y): | |
return all(self.unify(x,y) for x,y in zip(x, y)) | |
else: | |
return False | |
else: | |
return False | |
def walk(self, x): | |
x = self.deref(x) | |
if isinstance(x, Structure): | |
args = [self.walk(a) for a in x.args] | |
return Structure(x.functor, args) | |
else: | |
return x | |
@dataclass | |
class Show: | |
names : dict[Variable, str] | |
def __call__(self, x, prec=0): | |
if isinstance(x, Variable): | |
try: | |
return self.names[x] | |
except KeyError: | |
self.names[x] = name = f"%{len(self.names)}" | |
return name | |
if isinstance(x, Structure): | |
if prec <= 5: | |
return x.show(lambda a: self(a, 10)) | |
else: | |
return '(' + self(x) + ')' | |
elif isinstance(x, tuple): | |
if prec <= 0: | |
return ", ".join(self(a, 5) for a in x) | |
else: | |
return '(' + self(x) + ')' | |
else: | |
return repr(x) | |
if __name__=='__main__': | |
module = { | |
} | |
datasets = { | |
"foo" : set([ | |
("hello", 5), | |
("world", (6,8)), | |
("guux", 7), | |
]) | |
} | |
X, Y = Variable(), Variable() | |
code = [ | |
Choice(2), | |
Unify(Xt("term", [Ix(0)]), Ix(1)), | |
Proceed(), | |
Choice(2), | |
Unify(Xt("turm", [Ix(1)]), Ix(0)), | |
Proceed(), | |
Query("foo", [Ix(0), Ix(1)]), | |
Proceed(), | |
] | |
stream = init(module, datasets, [X,Y], code) | |
for subs in run(stream): | |
engine = Engine(subs.mutate()) | |
show = Show(names = {X: "X", Y: "Y"}) | |
print(f"X = {show(engine.walk(X), 5)}, Y = {show(engine.walk(Y), 5)}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment