Created
June 7, 2019 13:41
-
-
Save danieljfarrell/54e262d78cf31c0e635645429c61df6b to your computer and use it in GitHub Desktop.
Using an @visits decorator with an async generator to monitor and enforce state transitions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import traceback | |
import enum | |
import functools | |
class InvalidVisitedState(Exception): | |
""" Raised when an state change sequence is incorrect. | |
""" | |
pass | |
class InvalidInitialState(Exception): | |
""" Raised when the initial state is incorrect. | |
""" | |
pass | |
def visits(transitions): | |
""" Decorator idea from the EOTPR project! | |
Parameters | |
---------- | |
transitions: list | |
A list of enum.Enum states. | |
Notes | |
----- | |
Must decorator a async generator function which generate state changes. | |
""" | |
def decorator(f): | |
@functools.wraps(f) | |
async def wrapper(self, *args, **kwargs): | |
# Check initial state is correct | |
sequence = list(transitions) | |
expected = sequence.pop(0) | |
state = self.state | |
if state != expected: | |
raise InvalidInitialState({"expected": expected, "got": state}) | |
# Check sequence is correct | |
visited = [state] | |
async for state in f(self, *args, **kwargs): | |
visited.append(state) | |
expected = sequence.pop(0) | |
if state != expected: | |
raise InvalidVisitedState({"visited": visited, "expected": expected}) | |
return wrapper | |
return decorator | |
class InstrumentState(enum.Enum): | |
""" Very basic state machine for hardware. | |
""" | |
Off = 1 | |
BusySwitchingOn = 2 | |
On = 3 | |
BusySwitchingOff = 4 | |
Fault = 5 | |
class NotAState(enum.Enum): | |
""" Used just for testing. | |
""" | |
Blah = 1 | |
class Instrument(object): | |
""" Basic abstraction of hardware. | |
""" | |
#: The instrument state. | |
_state = InstrumentState.Off | |
@property | |
def state(self): | |
return self._state | |
@state.setter | |
def state(self, new_value): | |
print("{} -> {}".format(self._state, new_value)) | |
self._state = new_value | |
@visits([InstrumentState.Off, InstrumentState.BusySwitchingOn, InstrumentState.On]) | |
async def switch_on(self, broken_command=False, wrong_transition=False): | |
""" Do startup sequence. | |
Kwargs are just used for debugging/testing to simulate errors. | |
""" | |
# In reality this would be a network command/response that would be awaited... | |
self.state = InstrumentState.BusySwitchingOn | |
#state = await self.set_state(InstrumentState.BusySwitchingOn) | |
yield self.state | |
# Simulate broken socket or exception raised by the server | |
if broken_command: | |
raise RuntimeError("Command failed.") | |
# Simulate incorrect state transition | |
if wrong_transition: | |
self.state = NotAState.Blah | |
yield self.state | |
self.state = InstrumentState.On | |
yield self.state | |
@visits([InstrumentState.On, InstrumentState.BusySwitchingOff, InstrumentState.Off]) | |
async def switch_off(self): | |
""" Do shutdown sequence. | |
""" | |
# In reality this would be a network command/response that would be awaited... | |
self.state = InstrumentState.BusySwitchingOff | |
yield self.state | |
self.state = InstrumentState.Off | |
yield self.state | |
async def main(): | |
""" Tests | |
""" | |
inst = Instrument() | |
print("\nSwitch On:") | |
await inst.switch_on() | |
print("Done!") | |
inst = Instrument() | |
print("\nSwitch On with broken command:") | |
try: | |
await inst.switch_on(broken_command=True) | |
except RuntimeError: | |
print("Generated RuntimeError as expected!") | |
inst = Instrument() | |
print("\nSwitch On with wrong state transition:") | |
try: | |
await inst.switch_on(wrong_transition=True) | |
except InvalidVisitedState: | |
print("Generated InvalidVisitedState as expected!") | |
inst = Instrument() | |
inst._state = InstrumentState.Fault | |
print("\nSwitch On with wrong initial state:") | |
try: | |
await inst.switch_on(wrong_transition=True) | |
except InvalidInitialState: | |
print("Generated InvalidInitialState as expected!") | |
inst = Instrument() | |
print("\nSwitch On and then off:") | |
await inst.switch_on() | |
await inst.switch_off() | |
print("Done!") | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment