hass-4button/wallmote.py

79 lines
3.1 KiB
Python
Raw Normal View History

2022-05-16 07:00:43 +00:00
#!/usr/bin/python3
import hassapi as hass
2022-05-19 20:31:53 +00:00
import inspect
2022-05-16 07:00:43 +00:00
from threading import Timer
2022-05-26 04:01:59 +00:00
from actions import tree
2022-05-16 07:00:43 +00:00
##################
# Sequence Logic #
##################
2022-05-19 20:31:53 +00:00
# Short presses will be recorded into a sequence array. Long presses are reserved for
# actions directly in hass for now; use KeyReleased event to capture them. A sequence
# can be ended either immediately by recognizing a full sequence or by TIMEOUT seconds
# passing. (The former is NYI but the code is capable as is.)
2022-05-16 07:00:43 +00:00
# - self.currentSequence contains an array of keys representing a sequence that is
# actively being entered.
# - self.tot contains a timer for the process_sequence function, ending sequence entry.
2022-05-19 20:31:53 +00:00
TIMEOUT=1
2022-05-16 07:00:43 +00:00
class WallmotePlumbing(hass.Hass):
def initialize(self):
self.currentSequence = [];
self.listen_event(self.onevent, "zwave_js_value_notification");
def onevent(self, _, event, kwargs):
if (event.get('value') == 'KeyHeldDown'): return;
if (event.get('device_id') != '3f7add36f94839b1446886b5dab35e6d'): return;
key = int(event.get('property_key'));
# We don't get a KeyPressed event at all for long presses
evt = event.get('value');
2022-05-19 20:31:53 +00:00
if (evt == "KeyPressed"): self.shortpress(key);
2022-05-16 07:00:43 +00:00
def shortpress(self, key):
self.currentSequence.append(key);
self.reset_tot();
# Don't wait for tot to expire if we've already hit the end of a branch
if (self.check_action(self.currentSequence.copy())): self.process_sequence();
2022-05-16 07:00:43 +00:00
def reset_tot(self):
if (hasattr(self,"tot")): self.tot.cancel();
self.tot = Timer(TIMEOUT,self.process_sequence);
self.tot.start();
def process_sequence(self):
self.tot.cancel(); # So process_sequence can be called directly
sequence = self.currentSequence;
self.currentSequence = [];
self.log(sequence);
self.run_action(sequence);
def check_action(self, sequence, tree=tree):
if (len(sequence) > 0):
# Deeper into the rabbit hole
category = sequence.pop(0);
if (category in tree): return self.check_action(sequence,tree[category]);
else: return false;
else:
# We've the end of the sequence thus far, check if this is the end of a branch
# If any of {1..4} exist as keys in tree there's more options
return not any(i in tree for i in range(1,4));
2022-05-16 07:00:43 +00:00
def run_action(self, sequence, tree=tree):
if (len(sequence) > 0):
# Deeper into the rabbit hole
category = sequence.pop(0);
if (category in tree): self.run_action(sequence,tree[category]);
2022-05-19 20:31:53 +00:00
else: self.log("No match");
2022-05-16 07:00:43 +00:00
else:
2022-05-19 20:31:53 +00:00
# Perform an action from here and log the description
2022-05-16 07:00:43 +00:00
if ("function" in tree): tree["function"](self);
elif ("service" in tree): self.call_service(tree["service"]);
2022-05-19 20:31:53 +00:00
else:
self.log("No match");
return;
if ("description" in tree): self.log(tree["description"]);
else: self.log(inspect.getsource(tree["function"]).strip());