Last active
December 7, 2019 17:04
-
-
Save kjagiello/57dd8a47ea53feac4922caf2d518ef23 to your computer and use it in GitHub Desktop.
Experimental circuit graph executioner. It allows you to build a circuit consisting of different components (gates) that are interconnected with each other using wires, then simulating its execution.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from collections import defaultdict | |
class GateError(Exception): | |
pass | |
class ExecutionError(Exception): | |
pass | |
class GatePin: | |
pass | |
class InputPin(GatePin): | |
pass | |
class OutputPin(GatePin): | |
pass | |
class GateMeta(type): | |
def __new__(cls, names, bases, attrs): | |
mapping = {InputPin: "_inputs", OutputPin: "_outputs"} | |
new_attrs = {v: {} for v in mapping.values()} | |
for k, v in attrs.items(): | |
if isinstance(v, GatePin) and v.__class__ in mapping: | |
new_attrs[mapping[v.__class__]][k] = v | |
return super().__new__(cls, names, bases, {**attrs, **new_attrs}) | |
class Gate(metaclass=GateMeta): | |
def get_inputs(self): | |
return self._inputs | |
def get_outputs(self): | |
return self.outputs | |
def _execute(self, **inputs): | |
if set(self.inputs.keys()) != set(inputs.keys()): | |
raise GateError("Received invalid inputs.") | |
outputs = self.execute(**inputs) | |
if set(self.outputs.keys()) != set(outputs.keys()): | |
raise GateError("Produced invalid outputs.") | |
return outputs | |
def execute(self, **inputs): | |
"""Takes in an input dict and returns a new output dict.""" | |
raise NotImplementedError | |
class GateBreakpoint: | |
def __init__(self, gate, inputs): | |
self.gate = gate | |
self.inputs = inputs | |
def __repr__(self): | |
inputs = " ".join(f"{k}={v!r}" for k, v in self.inputs.items()) | |
return f"<{self.gate.__class__.__name__} {inputs}>" | |
def __str__(self): | |
return repr(self) | |
class CircuitGraph: | |
def __init__(self, clock_speed=None): | |
self.clock_speed = clock_speed | |
self.breakpoints = [] | |
self.gates = [] | |
self.output_wires = defaultdict(list) | |
def add_gate(self, gate): | |
self.gates.append(gate) | |
def add_breakpoint(self, gate): | |
self.breakpoints.append(gate) | |
def add_wire(self, output_gate, output_pin, input_gate, input_pin): | |
self.output_wires[output_gate].append((output_pin, input_gate, input_pin)) | |
def execute(self, entrypoint, **kwargs): | |
"""Execute the circuit starting from the entrypoint. | |
Pass input values to the entrypoint gate as kwargs.""" | |
# TODO: ensure the graph is complete, i.e. all the pins are connected | |
propagate_inputless = True | |
state = defaultdict(dict) | |
state[entrypoint] = kwargs | |
queue = [entrypoint] | |
while True: | |
while queue: | |
gate = queue.pop() | |
if self.clock_speed is not None: | |
time.sleep(1 / self.clock_speed) | |
current_inputs = state[gate] | |
expected_inputs = gate.get_inputs() | |
satisfied = set(expected_inputs.keys()) == set(current_inputs.keys()) | |
if not satisfied: | |
continue | |
# Skip if this is an input-less gate that has already propagated | |
# its outputs. | |
has_propagated = all( | |
input_pin in state[input_gate] | |
for __, input_gate, input_pin in self.output_wires[gate] | |
) | |
if has_propagated: | |
continue | |
# Check if any breakpoint was set on this gate | |
if gate in self.breakpoints: | |
yield GateBreakpoint(gate=gate, inputs=current_inputs) | |
# Execute the gate | |
outputs = gate.execute(**current_inputs) | |
# Follow the output wires for this gate and propagate the output to | |
# the target gates | |
for output_pin, input_gate, input_pin in self.output_wires[gate]: | |
state[input_gate][input_pin] = outputs[output_pin] | |
queue.append(input_gate) | |
# Reset the input state for this gate and rerun the graph | |
state[gate] = {} | |
propagate_inputless = True | |
# We seemingly have no work left to do in the circuit. It could be either | |
# because we got stuck in the execution (probably some wires missing) or we | |
# have some input-less gates that we need to manually propagate. We attempt | |
# it once here, rerun the graph and ensure that it actually led to more work | |
# be done. | |
if not queue and propagate_inputless: | |
queue = [gate for gate in self.gates if not gate.get_inputs()] | |
propagate_inputless = False | |
# Ending up here means that we got stuck somewhere in the circuit. | |
# TODO: Add the current state to the error | |
raise ExecutionError("No progress was made.") | |
if __name__ == "__main__": | |
class ConstantValue(Gate): | |
value = OutputPin() | |
def __init__(self, value): | |
self.value = value | |
def execute(self): | |
return {"value": self.value} | |
class ProgramCounter(Gate): | |
counter_in = InputPin() | |
counter_out = OutputPin() | |
def execute(self, counter_in): | |
return {"counter_out": counter_in} | |
class AddGate(Gate): | |
x = InputPin() | |
y = InputPin() | |
z = OutputPin() | |
def execute(self, x, y): | |
return {"z": x + y} | |
class Comparator(Gate): | |
"""Compares a with b and sets c to 1 if true.""" | |
a = InputPin() | |
b = InputPin() | |
c = OutputPin() | |
def execute(self, a, b): | |
return {"c": int(a == b)} | |
class Mux(Gate): | |
a = InputPin() | |
b = InputPin() | |
s = InputPin() | |
c = OutputPin() | |
def execute(self, a, b, s): | |
return {"c": b if s == 1 else a} | |
# Setup the gates | |
program_counter = ProgramCounter() | |
pc_increment = ConstantValue(4) | |
pc_limit = ConstantValue(12) | |
pc_zero = ConstantValue(0) | |
comparator = Comparator() | |
add_gate = AddGate() | |
mux_gate = Mux() | |
# Create an empty circuit | |
graph = CircuitGraph(clock_speed=4) | |
# Add all the different gates to the circuit | |
graph.add_gate(program_counter) | |
graph.add_gate(add_gate) | |
graph.add_gate(pc_increment) | |
graph.add_gate(pc_limit) | |
graph.add_gate(pc_zero) | |
graph.add_gate(comparator) | |
graph.add_gate(mux_gate) | |
# Connect the pins between the gates | |
graph.add_wire(program_counter, "counter_out", add_gate, "x") | |
graph.add_wire(pc_increment, "value", add_gate, "y") | |
# Set up the comparator | |
graph.add_wire(add_gate, "z", comparator, "a") | |
graph.add_wire(pc_limit, "value", comparator, "b") | |
# Set up the mux | |
graph.add_wire(comparator, "c", mux_gate, "s") | |
graph.add_wire(add_gate, "z", mux_gate, "a") | |
graph.add_wire(pc_zero, "value", mux_gate, "b") | |
# Feed back the new counter from the MUX back to the PC | |
graph.add_wire(mux_gate, "c", program_counter, "counter_in") | |
# Mark the PC gate as a breakpoint, so that we can easily track the program | |
# execution | |
graph.add_breakpoint(program_counter) | |
# Start the circuit | |
executor = graph.execute(entrypoint=program_counter, counter_in=0) | |
for brk in executor: | |
print(brk) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment