Files
OctoPrint-CalibrationTools/octoprint_CalibrationTools/hooks.py
T
sergiuToporjinschi c0204b9de6 - disable button when action has been triggered
- removed auto-kill thread after 3 cycles (need to look again)
2022-02-03 12:08:25 +02:00

104 lines
4.1 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import collections
import re
import threading
import traceback
import types
class Hooks():
    """Callbacks reacting to printer output: G-code responses and temperatures.

    The class keeps two registries:

    * ``gCodeWaiters`` - one-shot callbacks fired when a received line matches
      a regular expression (see ``registerRegexMsg``).
    * ``events`` - one-shot callbacks fired when a tool reaches a target
      temperature (see ``registerEventTemp``).

    Every callback runs on its own ``threading.Thread``. If the callback is a
    plain function (``types.FunctionType``) this object is prepended to its
    arguments; any other callable (e.g. a bound method) is called without it.

    The class relies on ``self._logger`` and ``self.data`` being provided
    elsewhere; neither is defined here.
    NOTE(review): the method signatures look like OctoPrint comm hooks
    (received G-code / firmware info / parsed temperatures) - confirm against
    the plugin's hook registration.
    """

    # Not read anywhere inside this class.
    trackTemp = True

    # NOTE(review): class-level lists are shared by all instances until an
    # instance rebinds them (gCodeReceived rebinds gCodeWaiters). Kept as-is so
    # that no __init__ has to be introduced into the class hierarchy.
    events = []
    gCodeWaiters = []

    # Pattern used both to validate a command name and as the waiter regex.
    _GCODE_PATTERN = r".*\s*(?P<gCode>[MG]\d{1,4})"

    def gCodeReceived(self, comm, line, *args, **kwargs):
        """Dispatch a received line to the first waiter whose regex matches it.

        The matching waiter is started on a new thread with ``(line, *args)``
        and is then removed from the registry; later waiters are not checked
        for this line. Waiters examined before the match only have their
        ``callCount`` incremented.

        On any exception the traceback is logged and *all* waiters are dropped.

        :param comm: unused.
        :param line: the received line.
        :return: ``line``, unchanged.
        """
        try:
            if not self.gCodeWaiters:
                return line
            for waiter in self.gCodeWaiters:
                waiter["callCount"] += 1
                if waiter["regex"].match(line):
                    callArgs = (line,) + tuple(waiter["args"])
                    if isinstance(waiter["func"], types.FunctionType):
                        # Plain functions get this object as first argument.
                        callArgs = (self,) + callArgs
                    threading.Thread(target=waiter["func"], args=callArgs).start()
                    waiter["dead"] = True
                    break
            # Waiters are one-shot: drop the one that has just been triggered.
            self.gCodeWaiters = [w for w in self.gCodeWaiters if not w.get("dead")]
        except Exception:
            self._logger.error(traceback.format_exc())
            self.gCodeWaiters = []
        return line

    def firmwareInfo(self, comm_instance, firmware_name, firmware_data, *args, **kwargs):
        """Store ``firmware_data`` under ``self.data["info"]["firmware"]``.

        ``self.data`` must already exist; it is not created here.
        ``comm_instance`` and ``firmware_name`` are unused.
        """
        self.data["info"] = {
            "firmware": firmware_data
        }

    # Hook for temperature messages
    # This is active only when there are events registered to temperature changes
    def processTemp(self, comm_instance, parsed_temperatures, *args, **kwargs):
        """Check registered temperature events against a new set of readings.

        A copy of ``parsed_temperatures`` is handed to
        ``checkAndTriggerEvent``; exceptions raised there are logged and
        swallowed so that the readings are always passed through.

        :param comm_instance: unused.
        :param parsed_temperatures: dict mapping a tool key to a sequence whose
            first item is the current temperature.
        :return: ``parsed_temperatures``, unchanged.
        """
        if not self.events:
            return parsed_temperatures
        try:
            self.checkAndTriggerEvent(parsed_temperatures.copy())
        except Exception:
            self._logger.error(traceback.format_exc())
        return parsed_temperatures

    # Check if the current message contains changes concerning registered extruder
    # if the criteria is met then the execution function is called
    def checkAndTriggerEvent(self, temps):
        """Fire and unregister every event whose tool reached its target.

        An event triggers when ``temps`` contains its tool and the current
        temperature (first item of the reading) is ``>=`` the event's
        ``targetTemp``. The callback is started on a new thread with
        ``(temps, *args)``.

        :param temps: dict mapping a tool key to a sequence whose first item
            is the current temperature.
        """
        # Iterate over a snapshot: removing from the list being iterated
        # would silently skip the event following each removed one.
        for event in list(self.events):
            reading = temps.get(event["tool"])
            if reading is None:
                continue
            curTemp = reading[0]
            # A missing reading cannot be compared with the target.
            if curTemp is None or not curTemp >= event["targetTemp"]:
                continue
            callArgs = (temps,) + tuple(event["args"])
            if isinstance(event["func"], types.FunctionType):
                # Plain functions get this object as first argument.
                callArgs = (self,) + callArgs
            threading.Thread(target=event["func"], args=callArgs).start()
            self.events.remove(event)

    # Registering a temp event
    def registerEventTemp(self, tool, targetTemp, func, *arguments):
        """Register a one-shot callback for a tool reaching a temperature.

        :param tool: key of the tool as it appears in the temperature dict.
        :param targetTemp: threshold; the event fires once the current
            temperature is greater than or equal to it.
        :param func: callable to run; a warning is logged and nothing is
            registered when it is ``None`` or not callable.
        :param arguments: extra positional arguments passed to ``func`` after
            the temperature dict.
        """
        # callable() replaces collections.Callable, which no longer exists
        # on Python 3.10+.
        if func is None or not callable(func):
            self._logger.warning("registerEventTemp: Attempt to register event without a function")
            return
        event = {
            "tool": tool,
            "targetTemp": targetTemp,
            "func": func,
            "args": arguments
        }
        self._logger.debug("Registering event [%s, isFunction: %s]", event, isinstance(func, types.FunctionType))
        self.events.append(event)

    # Registering a gCodeWaiter
    def registerGCodeWaiter(self, command, func, *arguments):
        """Register a waiter after validating that ``command`` is a G-code.

        ``command`` (upper-cased) must contain ``M`` or ``G`` followed by one
        to four digits; otherwise a warning is logged and nothing is
        registered.

        NOTE(review): the registered regex is the generic G-code pattern, not
        one specific to ``command``, so the waiter fires on the first received
        line containing any M/G code. Confirm whether that is intended.

        :param command: G-code command name, e.g. ``"M92"``.
        :param func: callable to run (validated by ``registerRegexMsg``).
        :param arguments: extra positional arguments passed to ``func`` after
            the matched line.
        """
        if command is None or not re.match(self._GCODE_PATTERN, command.upper()):
            self._logger.warning("registerGCodeAnswer: Attempt to register gCodeAnswer without a function or valid gCode command")
            return
        self.registerRegexMsg(self._GCODE_PATTERN, func, *arguments)

    def registerRegexMsg(self, regex, func, *arguments):
        """Register a one-shot callback for the first line matching ``regex``.

        :param regex: pattern source; compiled here and applied with
            ``match`` (anchored at the start of the line).
        :param func: callable to run; a warning is logged and nothing is
            registered when ``regex`` or ``func`` is ``None`` or ``func`` is
            not callable.
        :param arguments: extra positional arguments passed to ``func`` after
            the matched line.
        """
        if regex is None or func is None or not callable(func):
            self._logger.warning("registerRegexMsg: Attempt to register gCodeAnswer without a function or regex")
            return
        self.gCodeWaiters.append({
            "regex": re.compile(regex),
            "func": func,
            "args": arguments,
            "callCount": 0
        })