Skip to content

Instantly share code, notes, and snippets.

@cheery
Created February 6, 2025 05:20
Show Gist options
  • Save cheery/dfb45b7267910f4f91753bfbe42363fc to your computer and use it in GitHub Desktop.
Save cheery/dfb45b7267910f4f91753bfbe42363fc to your computer and use it in GitHub Desktop.
SLDNF resolution
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from contextvars import ContextVar
from immutables import Map, MapMutation
from typing import Union, Optional, Any
class Variable:
    """A logic variable.

    Instances carry no data of their own. Equality and hashing are the
    default identity-based ones, so every ``Variable()`` is distinct from
    every other and can be used as a key in a substitution mapping.
    """
@dataclass(eq=True, frozen=True)
class Structure:
    """A compound term: a functor name applied to an ordered run of arguments.

    The dataclass is frozen and compares by value (``functor`` and ``args``).

    Attributes:
        functor: Name of the term constructor.
        args: The argument terms. Any iterable is accepted at construction
            time and stored as a tuple.
    """

    functor: str
    args: tuple[Any, ...]

    def __post_init__(self):
        # Code in this file builds terms from lists; a list field would make
        # the generated __hash__ raise, so normalise to a tuple up front.
        # object.__setattr__ is required because the dataclass is frozen.
        if not isinstance(self.args, tuple):
            object.__setattr__(self, "args", tuple(self.args))

    @property
    def signature(self):
        """Return the ``(functor, arity)`` pair describing this term's shape."""
        return self.functor, len(self.args)

    def show(self, show_fn):
        """Render the term as the functor followed by space-separated arguments.

        Args:
            show_fn: Callable that turns one argument into its string form.

        Returns:
            ``"functor arg1 arg2 ..."``, or just ``"functor"`` when there are
            no arguments (no trailing space).
        """
        return " ".join([self.functor, *map(show_fn, self.args)])
class Command:
    """Base class for the instructions stored in a code list.

    The interpreter loop invokes each instruction as
    ``command(stream, frame)`` and continues with whatever frame the call
    returns; a return value of ``None`` ends the current branch.
    """
class Expr:
    """Base class for term-building expressions.

    Subclasses in this file provide ``evaluate(env)``, which produces a term
    from the current environment.
    """
# A substitution: immutable mapping from a Variable to the term bound to it.
Subs = Map[Variable, Any]
# Key identifying a predicate: (name, number of arguments).
Signature = tuple[str, int]
# A sequence of commands, addressed by a program counter.
Code = list[Command]
# Predicate code, looked up by signature.
Module = dict[Signature, Code]
# Term slots of one activation; Ix reads them by position.
Env = list[Any]
@dataclass
class Cont:
    """One activation record: what to run next and where to return to.

    Attributes:
        cont: The continuation to resume after this one finishes, or ``None``
            for the outermost record.
        env: Term slots visible to the code being run.
        code: The command list being executed.
        pc: Index in ``code`` of the next command to execute.
    """

    cont: Optional[Cont]
    env: Env
    code: Code
    pc: int

    def copy(self):
        """Return a new record with the same fields.

        The copy is shallow: ``cont``, ``env`` and ``code`` are shared with
        the original, only the record itself (and thus ``pc``) is separate.
        """
        return Cont(cont=self.cont, env=self.env, code=self.code, pc=self.pc)
@dataclass
class Frame:
    """A single branch of the search: bindings so far plus what to run next.

    Attributes:
        subs: The substitution accumulated on this branch.
        cont: The continuation to execute; the interpreter loop treats a
            frame whose ``cont`` is ``None`` as finished.
    """

    subs: Subs
    cont: Cont
@dataclass
class Stream:
    """Shared state of one search: program, fact tables and pending branches.

    Attributes:
        module: Predicate code keyed by ``(name, arity)``.
        datasets: Fact tuples grouped under a relation name.
        frames: Branches waiting to be explored. The interpreter loop and
            ``fail`` take frames from the right end.
    """

    module: Module
    datasets: dict[str, set[tuple[Any]]]
    frames: deque[Frame]
# Shared empty substitution; used as the default starting bindings below.
empty = Map()
def init_frame(env, code, pc=0, subs=empty):
    """Build a frame that runs ``code`` with no enclosing continuation.

    Args:
        env: Environment for the code.
        code: Command list to execute.
        pc: Index of the first command to run.
        subs: Starting substitution.

    Returns:
        A ``Frame`` whose continuation has ``cont=None``.
    """
    root = Cont(None, env, code, pc)
    return Frame(subs=subs, cont=root)
def run(stream):
    """Drive the search and yield the substitution of every finished branch.

    Frames are taken from the right end of ``stream.frames``. For each one,
    commands are fetched at the continuation's ``pc``, the ``pc`` is advanced
    *before* the command runs, and execution carries on with the frame the
    command returns. A branch is finished when its frame has no continuation
    left; a falsy frame means the branch died and nothing is yielded.

    Args:
        stream: Object exposing the ``frames`` deque of pending frames.

    Yields:
        ``frame.subs`` of each branch that ran to completion.
    """
    while stream.frames:
        current = stream.frames.pop()
        while True:
            if not current or not current.cont:
                break
            active = current.cont
            instruction = active.code[active.pc]
            # Advance first so the command sees pc pointing at its successor.
            active.pc += 1
            current = instruction(stream, current)
        if current:
            yield current.subs
@dataclass
class Ix(Expr):
    """Expression that reads one slot of the environment.

    Attributes:
        index: Position in the environment to read.
    """

    index: int

    def evaluate(self, env: Env):
        """Return ``env[index]``."""
        slot = self.index
        return env[slot]
@dataclass
class Xt(Expr):
    """Expression that builds a compound term from sub-expressions.

    Attributes:
        functor: Functor name of the resulting ``Structure``.
        args: Expressions evaluated, in order, to produce the arguments.
    """

    functor: str
    args: list[Expr]

    def evaluate(self, env: Env):
        """Evaluate every argument in ``env`` and wrap them in a ``Structure``.

        The arguments are passed as a tuple, matching the field type declared
        on ``Structure`` and keeping the frozen term hashable.
        """
        return Structure(self.functor, tuple(a.evaluate(env) for a in self.args))
@dataclass
class Const(Expr):
    """Expression that always produces the same stored value.

    Attributes:
        value: The term to return.
    """

    value: Any

    def evaluate(self, env: Env):
        """Return ``value``; the environment is not consulted."""
        constant = self.value
        return constant
def query_generator(dataset: set[tuple[Any, ...]], pattern_args: tuple[Any, ...], base_subs: Subs):
    """Lazily match a pattern against every fact of a dataset.

    Args:
        dataset: The fact tuples to try.
        pattern_args: The terms to unify with each fact.
        base_subs: Substitution each attempt starts from; it is never
            modified, every attempt works on its own ``mutate()`` copy.

    Yields:
        One finished substitution per fact that unifies with the pattern.
        Facts whose length differs from the pattern are skipped outright.
    """
    arity = len(pattern_args)
    for fact in dataset:
        if len(fact) != arity:
            continue
        # A fresh mutation per fact keeps bindings made during a failed or
        # earlier attempt from leaking into the next one.
        engine = Engine(base_subs.mutate())
        if engine.unify(pattern_args, fact):
            yield engine.subs.finish()
@dataclass
class ResumeQuery(Command):
    """Backtracking step that draws the next answer from a query generator.

    Attributes:
        gen: The iterator of substitutions still to be tried.
    """

    gen: Any

    def __call__(self, ab: Stream, frame: Frame):
        """Produce the next answer, or fail when the generator is exhausted.

        On success the command rewinds its own ``pc`` and re-queues ``frame``
        so that it runs again the next time that frame is popped, then
        returns a new frame carrying the answer's substitution and a copy of
        the enclosing continuation (``frame.cont.cont``).
        """
        exhausted = object()
        answer = next(self.gen, exhausted)
        if answer is exhausted:
            return fail(ab)
        resume_point = frame.cont
        resume_point.pc -= 1
        ab.frames.append(frame)
        return Frame(answer, resume_point.cont.copy())
@dataclass
class Query(Command):
    """Match a pattern against the facts stored under a dataset name.

    Attributes:
        name: Key into ``Stream.datasets``; a missing key behaves like an
            empty dataset.
        args: Expressions evaluated to form the pattern.
    """

    name: str
    args: list[Expr]

    def __call__(self, ab: Stream, frame: Frame):
        """Continue with the first answer and queue the rest as an alternative.

        If no fact matches, the branch fails. Otherwise a retry frame is
        pushed that holds the substitution from before the query and a
        one-command continuation (``ResumeQuery``) sitting on top of a copy
        of the current continuation; the current frame then proceeds with
        the first answer's substitution.
        """
        env = frame.cont.env
        pattern = tuple(arg.evaluate(env) for arg in self.args)
        facts = ab.datasets.get(self.name, set())
        before = frame.subs
        answers = query_generator(facts, pattern, before)

        exhausted = object()
        first = next(answers, exhausted)
        if first is exhausted:
            return fail(ab)

        retry = Cont(frame.cont.copy(), [], [ResumeQuery(answers)], 0)
        ab.frames.append(Frame(before, retry))
        frame.subs = first
        return frame
@dataclass
class Invoke(Command):
    """Call the predicate ``name`` with the evaluated arguments.

    Attributes:
        name: Predicate name; combined with the argument count to look the
            code up in ``Stream.module`` (a missing entry raises ``KeyError``).
        args: Expressions evaluated in the caller's environment; the results
            become the callee's environment.
        tail_call: When true, the current continuation record is reused
            instead of pushing a new one.
        recursive: When true, the frame is re-queued at the left end of the
            pending frames and the frame at the right end runs next.
    """

    name: str
    args: list[Expr]
    tail_call: bool = False
    recursive: bool = False

    def __call__(self, ab: Stream, frame: Frame):
        """Enter the callee and return the frame to continue with."""
        env = frame.cont.env
        args = [a.evaluate(env) for a in self.args]
        code = ab.module[(self.name, len(args))]
        if self.tail_call:
            # Overwrite the current record in place; its parent continuation
            # is kept, so the callee returns straight to our caller.
            c = frame.cont
            c.env = args  # was `frame.args`, which does not exist on Frame
            c.code = code
            c.pc = 0
        else:
            frame.cont = Cont(frame.cont, args, code, pc=0)
        if self.recursive:
            # Frames are consumed from the right, so queueing on the left lets
            # every other pending frame run before this one resumes. With no
            # other frames pending this pops the same frame straight back.
            ab.frames.appendleft(frame)
            frame = ab.frames.pop()
        return frame
@dataclass
class Goto(Command):
    """Relative jump within the current code list.

    Attributes:
        pc: Offset added to the program counter. The counter has already been
            advanced past this command when it runs, so the offset is
            relative to the following command.
    """

    pc: int

    def __call__(self, ab: Stream, frame: Frame):
        """Shift the continuation's ``pc`` by the stored offset."""
        target = frame.cont
        target.pc = target.pc + self.pc
        return frame
@dataclass
class Choice(Command):
    """Create a choice point: keep going here, queue an alternative elsewhere.

    Attributes:
        pc: Offset of the alternative branch, relative to the command that
            follows this one (the program counter is already advanced when
            the command runs).
    """

    pc: int

    def __call__(self, ab: Stream, frame: Frame):
        """Queue the alternative branch and continue with the current frame.

        The alternative gets its own continuation record sharing the parent
        continuation, environment and code of the current one, and starts
        from the current substitution.
        """
        here = frame.cont
        alternative = Cont(
            cont=here.cont,
            env=here.env,
            code=here.code,
            pc=here.pc + self.pc,
        )
        ab.frames.append(Frame(subs=frame.subs, cont=alternative))
        return frame
@dataclass
class Fresh(Command):
    """Introduce new, unbound variables at the front of the environment.

    Attributes:
        count: How many variables to add.
    """

    count: int

    def __call__(self, ab: Stream, frame: Frame):
        """Prepend ``count`` new variables, shifting existing slots up.

        The environment list is replaced, not mutated, so other continuation
        records holding the old list are unaffected.
        """
        env = frame.cont.env
        remaining = self.count
        while remaining > 0:
            env = [Variable()] + env
            frame.cont.env = env
            remaining -= 1
        return frame
@dataclass
class Unify(Command):
    """Unify two terms built from the current environment.

    Attributes:
        x: Expression for the first term.
        y: Expression for the second term.
    """

    x: Expr
    y: Expr

    def __call__(self, ab: Stream, frame: Frame):
        """Extend the frame's substitution, or fail the branch.

        Unification runs on a mutation of ``frame.subs``; the frame only
        receives the result when unification succeeds.
        """
        left = self.x.evaluate(frame.cont.env)
        right = self.y.evaluate(frame.cont.env)
        unifier = Engine(frame.subs.mutate())
        if not unifier.unify(left, right):
            return fail(ab)
        frame.subs = unifier.subs.finish()
        return frame
@dataclass
class Proceed(Command):
    """Return from the current code list to the enclosing continuation."""

    def __call__(self, ab: Stream, frame: Frame):
        """Pop one continuation level.

        The frame continues with a copy of the parent continuation, or with
        ``cont = None`` when there is no parent.
        """
        parent = frame.cont.cont
        frame.cont = None if parent is None else parent.copy()
        return frame
@dataclass
class Fail(Command):
    """Command that unconditionally abandons the current branch."""

    def __call__(self, ab: Stream, frame: Frame):
        """Drop ``frame`` and return the next pending frame, or ``None``."""
        next_frame = fail(ab)
        return next_frame
@dataclass
class Neg(Command):
    """Negation as failure: succeed only when the sub-goal has no solution.

    Attributes:
        code: Command list of the goal being negated.
    """

    code: Code

    def __call__(self, ab: Stream, frame: Frame):
        """Run ``code`` in a separate stream and invert the outcome.

        The sub-search shares the module and datasets, and starts from the
        current environment and substitution. It is stopped at the first
        solution. Bindings made inside it are never copied back to ``frame``.

        Returns:
            ``frame`` unchanged if the sub-goal has no solution, otherwise
            the result of failing the current branch.
        """
        # The environment lives on the continuation; Frame has no `env`.
        bc = init(ab.module, ab.datasets, frame.cont.env, self.code, subs=frame.subs)
        # Loop rather than test the yielded value: what matters is whether
        # anything is yielded at all, not the substitution's truthiness.
        for _ in run(bc):
            return fail(ab)
        return frame
def init(module, datasets, env, code, pc=0, subs=empty):
    """Create a stream whose only pending frame runs ``code`` from ``pc``.

    Args:
        module: Predicate code keyed by signature.
        datasets: Fact tables keyed by name.
        env: Environment for the initial frame.
        code: Command list for the initial frame.
        pc: Index of the first command to run.
        subs: Starting substitution.

    Returns:
        A ``Stream`` ready to be passed to ``run``.
    """
    start = init_frame(env, code, pc, subs)
    pending = deque([start])
    return Stream(module, datasets, pending)
def fail(ab: Stream):
    """Abandon the current branch and hand back the next one to try.

    Args:
        ab: The stream whose pending frames are consulted.

    Returns:
        The frame taken from the right end of ``ab.frames``, or ``None``
        when no frames are pending.
    """
    pending = ab.frames
    return pending.pop() if pending else None
@dataclass
class Engine:
    """Unification over a mutable substitution.

    Terms are variables, ``Structure`` instances, plain tuples (treated as
    compound terms) or any other Python value (treated as an atom).

    Attributes:
        subs: The mapping from variables to their bindings; it is updated in
            place. A failed ``unify`` may leave partial bindings behind, so
            callers should discard the mapping on failure.
    """

    subs: MapMutation[Variable, Any]

    def bind(self, a, x):
        """Bind variable ``a`` to ``x`` unless ``a`` occurs inside ``x``.

        Returns:
            ``True`` if the binding was recorded, ``False`` if it was
            rejected by the occurs check.
        """
        if self.occurs(a, x):
            return False
        self.subs[a] = x
        return True

    def deref(self, a):
        """Follow variable bindings from ``a`` until an unbound variable or a non-variable."""
        while isinstance(a, Variable) and a in self.subs:
            a = self.subs[a]
        return a

    def occurs(self, a, x):
        """Return whether variable ``a`` appears anywhere inside term ``x``.

        Both ``Structure`` arguments and tuple elements are searched, the
        same compound shapes ``unify`` descends into.
        """
        x = self.deref(x)
        if a is x:
            return True
        if isinstance(x, Structure):
            return any(self.occurs(a, y) for y in x.args)
        if isinstance(x, tuple):
            return any(self.occurs(a, y) for y in x)
        return False

    def unify(self, x, y):
        """Try to make ``x`` and ``y`` equal by binding variables.

        Returns:
            ``True`` on success; ``False`` when the terms clash or a binding
            would create a cyclic term.
        """
        x = self.deref(x)
        y = self.deref(y)
        if x is y:
            return True
        if isinstance(x, Variable):
            return self.bind(x, y)
        if isinstance(y, Variable):
            return self.bind(y, x)
        if isinstance(x, Structure) and isinstance(y, Structure):
            if x.functor != y.functor or len(x.args) != len(y.args):
                return False
            return all(self.unify(p, q) for p, q in zip(x.args, y.args))
        if isinstance(x, tuple) and isinstance(y, tuple):
            if len(x) != len(y):
                return False
            return all(self.unify(p, q) for p, q in zip(x, y))
        # Remaining cases are atoms or mismatched kinds. Compare by value:
        # identity alone only works for objects the interpreter happens to
        # share (small ints, interned strings), so equal constants created
        # separately would wrongly fail to unify.
        return bool(x == y)

    def walk(self, x):
        """Return ``x`` with every bound variable replaced by its value, recursively.

        ``Structure`` terms are rebuilt with tuple arguments, and tuples are
        rebuilt element by element; other values are returned as they are.
        """
        x = self.deref(x)
        if isinstance(x, Structure):
            return Structure(x.functor, tuple(self.walk(a) for a in x.args))
        if isinstance(x, tuple):
            return tuple(self.walk(a) for a in x)
        return x
@dataclass
class Show:
    """Pretty-printer for terms.

    Attributes:
        names: Display names for variables. Variables met for the first time
            are added with a generated name of the form ``%N``, where ``N``
            is the number of names known at that moment.
    """

    names: dict[Variable, str]

    def __call__(self, x, prec=0):
        """Render ``x``, parenthesising it when ``prec`` is too high.

        A ``Structure`` is written bare up to ``prec`` 5 and a tuple only at
        ``prec`` 0; above those limits the bare form is wrapped in
        parentheses. Structure arguments are rendered at ``prec`` 10 and
        tuple elements at ``prec`` 5. Anything else is shown with ``repr``.
        """
        if isinstance(x, Variable):
            if x not in self.names:
                self.names[x] = f"%{len(self.names)}"
            return self.names[x]

        if isinstance(x, Structure):
            limit = 5

            def render():
                return x.show(lambda a: self(a, 10))
        elif isinstance(x, tuple):
            limit = 0

            def render():
                return ", ".join(self(a, 5) for a in x)
        else:
            return repr(x)

        if prec <= limit:
            return render()
        return '(' + self(x) + ')'
if __name__ == '__main__':
    # Demo: three alternatives for the pair (X, Y) —
    #   1. term(X) = Y
    #   2. turm(Y) = X
    #   3. (X, Y) is a fact in the "foo" dataset
    module = {}
    datasets = {
        "foo": {
            ("hello", 5),
            ("world", (6, 8)),
            ("guux", 7),
        },
    }
    X, Y = Variable(), Variable()
    code = [
        # Choice offsets are relative to the command after the Choice, so
        # Choice(2) queues the branch starting three entries further down.
        Choice(2),
        Unify(Xt("term", [Ix(0)]), Ix(1)),
        Proceed(),
        Choice(2),
        Unify(Xt("turm", [Ix(1)]), Ix(0)),
        Proceed(),
        Query("foo", [Ix(0), Ix(1)]),
        Proceed(),
    ]
    stream = init(module, datasets, [X, Y], code)
    for subs in run(stream):
        engine = Engine(subs.mutate())
        # A new printer per solution keeps generated variable names from
        # carrying over between solutions.
        show = Show(names={X: "X", Y: "Y"})
        x_text = show(engine.walk(X), 5)
        y_text = show(engine.walk(Y), 5)
        print(f"X = {x_text}, Y = {y_text}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment