66 lines
2.4 KiB
Python
66 lines
2.4 KiB
Python
#!/usr/bin/python3
|
|
import hassapi as hass
|
|
import inspect
|
|
from threading import Timer
|
|
from actions import tree
|
|
|
|
##################
|
|
# Sequence Logic #
|
|
##################
|
|
|
|
# Short presses will be recorded into a sequence array. Long presses are reserved for
|
|
# actions directly in hass for now; use KeyReleased event to capture them. A sequence
|
|
# can be ended either immediately by recognizing a full sequence or by TIMEOUT seconds
|
|
# passing. (The former is NYI but the code is capable as is.)
|
|
|
|
# - self.currentSequence contains an array of keys representing a sequence that is
|
|
# actively being entered.
|
|
# - self.tot contains a timer for the process_sequence function, ending sequence entry.
|
|
|
|
TIMEOUT=1
|
|
|
|
class WallmotePlumbing(hass.Hass):
|
|
def initialize(self):
|
|
self.currentSequence = [];
|
|
self.listen_event(self.onevent, "zwave_js_value_notification");
|
|
|
|
def onevent(self, _, event, kwargs):
|
|
if (event.get('value') == 'KeyHeldDown'): return;
|
|
if (event.get('device_id') != '3f7add36f94839b1446886b5dab35e6d'): return;
|
|
key = int(event.get('property_key'));
|
|
# We don't get a KeyPressed event at all for long presses
|
|
evt = event.get('value');
|
|
if (evt == "KeyPressed"): self.shortpress(key);
|
|
|
|
def shortpress(self, key):
|
|
self.currentSequence.append(key);
|
|
self.reset_tot();
|
|
|
|
def reset_tot(self):
|
|
if (hasattr(self,"tot")): self.tot.cancel();
|
|
self.tot = Timer(TIMEOUT,self.process_sequence);
|
|
self.tot.start();
|
|
|
|
def process_sequence(self):
|
|
self.tot.cancel(); # So process_sequence can be called directly
|
|
sequence = self.currentSequence;
|
|
self.currentSequence = [];
|
|
self.log(sequence);
|
|
self.run_action(sequence);
|
|
|
|
def run_action(self, sequence, tree=tree):
|
|
if (len(sequence) > 0):
|
|
# Deeper into the rabbit hole
|
|
category = sequence.pop(0);
|
|
if (category in tree): self.run_action(sequence,tree[category]);
|
|
else: self.log("No match");
|
|
else:
|
|
# Perform an action from here and log the description
|
|
if ("function" in tree): tree["function"](self);
|
|
elif ("service" in tree): self.call_service(tree["service"]);
|
|
else:
|
|
self.log("No match");
|
|
return;
|
|
if ("description" in tree): self.log(tree["description"]);
|
|
else: self.log(inspect.getsource(tree["function"]).strip());
|