import threading
import time
-from . import debug_print
+from transitions.extensions import HierarchicalMachine as Machine
+from . import debug_print, error_print
class Action:
- action_types = [
+ ACTION_TYPES = [
'command',
'interrupt_wait',
'pause',
'wait',
]
+ STATES = [
+ 'initial',
+ 'loading',
+ 'failed',
+ {
+ 'name': 'loaded',
+ 'children': ['running']
+ }
+ ]
+
+ TRANSITIONS = [
+ {
+ 'trigger': 'load',
+ 'source': 'initial',
+ 'dest': 'loading'
+ },
+ {
+ 'trigger': 'fail',
+ 'source': 'loading',
+ 'dest': 'failed',
+ 'after': 'poll_loaded'
+ },
+ {
+ 'trigger': 'success',
+ 'source': 'loading',
+ 'dest': 'loaded',
+ 'after': 'poll_loaded'
+ },
+ {
+ 'trigger': 'run',
+ 'source': 'loaded',
+ 'dest': 'loaded_running',
+ 'after': 'finish_action',
+ # if a child has no transitions, then it is bubbled to the parent,
+ # and we don't want that. Not useful in that machine precisely.
+ 'conditions': ['is_loaded']
+ },
+ {
+ 'trigger': 'finish_action',
+ 'source': 'loaded_running',
+ 'dest': 'loaded'
+ }
+ ]
+
def __init__(self, action, key, **kwargs):
- if action in self.action_types:
- self.action = action
- else:
- raise Exception("Unknown action {}".format(action))
+ Machine(model=self, states=self.STATES,
+ transitions=self.TRANSITIONS, initial='initial',
+ ignore_invalid_triggers=True, queued=True)
+ self.action = action
self.key = key
self.mapping = key.parent
self.arguments = kwargs
self.sleep_event = None
+ self.waiting_music = None
+
+ def is_loaded_or_failed(self):
+ return self.is_loaded(allow_substates=True) or self.is_failed()
+
+ def callback_music_loaded(self, success):
+ if success:
+ self.success()
+ else:
+ self.fail()
- def ready(self):
- if 'music' in self.arguments:
- return self.arguments['music'].is_loaded(allow_substates=True)
+ # Machine states / events
+ def on_enter_loading(self):
+ if self.action in self.ACTION_TYPES:
+ if 'music' in self.arguments:
+ self.arguments['music'].subscribe_loaded(self.callback_music_loaded)
+ else:
+ self.success()
else:
- return True
+ error_print("Unknown action {}".format(self.action))
+ self.fail()
- def run(self):
+ def on_enter_loaded_running(self):
debug_print(self.description())
getattr(self, self.action)(**self.arguments)
- def description(self):
- return getattr(self, self.action + "_print")(**self.arguments)
+ def poll_loaded(self):
+ self.key.callback_action_ready(self,
+ self.is_loaded(allow_substates=True))
+ # This one cannot be in the Machine state since it would be queued to run
+ # *after* the wait is ended...
def interrupt(self):
if getattr(self, self.action + "_interrupt", None):
return getattr(self, self.action + "_interrupt")(**self.arguments)
+ # Helpers
def music_list(self, music):
if music is not None:
return [music]
else:
return self.mapping.open_files.values()
+ def description(self):
+ if getattr(self, self.action + "_print", None):
+ return getattr(self, self.action + "_print")(**self.arguments)
+ else:
+ return "unknown action {}".format(self.action)
+
# Actions
def command(self, command="", **kwargs):
# FIXME: todo
music.stop(fade_out=fade_out)
if previous is not None:
+ self.waiting_music = previous
previous.stop(
fade_out=fade_out,
wait=wait,
self.mapping.add_wait_id(set_wait_id, self)
self.sleep_event = threading.Event()
+ self.sleep_event_timer = threading.Timer(duration, self.sleep_event.set)
if music is not None:
music.wait_end()
- threading.Timer(duration, self.sleep_event.set).start()
+ self.sleep_event_timer.start()
self.sleep_event.wait()
# Action messages
return message
- # Interruptions
+ # Interruptions (only for non-"atomic" actions)
def wait_interrupt(self, duration=0, music=None, **kwargs):
if self.sleep_event is not None:
self.sleep_event.set()
+ self.sleep_event_timer.cancel()
if music is not None:
music.wait_event.set()
+ def stop_interrupt(self, music=None, fade_out=0, wait=False,
+ set_wait_id=None, **kwargs):
+ if self.waiting_music is not None:
+ self.waiting_music.wait_event.set()