diff --git a/BlocksScreen/lib/panels/filamentTab.py b/BlocksScreen/lib/panels/filamentTab.py index 3c1aef49..f1054068 100644 --- a/BlocksScreen/lib/panels/filamentTab.py +++ b/BlocksScreen/lib/panels/filamentTab.py @@ -118,7 +118,7 @@ def __init__( def handle_moonraker_components(self): components = self.ws._moonRest.get_server_info() - if "spoolman" not in components["result"].get("components", []): + if "spoolman" not in components.get("result", {}).get("components", []): self.fp_button_2.hide() self.spoolman_btn.hide() else: diff --git a/BlocksScreen/lib/panels/mainWindow.py b/BlocksScreen/lib/panels/mainWindow.py index 7a64caa5..0da62b0f 100644 --- a/BlocksScreen/lib/panels/mainWindow.py +++ b/BlocksScreen/lib/panels/mainWindow.py @@ -976,7 +976,7 @@ def _on_probe_notification( @api_handler def _handle_notify_gcode_response_message(self, method, data, metadata) -> None: """Handle websocket gcode responses messages""" - _gcode_response = data.get("params") + _gcode_response = data.get("params", []) self.gcode_response[list].emit(_gcode_response) if _gcode_response: if self._popup_toggle: diff --git a/BlocksScreen/lib/panels/networkWindow.py b/BlocksScreen/lib/panels/networkWindow.py index 07c98bed..ea16b4d8 100644 --- a/BlocksScreen/lib/panels/networkWindow.py +++ b/BlocksScreen/lib/panels/networkWindow.py @@ -35,7 +35,6 @@ from lib.utils.icon_button import IconButton from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem from PyQt6 import QtCore, QtGui, QtWidgets -from PyQt6.QtCore import QTimer, pyqtSlot logger = logging.getLogger(__name__) @@ -255,7 +254,7 @@ def _prefill_ip_from_os(self) -> None: except OSError: continue - @pyqtSlot() + @QtCore.pyqtSlot() def _on_reconnect_complete(self) -> None: """Navigate back to the main panel after a static-IP or DHCP-reset operation.""" logger.debug("reconnect_complete received — navigating to main_network_page") @@ -263,7 +262,7 @@ def _on_reconnect_complete(self) -> None: def _init_timers(self) -> None: """Initialize timers.""" - self._load_timer = QTimer(self) + self._load_timer = QtCore.QTimer(self) self._load_timer.setSingleShot(True) self._load_timer.timeout.connect(self._handle_load_timeout) @@ -277,7 +276,7 @@ def _init_model_view(self) -> None: self._entry_delegate.item_selected.connect(self._on_ssid_item_clicked) self._configure_list_view_palette() - @pyqtSlot(NetworkState) + @QtCore.pyqtSlot(NetworkState) def _on_network_state_changed(self, state: NetworkState) -> None: """React to a NetworkState update: sync toggles, populate header and connection info.""" logger.debug( @@ -438,7 +437,7 @@ def _on_network_state_changed(self, state: NetworkState) -> None: self._emit_status_icon(state) self._sync_active_network_list_icon(state) - @pyqtSlot(list) + @QtCore.pyqtSlot(list) def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: """Receive scan results, filter/sort them, and rebuild the SSID list view. @@ -457,9 +456,11 @@ def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: # Stamp the connected AP as ACTIVE so the list is correct on first # render even when the scan ran before the connection fully settled. filtered = [ - replace(net, network_status=NetworkStatus.ACTIVE) - if net.ssid == current_ssid - else net + ( + replace(net, network_status=NetworkStatus.ACTIVE) + if net.ssid == current_ssid + else net + ) for net in filtered ] active = next((n for n in filtered if n.ssid == current_ssid), None) @@ -478,12 +479,12 @@ def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: state = self._nm.current_state self._emit_status_icon(state) - @pyqtSlot(list) + @QtCore.pyqtSlot(list) def _on_saved_networks_loaded(self, networks: list[SavedNetwork]) -> None: """Receive saved-network data and update the priority spinbox for the active SSID.""" logger.debug("Loaded %d saved networks", len(networks)) - @pyqtSlot(ConnectionResult) + @QtCore.pyqtSlot(ConnectionResult) def _on_operation_complete(self, result: ConnectionResult) -> None: """Handle network operation completion.""" logger.debug("Operation: success=%s, msg=%s", result.success, result.message) @@ -570,7 +571,7 @@ def _on_operation_complete(self, result: ConnectionResult) -> None: result.message, ) ssid = self._target_ssid - QTimer.singleShot( + QtCore.QTimer.singleShot( 2000, lambda _ssid=ssid: self._nm.connect_network(_ssid) ) return # Keep loading visible; state machine handles completion @@ -578,7 +579,7 @@ def _on_operation_complete(self, result: ConnectionResult) -> None: self._clear_loading() self._show_error_popup(result.message) - @pyqtSlot(str, str) + @QtCore.pyqtSlot(str, str) def _on_network_error(self, operation: str, message: str) -> None: """Log network errors and surface critical failures in the info box.""" logger.error("Network error [%s]: %s", operation, message) @@ -658,13 +659,15 @@ def _sync_active_network_list_icon(self, state: NetworkState) -> None: # Update the cached entry with the authoritative signal and status updated = [ - replace( - net, - signal_strength=self._active_signal, - network_status=NetworkStatus.ACTIVE, + ( + replace( + net, + signal_strength=self._active_signal, + network_status=NetworkStatus.ACTIVE, + ) + if net.ssid == state.current_ssid + else net ) - if net.ssid == state.current_ssid - else net for net in self._cached_scan_networks ] @@ -1063,7 +1066,9 @@ def _handle_wifi_toggle(self, is_on: bool) -> None: # Non-blocking: disable hotspot then connect self._nm.toggle_hotspot(False) _ssid_to_connect = self._target_ssid - QTimer.singleShot(500, lambda: self._nm.connect_network(_ssid_to_connect)) + QtCore.QTimer.singleShot( + 500, lambda: self._nm.connect_network(_ssid_to_connect) + ) def _handle_hotspot_toggle(self, is_on: bool) -> None: """Enable or disable the hotspot, enforcing the ethernet/Wi-Fi mutual-exclusion rule.""" diff --git a/BlocksScreen/lib/panels/utilitiesTab.py b/BlocksScreen/lib/panels/utilitiesTab.py index f16ea6ec..8eaef3e6 100644 --- a/BlocksScreen/lib/panels/utilitiesTab.py +++ b/BlocksScreen/lib/panels/utilitiesTab.py @@ -1,693 +1,697 @@ -import re -import typing -from dataclasses import dataclass -from enum import Enum, auto -from functools import partial - -from lib.moonrakerComm import MoonWebSocket -from lib.panels.widgets.basePopup import BasePopup -from lib.panels.widgets.inputshaperPage import InputShaperPage -from lib.panels.widgets.optionCardWidget import OptionCard -from lib.panels.widgets.troubleshootPage import TroubleshootPage -from lib.printer import Printer -from lib.ui.utilitiesStackedWidget_ui import Ui_utilitiesStackedWidget -from lib.utils.blocks_button import BlocksCustomButton -from lib.utils.toggleAnimatedButton import ToggleAnimatedButton -from PyQt6 import QtCore, QtGui, QtWidgets - - -@dataclass -class LedState: - """Represents the state of an LED light.""" - - led_type: str - red: int = 0 - green: int = 0 - blue: int = 0 - white: int = 255 - state: str = "on" - - def get_gcode(self, name: str) -> str: - """Generates the G-code command for the current state.""" - if self.state == "off": - return f"SET_LED LED={name} RED=0 GREEN=0 BLUE=0 WHITE=0" - if self.led_type == "white": - return f"SET_LED LED={name} WHITE={self.white / 255:.2f}" - # Default to RGB - return ( - f"SET_LED LED={name} RED={self.red / 255:.2f} " - f"GREEN={self.green / 255:.2f} BLUE={self.blue / 255:.2f} " - f"WHITE={self.white / 255:.2f}" - ) - - -class Process(Enum): - FAN = auto() - AXIS = auto() - BED_HEATER = auto() - EXTRUDER = auto() - AXIS_MAINTENANCE = auto() - - -class UtilitiesTab(QtWidgets.QStackedWidget): - _LED_TYPES: frozenset[str] = frozenset({"led", "neopixel", "dotstar"}) - - request_back: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - name="request-back" - ) - request_change_page: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - int, int, name="request-change-page" - ) - request_available_objects_signal: typing.ClassVar[QtCore.pyqtSignal] = ( - QtCore.pyqtSignal(name="get-available-objects") - ) - run_gcode_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - str, name="run-gcode" - ) - request_numpad_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - int, - str, - str, - "PyQt_PyObject", - QtWidgets.QStackedWidget, - name="request-numpad", - ) - subscribe_config: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - [list, "PyQt_PyObject"], - [str, "PyQt_PyObject"], - name="on-subscribe-config", - ) - on_update_message: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - dict, name="handle-update-message" - ) - - update_available: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - bool, name="update-available" - ) - - show_update_page: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - bool, name="show-update-page" - ) - call_load_panel = QtCore.pyqtSignal(bool, str, name="call-load-panel") - - def __init__( - self, parent: QtWidgets.QWidget, ws: MoonWebSocket, printer: Printer - ) -> None: - super().__init__(parent) - - self.panel = Ui_utilitiesStackedWidget() - self.panel.setupUi(self) - - self.ws = ws - self.printer: Printer = printer - self.troubleshoot_page: TroubleshootPage = TroubleshootPage(self) - - # --- State Variables --- - self.objects: dict = { - "fans": {}, - "axis": {"x": "indf", "y": "indf", "z": "indf"}, - "bheat": {"Bed_Heater": "indf"}, - "extrude": {"extruder": "indf"}, - "leds": {}, - } - self.x_inputshaper: dict = {} - self.stepper_limits: dict = {} - - self.current_object: typing.Optional[str] = None - self.current_process: typing.Optional[Process] = None - self.axis_in: str = "x" - self.amount: int = 1 - self.tb: bool = False - self.cg = None - self.aut: bool = False - - self._is_timeout_timer: QtCore.QTimer = QtCore.QTimer(self) - self._is_timeout_timer.setSingleShot(True) - self._is_timeout_timer.setInterval(300_000) # 5 min: longest plausible IS run - self._is_timeout_timer.timeout.connect(self._on_is_timeout) - - # --- PixMap --- - self._led_pixmap = QtGui.QPixmap(":/ui/media/btn_icons/LEDs.svg") - # --- UI Setup --- - self.setLayoutDirection(QtCore.Qt.LayoutDirection.LeftToRight) - self.panel.update_btn.clicked.connect( - lambda: self.show_update_page[bool].emit(False) - ) - - self.is_page = InputShaperPage(self) - self.is_page.call_load_panel.connect(self.call_load_panel) - self.addWidget(self.is_page) - - self.dialog_page = BasePopup(self, dialog=True, floating=True) - self.addWidget(self.dialog_page) - - # --- Back Buttons --- - for button in ( - self.panel.leds_back_btn, - self.panel.info_back_btn, - self.panel.leds_slider_back_btn, - self.panel.input_shaper_back_btn, - self.panel.routine_check_back_btn, - self.is_page.update_back_btn, - ): - button.clicked.connect(self.back_button) - - # --- Page Navigation --- - self._connect_page_change(self.panel.utilities_axes_btn, self.panel.axes_page) - self._connect_page_change( - self.panel.utilities_input_shaper_btn, self.panel.input_shaper_page - ) - self._connect_page_change(self.panel.utilities_info_btn, self.panel.info_page) - self._connect_page_change( - self.panel.utilities_routine_check_btn, self.panel.routines_page - ) - self._connect_page_change(self.panel.am_cancel, self.panel.utilities_page) - - self._connect_page_change(self.panel.axes_back_btn, self.panel.utilities_page) - self._connect_page_change( - self.troubleshoot_page.tb_back_btn, self.panel.utilities_page - ) - - # --- Routines --- - self.panel.rc_fans.clicked.connect(partial(self.run_routine, Process.FAN)) - self.panel.rc_bheat.clicked.connect( - partial(self.run_routine, Process.BED_HEATER) - ) - self.panel.rc_ext.clicked.connect(partial(self.run_routine, Process.EXTRUDER)) - self.panel.rc_axis.clicked.connect(partial(self.run_routine, Process.AXIS)) - self.panel.rc_no.clicked.connect(self.on_routine_answer) - self.panel.rc_yes.clicked.connect(self.on_routine_answer) - - # --- Axis Maintenance --- - self.panel.axis_x_btn.clicked.connect(partial(self.axis_maintenance, "x")) - self.panel.axis_y_btn.clicked.connect(partial(self.axis_maintenance, "y")) - self.panel.axis_z_btn.clicked.connect(partial(self.axis_maintenance, "z")) - - self.panel.toggle_led_button.state = ToggleAnimatedButton.State.ON - - # --- LEDs --- - # self.panel.leds_r_slider.sliderReleased.connect(self.update_led_values) - # self.panel.leds_g_slider.sliderReleased.connect(self.update_led_values) - # self.panel.leds_b_slider.sliderReleased.connect(self.update_led_values) - self.panel.leds_w_slider.sliderReleased.connect(self.update_led_values) - self.panel.toggle_led_button.clicked.connect(self.toggle_led_state) - - # --- Websocket/Printer Signals --- - self.run_gcode_signal.connect(self.ws.api.run_gcode) - self.is_page.run_gcode_signal.connect(self.ws.api.run_gcode) - self.subscribe_config[str, "PyQt_PyObject"].connect( - self.printer.on_subscribe_config - ) - self.subscribe_config[list, "PyQt_PyObject"].connect( - self.printer.on_subscribe_config - ) - self.printer.gcode_response.connect(self.handle_gcode_response) - - # --- Initialize Printer Communication --- - self.printer.printer_config.connect(self.on_printer_config_received) - self.printer.gcode_move_update.connect(self.on_gcode_move_update) - - self.panel.update_btn.setPixmap( - QtGui.QPixmap(":/system/media/btn_icons/update-software-icon.svg") - ) - - # ---- Input Shaper ---- - self.automatic_is = OptionCard( - self, - "Automatic\nInput Shaper", - "Automatic Input Shaper", - QtGui.QPixmap(":/input_shaper/media/btn_icons/input_shaper_auto.svg"), - ) # type: ignore - self.automatic_is.setObjectName("Automatic_IS_Card") - self.panel.is_content_layout.addWidget( - self.automatic_is, alignment=QtCore.Qt.AlignmentFlag.AlignHCenter - ) - self.automatic_is.continue_clicked.connect( - lambda: self.handle_is("SHAPER_CALIBRATE") - ) - - self.manual_is = OptionCard( - self, - "Manual\nInput Shaper", - "Manual Input Shaper", - QtGui.QPixmap(":/input_shaper/media/btn_icons/input_shaper_manual.svg"), - ) # type: ignore - self.manual_is.setObjectName("Manual_IS_Card") - self.panel.is_content_layout.addWidget( - self.manual_is, alignment=QtCore.Qt.AlignmentFlag.AlignHCenter - ) - self.manual_is.continue_clicked.connect(lambda: self.handle_is("")) - - self.is_types: dict = {} - self.is_aut_types: dict = {} - self.dialog_page.accepted.connect( - lambda: self.handle_is("SHAPER_CALIBRATE AXIS=Y") - ) - self.dialog_page.rejected.connect( - lambda: self.handle_is("SHAPER_CALIBRATE AXIS=X") - ) - - self.is_page.action_btn.clicked.connect( - lambda: self.change_page(self.indexOf(self.panel.input_shaper_page)) - ) - - def handle_gcode_response(self, data: list[str]) -> None: - """ - Parses a Klipper Input Shaper console message and updates self.is_types. - """ - - if not isinstance(data, list) or len(data) != 1 or not isinstance(data[0], str): - print( - f"WARNING: Invalid input format. Expected a list with one string. Received: {data}" - ) - return - - message = data[0] - - pattern_fitted = re.compile( - r"Fitted shaper '(?P\w+)' frequency = (?P[\d\.]+) Hz \(vibrations = (?P[\d\.]+)%" - ) - match_fitted = pattern_fitted.search(message) - - if match_fitted: - name = match_fitted.group("name") - freq = float(match_fitted.group("freq")) - vib = float(match_fitted.group("vib")) - current_data = self.is_types.get(name, {}) - current_data.update( - { - "frequency": freq, - "vibration": vib, - "max_accel": current_data.get("max_accel", 0.0), - } - ) - self.is_types[name] = current_data - - return - pattern_accel = re.compile( - r"To avoid too much smoothing with '(?P\w+)', suggested max_accel <= (?P[\d\.]+) mm/sec\^2" - ) - match_accel = pattern_accel.search(message) - - if match_accel: - name = match_accel.group("name") - accel = float(match_accel.group("accel")) - - if name in self.is_types and isinstance(self.is_types[name], dict): - self.is_types[name]["max_accel"] = accel - else: - self.is_types[name] = self.is_types.get(name, {}) - self.is_types[name]["max_accel"] = accel - return - - pattern_recommended = re.compile( - r"Recommended shaper_type_(?P[xy]) = (?P\w+), shaper_freq_(?P=axis) = (?P[\d\.]+) Hz" - ) - match_recommended = pattern_recommended.search(message) - if match_recommended: - axis = match_recommended.group("axis") - recommended_type = match_recommended.group("type") - self.is_types["Axis"] = axis - if self.aut: - self.is_aut_types[axis] = recommended_type - if len(self.is_aut_types) == 2: - self.run_gcode_signal.emit("SAVE_CONFIG") - self._is_timeout_timer.stop() - self.call_load_panel.emit(False, "") - self.aut = False - return - return - - reordered = {recommended_type: self.is_types[recommended_type]} - for key, value in self.is_types.items(): - if key not in ("suggested_type", recommended_type, "Axis"): - reordered[key] = value - - self.is_page.set_type_dictionary(self.is_types) - first_key = next(iter(reordered.keys()), None) - for key in reordered.keys(): - if key == first_key: - self.is_page.add_type_entry(key, "Recommended type") - else: - self.is_page.add_type_entry(key) - - self.is_page.build_model_list() - self._is_timeout_timer.stop() - self.call_load_panel.emit(False, "") - return - - def handle_is(self, gcode: str) -> None: - if gcode == "SHAPER_CALIBRATE": - self.run_gcode_signal.emit("G28\nM400") - self.aut = True - self.run_gcode_signal.emit(gcode) - elif gcode == "": - self.dialog_page.confirm_background_color("#dfdfdf") - self.dialog_page.cancel_background_color("#dfdfdf") - self.dialog_page.cancel_font_color("#000000") - self.dialog_page.confirm_font_color("#000000") - self.dialog_page.cancel_button_text("X axis") - self.dialog_page.confirm_button_text("Y axis") - self.dialog_page.set_message( - "Select the axis you want to execute the input shaper on:" - ) - self.dialog_page.show() - return - else: - self.run_gcode_signal.emit("G28\nM400") - self.run_gcode_signal.emit(gcode) - self.change_page(self.indexOf(self.is_page)) - - self._is_timeout_timer.start() - self.call_load_panel.emit(True, "Running Input Shaper...") - - def _on_is_timeout(self) -> None: - self.call_load_panel.emit(False, "") - - @QtCore.pyqtSlot(list, name="on_object_list") - def on_object_list(self, object_list: list) -> None: - """Handle receiving printer object list""" - self.cg = object_list - for obj in self.cg: - base_name = obj.split()[0] - - # Only accept 'fan_generic' or 'fan' - if base_name == "fan_generic" or base_name == "fan": - self.objects["fans"][obj.removeprefix(base_name + " ")] = "indef" - self._update_leds_from_config() - - @QtCore.pyqtSlot(dict, name="on_object_config") - @QtCore.pyqtSlot(list, name="on_object_config") - def on_object_config(self, config: typing.Union[dict, list]) -> None: - """Handle receiving printer object configurations""" - if not config: - return - config_items = [config] if isinstance(config, dict) else config - for item in config_items: - for key, value in item.items(): - if ( - key.startswith("stepper_") - and isinstance(value, dict) - and key not in self.stepper_limits - ): - pos_min = value.get("position_min") - pos_max = value.get("position_max") - if pos_min is not None or pos_max is not None: - self.stepper_limits[key] = { - "min": float(pos_min) - if pos_min is not None - else -float("inf"), - "max": float(pos_max) - if pos_max is not None - else float("inf"), - } - - def on_printer_config_received(self, config: dict) -> None: - """Handle printer configuration""" - for axis in ("x", "y", "z"): - self.subscribe_config[str, "PyQt_PyObject"].emit( - f"stepper_{axis}", self.on_object_config - ) - - @QtCore.pyqtSlot(str, list, name="on_gcode_move_update") - def on_gcode_move_update(self, name: str, value: list) -> None: - """Handle gcode move""" - if not value: - return - if name == "gcode_position": - ... - - def run_routine(self, process: Process): - """Run check routine for available processes""" - self.current_process = process - routine_configs = { - Process.FAN: ("fans", "fan is spinning"), - Process.AXIS: ("axis", "axis is moving"), - Process.BED_HEATER: ("bheat", "bed is heating"), - Process.EXTRUDER: ("extrude", "extruder is being tested"), - } - if process not in routine_configs: - return - obj_key, message = routine_configs[process] - obj_list = list(self.objects.get(obj_key, {}).keys()) - if not self._advance_routine_object(obj_list): - if process == Process.FAN: - self.run_gcode_signal.emit("M107") - for i in self.objects["fans"]: - self.run_gcode_signal.emit( - f"SET_FAN_SPEED FAN={i.removeprefix('fan_generic ')} SPEED=0\nM400" - ) - - if self.tb: - self.troubleshoot_request() - self.tb = False - else: - self.change_page(self.indexOf(self.panel.utilities_page)) - - return - - message = f"Please check if the {self.current_object} is functioning correctly." - if process == Process.AXIS: - message = f"Please ensure the {self.current_object} axis moves correctly." - elif process in [Process.BED_HEATER, Process.EXTRUDER]: - message = "Please check if the temperature reaches 60°C. \n you may need to wait a few moments." - - self.set_routine_check_page( - f"Running routine for: {self.current_object}", message - ) - self.show_waiting_page( - self.indexOf(self.panel.rc_page), - f"Please check if the {message}", - 10000 if process == Process.AXIS else 0, - ) - self._send_routine_gcode() - - def _advance_routine_object(self, obj_list: list) -> bool: - if not obj_list: - is_first_run = self.current_object is None - self.current_object = obj_list[0] if is_first_run and obj_list else "done" - return is_first_run - if self.current_object not in obj_list: - if self.current_process == Process.AXIS: - self.run_gcode_signal.emit("G28") - self.current_object = obj_list[0] - return True - try: - current_index = obj_list.index(self.current_object) - if current_index + 1 < len(obj_list): - self.current_object = obj_list[current_index + 1] - return True - else: - self.current_object = None - return False - except ValueError: - self.current_object = obj_list[0] - return True - - def on_routine_answer(self) -> None: - """Handle routine ongoing process""" - if self.current_process is None or self.current_object is None: - return - if self.sender() == self.panel.rc_yes: - answer = "yes" - else: - answer = "no" - self.tb = True - process_map = { - Process.FAN: ("fans", self.current_object), - Process.AXIS: ("axis", self.current_object), - Process.BED_HEATER: ("bheat", "Bed_Heater"), - Process.EXTRUDER: ("extrude", "extruder"), - } - if self.current_process in process_map: - obj_key, item_key = process_map[self.current_process] - self.objects[obj_key][item_key] = answer - if self.current_process in [Process.BED_HEATER, Process.EXTRUDER]: - self.run_gcode_signal.emit("TURN_OFF_HEATERS") - self.run_routine(self.current_process) - elif self.current_process == Process.AXIS_MAINTENANCE: - if answer == "yes": - self._run_axis_maintenance_gcode(self.current_object) - else: - self.change_page(self.indexOf(self.panel.axes_page)) - - def _send_routine_gcode(self): - """Send the correct G-code for the current process and object.""" - if self.current_process == Process.FAN: - fan_name = self.current_object or next(iter(self.objects["fans"]), None) - if fan_name: - if fan_name == "fan": - self.run_gcode_signal.emit("M106 S255\nM400") - else: - self.run_gcode_signal.emit( - f"SET_FAN_SPEED FAN={fan_name.removeprefix('fan_generic ')} SPEED=0.8\nM400" - ) - - return - - gcode_map = { - Process.BED_HEATER: "SET_HEATER_TEMPERATURE HEATER=heater_bed TARGET=60", - Process.EXTRUDER: "SET_HEATER_TEMPERATURE HEATER=extruder TARGET=60", - (Process.AXIS, "x"): "G91\nG1 X250 F1000\nG1 X-250 F1000", - (Process.AXIS, "y"): "G91\nG1 Y250 F1000\nG1 Y-250 F1000", - (Process.AXIS, "z"): "G91\nG1 Z250 F1000\nG1 Z-250 F1000", - } - - key = ( - (self.current_process, self.current_object) - if self.current_process == Process.AXIS - else self.current_process - ) - - if gcode := gcode_map.get(key): - self.run_gcode_signal.emit(f"{gcode}\nM400") - - def set_routine_check_page(self, title: str, label: str): - """Set text on routine page""" - self.panel.rc_tittle.setText(title) - self.panel.rc_label.setText(label) - - def update_led_values(self) -> None: - """Update led state and color values""" - if self.current_object not in self.objects["leds"]: - return - led_state: LedState = self.objects["leds"][self.current_object] - led_state.white = int(self.panel.leds_w_slider.value() * 255 / 100) - self.save_led_state() - - def _update_leds_from_config(self): - layout = self.panel.leds_content_layout - - while layout.count(): - if (child := layout.takeAt(0)) and child.widget(): - child.widget().deleteLater() # type: ignore - - led_names = [] - if not self.cg: - return - - # Collect LED names - match Klipper LED hardware types only - for obj in self.cg: - parts = obj.split() - if len(parts) >= 2 and parts[0] in self._LED_TYPES: - name = parts[1] - led_names.append(name) - self.objects["leds"][name] = LedState(led_type="white") - - max_columns = 3 - buttons = [] # store references to created buttons - - # Create LED buttons - for i, name in enumerate(led_names): - if self.panel.leds_widget: - button = BlocksCustomButton() - button.setFixedSize(200, 70) - button.setText(name) - button.setProperty("class", "menu_btn") - button.setPixmap(self._led_pixmap) - row, col = divmod(i, max_columns) - layout.addWidget(button, row, col) - button.clicked.connect(partial(self.handle_led_button, name)) - buttons.append(button) - - try: - self.panel.utilities_leds_btn.clicked.disconnect() - except (RuntimeError, TypeError): - pass - if len(buttons) == 1: - self.panel.utilities_leds_btn.clicked.connect( - partial(self.handle_led_button, led_names[0]) - ) - elif len(buttons) > 1: - self._connect_page_change( - self.panel.utilities_leds_btn, self.panel.leds_page - ) - - def toggle_led_state(self) -> None: - """Toggle leds""" - if self.current_object not in self.objects["leds"]: - return - led_state: LedState = self.objects["leds"][self.current_object] - if led_state.state == "off": - led_state.state = "on" - self.panel.toggle_led_button.state = ToggleAnimatedButton.State.ON - else: - led_state.state = "off" - self.panel.toggle_led_button.state = ToggleAnimatedButton.State.OFF - self.save_led_state() - - def handle_led_button(self, name: str) -> None: - """Handle led button clicked""" - self.current_object = name - led_state: LedState = self.objects["leds"].get(name) - if not led_state: - return - is_rgb = led_state.led_type == "rgb" - self.panel.leds_w_slider.setVisible(not is_rgb) - self.panel.leds_w_slider.setValue(led_state.white) - self.change_page(self.indexOf(self.panel.leds_slider_page)) - - def save_led_state(self): - """Save led state""" - if self.current_object: - if self.current_object in self.objects["leds"]: - led_state: LedState = self.objects["leds"][self.current_object] - self.run_gcode_signal.emit(led_state.get_gcode(self.current_object)) - - def axis_maintenance(self, axis: str) -> None: - """Routine, checks axis movement for printer debugging""" - self.current_process = Process.AXIS_MAINTENANCE - self.current_object = axis - self.run_gcode_signal.emit(f"G28 {axis.upper()}\nM400") - if axis == "x": - self.run_gcode_signal.emit("G1 X10 Y250 F18000") - self.set_routine_check_page( - "Axis Maintenance", - f"Insert oil on the {axis.upper()} axis before confirming.", - ) - self.show_waiting_page( - self.indexOf(self.panel.rc_page), - f"Homing {axis.upper()} axis...", - 5000, - ) - - def _run_axis_maintenance_gcode(self, axis: str): - stepper_key = f"stepper_{axis}" - if stepper_key in self.stepper_limits: - max_pos = self.stepper_limits[stepper_key].get("max", 20) - distance = int(max_pos) - 20 - self.run_gcode_signal.emit( - f"G1 {axis.upper()}{distance} F3000\nM400\nG28\nM400" - ) - self.show_waiting_page( - self.indexOf(self.panel.axes_page), - f"Running maintenance cycle on {axis.upper()} axis...", - 5000, - ) - else: - self.change_page(self.indexOf(self.panel.axes_page)) - - def troubleshoot_request(self) -> None: - """Show troubleshoot page""" - self.troubleshoot_page.show() - - def show_waiting_page(self, page_to_go_to: int, label: str, time_ms: int): - """Show placeholder page""" - self.call_load_panel.emit(True, label) - QtCore.QTimer.singleShot(time_ms, lambda: self.change_page(page_to_go_to)) - - def _connect_page_change(self, button: QtWidgets.QWidget, page: QtWidgets.QWidget): - if isinstance(button, QtWidgets.QAbstractButton): - button.clicked.connect(lambda: self.change_page(self.indexOf(page))) - - def change_page(self, index: int): - """Request change page by index""" - self.call_load_panel.emit(False, "") - self.troubleshoot_page.hide() - if index < self.count(): - self.request_change_page.emit(3, index) - - @QtCore.pyqtSlot(name="request-back") - def back_button(self) -> None: - """Request back""" - self.request_back.emit() +import logging +import re +import typing +from dataclasses import dataclass +from enum import Enum, auto +from functools import partial + +from lib.moonrakerComm import MoonWebSocket +from lib.panels.widgets.basePopup import BasePopup +from lib.panels.widgets.inputshaperPage import InputShaperPage +from lib.panels.widgets.optionCardWidget import OptionCard +from lib.panels.widgets.troubleshootPage import TroubleshootPage +from lib.printer import Printer +from lib.ui.utilitiesStackedWidget_ui import Ui_utilitiesStackedWidget +from lib.utils.blocks_button import BlocksCustomButton +from lib.utils.toggleAnimatedButton import ToggleAnimatedButton +from PyQt6 import QtCore, QtGui, QtWidgets + +logger = logging.getLogger(__name__) + + +@dataclass +class LedState: + """Represents the state of an LED light.""" + + led_type: str + red: int = 0 + green: int = 0 + blue: int = 0 + white: int = 255 + state: str = "on" + + def get_gcode(self, name: str) -> str: + """Generates the G-code command for the current state.""" + if self.state == "off": + return f"SET_LED LED={name} RED=0 GREEN=0 BLUE=0 WHITE=0" + if self.led_type == "white": + return f"SET_LED LED={name} WHITE={self.white / 255:.2f}" + # Default to RGB + return ( + f"SET_LED LED={name} RED={self.red / 255:.2f} " + f"GREEN={self.green / 255:.2f} BLUE={self.blue / 255:.2f} " + f"WHITE={self.white / 255:.2f}" + ) + + +class Process(Enum): + FAN = auto() + AXIS = auto() + BED_HEATER = auto() + EXTRUDER = auto() + AXIS_MAINTENANCE = auto() + + +class UtilitiesTab(QtWidgets.QStackedWidget): + _LED_TYPES: frozenset[str] = frozenset({"led", "neopixel", "dotstar"}) + + request_back: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + name="request-back" + ) + request_change_page: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + int, int, name="request-change-page" + ) + request_available_objects_signal: typing.ClassVar[QtCore.pyqtSignal] = ( + QtCore.pyqtSignal(name="get-available-objects") + ) + run_gcode_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + str, name="run-gcode" + ) + request_numpad_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + int, + str, + str, + "PyQt_PyObject", + QtWidgets.QStackedWidget, + name="request-numpad", + ) + subscribe_config: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + [list, "PyQt_PyObject"], + [str, "PyQt_PyObject"], + name="on-subscribe-config", + ) + on_update_message: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + dict, name="handle-update-message" + ) + + update_available: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + bool, name="update-available" + ) + + show_update_page: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + bool, name="show-update-page" + ) + call_load_panel = QtCore.pyqtSignal(bool, str, name="call-load-panel") + + def __init__( + self, parent: QtWidgets.QWidget, ws: MoonWebSocket, printer: Printer + ) -> None: + super().__init__(parent) + + self.panel = Ui_utilitiesStackedWidget() + self.panel.setupUi(self) + + self.ws = ws + self.printer: Printer = printer + self.troubleshoot_page: TroubleshootPage = TroubleshootPage(self) + + # --- State Variables --- + self.objects: dict = { + "fans": {}, + "axis": {"x": "indf", "y": "indf", "z": "indf"}, + "bheat": {"Bed_Heater": "indf"}, + "extrude": {"extruder": "indf"}, + "leds": {}, + } + self.x_inputshaper: dict = {} + self.stepper_limits: dict = {} + + self.current_object: str | None = None + self.current_process: Process | None = None + self.axis_in: str = "x" + self.amount: int = 1 + self.tb: bool = False + self.cg = None + self.aut: bool = False + + self._is_timeout_timer: QtCore.QTimer = QtCore.QTimer(self) + self._is_timeout_timer.setSingleShot(True) + self._is_timeout_timer.setInterval(300_000) # 5 min: longest plausible IS run + self._is_timeout_timer.timeout.connect(self._on_is_timeout) + + # --- PixMap --- + self._led_pixmap = QtGui.QPixmap(":/ui/media/btn_icons/LEDs.svg") + # --- UI Setup --- + self.setLayoutDirection(QtCore.Qt.LayoutDirection.LeftToRight) + self.panel.update_btn.clicked.connect( + lambda: self.show_update_page[bool].emit(False) + ) + + self.is_page = InputShaperPage(self) + self.is_page.call_load_panel.connect(self.call_load_panel) + self.addWidget(self.is_page) + + self.dialog_page = BasePopup(self, dialog=True, floating=True) + self.addWidget(self.dialog_page) + + # --- Back Buttons --- + for button in ( + self.panel.leds_back_btn, + self.panel.info_back_btn, + self.panel.leds_slider_back_btn, + self.panel.input_shaper_back_btn, + self.panel.routine_check_back_btn, + self.is_page.update_back_btn, + ): + button.clicked.connect(self.back_button) + + # --- Page Navigation --- + self._connect_page_change(self.panel.utilities_axes_btn, self.panel.axes_page) + self._connect_page_change( + self.panel.utilities_input_shaper_btn, self.panel.input_shaper_page + ) + self._connect_page_change(self.panel.utilities_info_btn, self.panel.info_page) + self._connect_page_change( + self.panel.utilities_routine_check_btn, self.panel.routines_page + ) + self._connect_page_change(self.panel.am_cancel, self.panel.utilities_page) + + self._connect_page_change(self.panel.axes_back_btn, self.panel.utilities_page) + self._connect_page_change( + self.troubleshoot_page.tb_back_btn, self.panel.utilities_page + ) + + # --- Routines --- + self.panel.rc_fans.clicked.connect(partial(self.run_routine, Process.FAN)) + self.panel.rc_bheat.clicked.connect( + partial(self.run_routine, Process.BED_HEATER) + ) + self.panel.rc_ext.clicked.connect(partial(self.run_routine, Process.EXTRUDER)) + self.panel.rc_axis.clicked.connect(partial(self.run_routine, Process.AXIS)) + self.panel.rc_no.clicked.connect(self.on_routine_answer) + self.panel.rc_yes.clicked.connect(self.on_routine_answer) + + # --- Axis Maintenance --- + self.panel.axis_x_btn.clicked.connect(partial(self.axis_maintenance, "x")) + self.panel.axis_y_btn.clicked.connect(partial(self.axis_maintenance, "y")) + self.panel.axis_z_btn.clicked.connect(partial(self.axis_maintenance, "z")) + + self.panel.toggle_led_button.state = ToggleAnimatedButton.State.ON + + # --- LEDs --- + # self.panel.leds_r_slider.sliderReleased.connect(self.update_led_values) + # self.panel.leds_g_slider.sliderReleased.connect(self.update_led_values) + # self.panel.leds_b_slider.sliderReleased.connect(self.update_led_values) + self.panel.leds_w_slider.sliderReleased.connect(self.update_led_values) + self.panel.toggle_led_button.clicked.connect(self.toggle_led_state) + + # --- Websocket/Printer Signals --- + self.run_gcode_signal.connect(self.ws.api.run_gcode) + self.is_page.run_gcode_signal.connect(self.ws.api.run_gcode) + self.subscribe_config[str, "PyQt_PyObject"].connect( + self.printer.on_subscribe_config + ) + self.subscribe_config[list, "PyQt_PyObject"].connect( + self.printer.on_subscribe_config + ) + self.printer.gcode_response.connect(self.handle_gcode_response) + + # --- Initialize Printer Communication --- + self.printer.printer_config.connect(self.on_printer_config_received) + self.printer.gcode_move_update.connect(self.on_gcode_move_update) + + self.panel.update_btn.setPixmap( + QtGui.QPixmap(":/system/media/btn_icons/update-software-icon.svg") + ) + + # ---- Input Shaper ---- + self.automatic_is = OptionCard( + self, + "Automatic\nInput Shaper", + "Automatic Input Shaper", + QtGui.QPixmap(":/input_shaper/media/btn_icons/input_shaper_auto.svg"), + ) # type: ignore + self.automatic_is.setObjectName("Automatic_IS_Card") + self.panel.is_content_layout.addWidget( + self.automatic_is, alignment=QtCore.Qt.AlignmentFlag.AlignHCenter + ) + self.automatic_is.continue_clicked.connect( + lambda: self.handle_is("SHAPER_CALIBRATE") + ) + + self.manual_is = OptionCard( + self, + "Manual\nInput Shaper", + "Manual Input Shaper", + QtGui.QPixmap(":/input_shaper/media/btn_icons/input_shaper_manual.svg"), + ) # type: ignore + self.manual_is.setObjectName("Manual_IS_Card") + self.panel.is_content_layout.addWidget( + self.manual_is, alignment=QtCore.Qt.AlignmentFlag.AlignHCenter + ) + self.manual_is.continue_clicked.connect(lambda: self.handle_is("")) + + self.is_types: dict = {} + self.is_aut_types: dict = {} + self.dialog_page.accepted.connect( + lambda: self.handle_is("SHAPER_CALIBRATE AXIS=Y") + ) + self.dialog_page.rejected.connect( + lambda: self.handle_is("SHAPER_CALIBRATE AXIS=X") + ) + + self.is_page.action_btn.clicked.connect( + lambda: self.change_page(self.indexOf(self.panel.input_shaper_page)) + ) + + def handle_gcode_response(self, data: list[str]) -> None: + """ + Parses a Klipper Input Shaper console message and updates self.is_types. + """ + + if not isinstance(data, list) or len(data) != 1 or not isinstance(data[0], str): + logger.warning( + "handle_gcode_response: invalid input format. Expected list[str], received: %r", + data, + ) + return + + message = data[0] + + pattern_fitted = re.compile( + r"Fitted shaper '(?P\w+)' frequency = (?P[\d\.]+) Hz \(vibrations = (?P[\d\.]+)%" + ) + match_fitted = pattern_fitted.search(message) + + if match_fitted: + name = match_fitted.group("name") + freq = float(match_fitted.group("freq")) + vib = float(match_fitted.group("vib")) + current_data = self.is_types.get(name, {}) + current_data.update( + { + "frequency": freq, + "vibration": vib, + "max_accel": current_data.get("max_accel", 0.0), + } + ) + self.is_types[name] = current_data + + return + pattern_accel = re.compile( + r"To avoid too much smoothing with '(?P\w+)', suggested max_accel <= (?P[\d\.]+) mm/sec\^2" + ) + match_accel = pattern_accel.search(message) + + if match_accel: + name = match_accel.group("name") + accel = float(match_accel.group("accel")) + + if name in self.is_types and isinstance(self.is_types[name], dict): + self.is_types[name]["max_accel"] = accel + else: + self.is_types[name] = self.is_types.get(name, {}) + self.is_types[name]["max_accel"] = accel + return + + pattern_recommended = re.compile( + r"Recommended shaper_type_(?P[xy]) = (?P\w+), shaper_freq_(?P=axis) = (?P[\d\.]+) Hz" + ) + match_recommended = pattern_recommended.search(message) + if match_recommended: + axis = match_recommended.group("axis") + recommended_type = match_recommended.group("type") + self.is_types["Axis"] = axis + if self.aut: + self.is_aut_types[axis] = recommended_type + if len(self.is_aut_types) == 2: + self.run_gcode_signal.emit("SAVE_CONFIG") + self._is_timeout_timer.stop() + self.call_load_panel.emit(False, "") + self.aut = False + return + return + + reordered = {recommended_type: self.is_types[recommended_type]} + for key, value in self.is_types.items(): + if key not in ("suggested_type", recommended_type, "Axis"): + reordered[key] = value + + self.is_page.set_type_dictionary(self.is_types) + first_key = next(iter(reordered.keys()), None) + for key in reordered: + if key == first_key: + self.is_page.add_type_entry(key, "Recommended type") + else: + self.is_page.add_type_entry(key) + + self.is_page.build_model_list() + self._is_timeout_timer.stop() + self.call_load_panel.emit(False, "") + return + + def handle_is(self, gcode: str) -> None: + if gcode == "SHAPER_CALIBRATE": + self.run_gcode_signal.emit("G28\nM400") + self.aut = True + self.run_gcode_signal.emit(gcode) + elif gcode == "": + self.dialog_page.confirm_background_color("#dfdfdf") + self.dialog_page.cancel_background_color("#dfdfdf") + self.dialog_page.cancel_font_color("#000000") + self.dialog_page.confirm_font_color("#000000") + self.dialog_page.cancel_button_text("X axis") + self.dialog_page.confirm_button_text("Y axis") + self.dialog_page.set_message( + "Select the axis you want to execute the input shaper on:" + ) + self.dialog_page.show() + return + else: + self.run_gcode_signal.emit("G28\nM400") + self.run_gcode_signal.emit(gcode) + self.change_page(self.indexOf(self.is_page)) + + self._is_timeout_timer.start() + self.call_load_panel.emit(True, "Running Input Shaper...") + + def _on_is_timeout(self) -> None: + self.call_load_panel.emit(False, "") + + @QtCore.pyqtSlot(list, name="on_object_list") + def on_object_list(self, object_list: list) -> None: + """Handle receiving printer object list""" + self.cg = object_list + for obj in self.cg: + base_name = obj.split()[0] + + # Only accept 'fan_generic' or 'fan' + if base_name == "fan_generic" or base_name == "fan": + self.objects["fans"][obj.removeprefix(base_name + " ")] = "indef" + self._update_leds_from_config() + + @QtCore.pyqtSlot(dict, name="on_object_config") + @QtCore.pyqtSlot(list, name="on_object_config") + def on_object_config(self, config: dict | list) -> None: + """Handle receiving printer object configurations""" + if not config: + return + config_items = [config] if isinstance(config, dict) else config + for item in config_items: + for key, value in item.items(): + if ( + key.startswith("stepper_") + and isinstance(value, dict) + and key not in self.stepper_limits + ): + pos_min = value.get("position_min") + pos_max = value.get("position_max") + if pos_min is not None or pos_max is not None: + self.stepper_limits[key] = { + "min": ( + float(pos_min) if pos_min is not None else -float("inf") + ), + "max": ( + float(pos_max) if pos_max is not None else float("inf") + ), + } + + def on_printer_config_received(self, config: dict) -> None: + """Handle printer configuration""" + for axis in ("x", "y", "z"): + self.subscribe_config[str, "PyQt_PyObject"].emit( + f"stepper_{axis}", self.on_object_config + ) + + @QtCore.pyqtSlot(str, list, name="on_gcode_move_update") + def on_gcode_move_update(self, name: str, value: list) -> None: + """Handle gcode move""" + if not value: + return + if name == "gcode_position": + ... + + def run_routine(self, process: Process): + """Run check routine for available processes""" + self.current_process = process + routine_configs = { + Process.FAN: ("fans", "fan is spinning"), + Process.AXIS: ("axis", "axis is moving"), + Process.BED_HEATER: ("bheat", "bed is heating"), + Process.EXTRUDER: ("extrude", "extruder is being tested"), + } + if process not in routine_configs: + return + obj_key, message = routine_configs[process] + obj_list = list(self.objects.get(obj_key, {}).keys()) + if not self._advance_routine_object(obj_list): + if process == Process.FAN: + self.run_gcode_signal.emit("M107") + for i in self.objects["fans"]: + self.run_gcode_signal.emit( + f"SET_FAN_SPEED FAN={i.removeprefix('fan_generic ')} SPEED=0\nM400" + ) + + if self.tb: + self.troubleshoot_request() + self.tb = False + else: + self.change_page(self.indexOf(self.panel.utilities_page)) + + return + + message = f"Please check if the {self.current_object} is functioning correctly." + if process == Process.AXIS: + message = f"Please ensure the {self.current_object} axis moves correctly." + elif process in [Process.BED_HEATER, Process.EXTRUDER]: + message = "Please check if the temperature reaches 60°C. \n you may need to wait a few moments." + + self.set_routine_check_page( + f"Running routine for: {self.current_object}", message + ) + self.show_waiting_page( + self.indexOf(self.panel.rc_page), + f"Please check if the {message}", + 10000 if process == Process.AXIS else 0, + ) + self._send_routine_gcode() + + def _advance_routine_object(self, obj_list: list) -> bool: + if not obj_list: + is_first_run = self.current_object is None + self.current_object = obj_list[0] if is_first_run and obj_list else "done" + return is_first_run + if self.current_object not in obj_list: + if self.current_process == Process.AXIS: + self.run_gcode_signal.emit("G28") + self.current_object = obj_list[0] + return True + try: + current_index = obj_list.index(self.current_object) + if current_index + 1 < len(obj_list): + self.current_object = obj_list[current_index + 1] + return True + else: + self.current_object = None + return False + except ValueError: + self.current_object = obj_list[0] + return True + + def on_routine_answer(self) -> None: + """Handle routine ongoing process""" + if self.current_process is None or self.current_object is None: + return + if self.sender() == self.panel.rc_yes: + answer = "yes" + else: + answer = "no" + self.tb = True + process_map = { + Process.FAN: ("fans", self.current_object), + Process.AXIS: ("axis", self.current_object), + Process.BED_HEATER: ("bheat", "Bed_Heater"), + Process.EXTRUDER: ("extrude", "extruder"), + } + if self.current_process in process_map: + obj_key, item_key = process_map[self.current_process] + self.objects[obj_key][item_key] = answer + if self.current_process in [Process.BED_HEATER, Process.EXTRUDER]: + self.run_gcode_signal.emit("TURN_OFF_HEATERS") + self.run_routine(self.current_process) + elif self.current_process == Process.AXIS_MAINTENANCE: + if answer == "yes": + self._run_axis_maintenance_gcode(self.current_object) + else: + self.change_page(self.indexOf(self.panel.axes_page)) + + def _send_routine_gcode(self): + """Send the correct G-code for the current process and object.""" + if self.current_process == Process.FAN: + fan_name = self.current_object or next(iter(self.objects["fans"]), None) + if fan_name: + if fan_name == "fan": + self.run_gcode_signal.emit("M106 S255\nM400") + else: + self.run_gcode_signal.emit( + f"SET_FAN_SPEED FAN={fan_name.removeprefix('fan_generic ')} SPEED=0.8\nM400" + ) + + return + + gcode_map = { + Process.BED_HEATER: "SET_HEATER_TEMPERATURE HEATER=heater_bed TARGET=60", + Process.EXTRUDER: "SET_HEATER_TEMPERATURE HEATER=extruder TARGET=60", + (Process.AXIS, "x"): "G91\nG1 X250 F1000\nG1 X-250 F1000", + (Process.AXIS, "y"): "G91\nG1 Y250 F1000\nG1 Y-250 F1000", + (Process.AXIS, "z"): "G91\nG1 Z250 F1000\nG1 Z-250 F1000", + } + + key = ( + (self.current_process, self.current_object) + if self.current_process == Process.AXIS + else self.current_process + ) + + if gcode := gcode_map.get(key): + self.run_gcode_signal.emit(f"{gcode}\nM400") + + def set_routine_check_page(self, title: str, label: str): + """Set text on routine page""" + self.panel.rc_tittle.setText(title) + self.panel.rc_label.setText(label) + + def update_led_values(self) -> None: + """Update led state and color values""" + if self.current_object not in self.objects["leds"]: + return + led_state: LedState = self.objects["leds"][self.current_object] + led_state.white = int(self.panel.leds_w_slider.value() * 255 / 100) + self.save_led_state() + + def _update_leds_from_config(self): + layout = self.panel.leds_content_layout + + while layout.count(): + if (child := layout.takeAt(0)) and child.widget(): + child.widget().deleteLater() # type: ignore + + led_names = [] + if not self.cg: + return + + # Collect LED names - match Klipper LED hardware types only + for obj in self.cg: + parts = obj.split() + if len(parts) >= 2 and parts[0] in self._LED_TYPES: + name = parts[1] + led_names.append(name) + self.objects["leds"][name] = LedState(led_type="white") + + max_columns = 3 + buttons = [] # store references to created buttons + + # Create LED buttons + for i, name in enumerate(led_names): + if self.panel.leds_widget: + button = BlocksCustomButton() + button.setFixedSize(200, 70) + button.setText(name) + button.setProperty("class", "menu_btn") + button.setPixmap(self._led_pixmap) + row, col = divmod(i, max_columns) + layout.addWidget(button, row, col) + button.clicked.connect(partial(self.handle_led_button, name)) + buttons.append(button) + + try: + self.panel.utilities_leds_btn.clicked.disconnect() + except (RuntimeError, TypeError): + pass + if len(buttons) == 1: + self.panel.utilities_leds_btn.clicked.connect( + partial(self.handle_led_button, led_names[0]) + ) + elif len(buttons) > 1: + self._connect_page_change( + self.panel.utilities_leds_btn, self.panel.leds_page + ) + + def toggle_led_state(self) -> None: + """Toggle leds""" + if self.current_object not in self.objects["leds"]: + return + led_state: LedState = self.objects["leds"][self.current_object] + if led_state.state == "off": + led_state.state = "on" + self.panel.toggle_led_button.state = ToggleAnimatedButton.State.ON + else: + led_state.state = "off" + self.panel.toggle_led_button.state = ToggleAnimatedButton.State.OFF + self.save_led_state() + + def handle_led_button(self, name: str) -> None: + """Handle led button clicked""" + self.current_object = name + led_state: LedState = self.objects["leds"].get(name) + if not led_state: + return + is_rgb = led_state.led_type == "rgb" + self.panel.leds_w_slider.setVisible(not is_rgb) + self.panel.leds_w_slider.setValue(led_state.white) + self.change_page(self.indexOf(self.panel.leds_slider_page)) + + def save_led_state(self): + """Save led state""" + if self.current_object: + if self.current_object in self.objects["leds"]: + led_state: LedState = self.objects["leds"][self.current_object] + self.run_gcode_signal.emit(led_state.get_gcode(self.current_object)) + + def axis_maintenance(self, axis: str) -> None: + """Routine, checks axis movement for printer debugging""" + self.current_process = Process.AXIS_MAINTENANCE + self.current_object = axis + self.run_gcode_signal.emit(f"G28 {axis.upper()}\nM400") + if axis == "x": + self.run_gcode_signal.emit("G1 X10 Y250 F18000") + self.set_routine_check_page( + "Axis Maintenance", + f"Insert oil on the {axis.upper()} axis before confirming.", + ) + self.show_waiting_page( + self.indexOf(self.panel.rc_page), + f"Homing {axis.upper()} axis...", + 5000, + ) + + def _run_axis_maintenance_gcode(self, axis: str): + stepper_key = f"stepper_{axis}" + if stepper_key in self.stepper_limits: + max_pos = self.stepper_limits[stepper_key].get("max", 20) + distance = int(max_pos) - 20 + self.run_gcode_signal.emit( + f"G1 {axis.upper()}{distance} F3000\nM400\nG28\nM400" + ) + self.show_waiting_page( + self.indexOf(self.panel.axes_page), + f"Running maintenance cycle on {axis.upper()} axis...", + 5000, + ) + else: + self.change_page(self.indexOf(self.panel.axes_page)) + + def troubleshoot_request(self) -> None: + """Show troubleshoot page""" + self.troubleshoot_page.show() + + def show_waiting_page(self, page_to_go_to: int, label: str, time_ms: int): + """Show placeholder page""" + self.call_load_panel.emit(True, label) + QtCore.QTimer.singleShot(time_ms, lambda: self.change_page(page_to_go_to)) + + def _connect_page_change(self, button: QtWidgets.QWidget, page: QtWidgets.QWidget): + if isinstance(button, QtWidgets.QAbstractButton): + button.clicked.connect(lambda: self.change_page(self.indexOf(page))) + + def change_page(self, index: int): + """Request change page by index""" + self.call_load_panel.emit(False, "") + self.troubleshoot_page.hide() + if index < self.count(): + self.request_change_page.emit(3, index) + + @QtCore.pyqtSlot(name="request-back") + def back_button(self) -> None: + """Request back""" + self.request_back.emit() diff --git a/BlocksScreen/lib/panels/widgets/basePopup.py b/BlocksScreen/lib/panels/widgets/basePopup.py index 199f9bf2..12e8d195 100644 --- a/BlocksScreen/lib/panels/widgets/basePopup.py +++ b/BlocksScreen/lib/panels/widgets/basePopup.py @@ -1,5 +1,3 @@ -import typing - from PyQt6 import QtCore, QtGui, QtWidgets @@ -47,13 +45,11 @@ def __init__( self.setAttribute(QtCore.Qt.WidgetAttribute.WA_TranslucentBackground, True) self.setWindowModality(QtCore.Qt.WindowModality.ApplicationModal) else: - self.setStyleSheet( - """ + self.setStyleSheet(""" #MyParent { background-image: url(:/background/media/1st_background.png); } - """ - ) + """) def _update_button_style(self) -> None: """Applies the current color variables and adds the central border to the stylesheets.""" @@ -61,26 +57,21 @@ def _update_button_style(self) -> None: return if not self.floating: - self.confirm_button.setStyleSheet( - f""" + self.confirm_button.setStyleSheet(f""" background-color: {self.confirm_bk_color}; color: {self.confirm_ft_color}; border: none; padding: 10px; - """ - ) + """) - self.cancel_button.setStyleSheet( - f""" + self.cancel_button.setStyleSheet(f""" background-color: {self.cancel_bk_color}; color: {self.cancel_ft_color}; border: none; padding: 10px; - """ - ) + """) else: - self.confirm_button.setStyleSheet( - f""" + self.confirm_button.setStyleSheet(f""" background-color: {self.confirm_bk_color}; color: {self.confirm_ft_color}; border-top: none; @@ -89,11 +80,9 @@ def _update_button_style(self) -> None: border-right: 1px solid #80807e; border-bottom-left-radius: 16px; padding: 10px; - """ - ) + """) - self.cancel_button.setStyleSheet( - f""" + self.cancel_button.setStyleSheet(f""" background-color: {self.cancel_bk_color}; color: {self.cancel_ft_color}; border-left: 1px solid #80807e;; @@ -101,8 +90,7 @@ def _update_button_style(self) -> None: border-right: 2px solid #80807e; border-bottom-right-radius: 16px; padding: 10px; - """ - ) + """) def set_message(self, message: str) -> None: self.label.setText(message) @@ -152,7 +140,7 @@ def add_widget(self, widget: QtWidgets.QWidget) -> None: layout.insertWidget(index, self.ui) self.ui.show() - def _get_mainWindow_widget(self) -> typing.Optional[QtWidgets.QMainWindow]: + def _get_mainWindow_widget(self) -> QtWidgets.QMainWindow | None: """Get the main application window""" app_instance = QtWidgets.QApplication.instance() if not app_instance: diff --git a/BlocksScreen/lib/panels/widgets/cancelPage.py b/BlocksScreen/lib/panels/widgets/cancelPage.py index e16fb2e6..832c2f29 100644 --- a/BlocksScreen/lib/panels/widgets/cancelPage.py +++ b/BlocksScreen/lib/panels/widgets/cancelPage.py @@ -1,17 +1,16 @@ +import logging +import typing + from lib.utils.blocks_button import BlocksCustomButton from lib.utils.blocks_frame import BlocksCustomFrame from lib.utils.blocks_label import BlocksLabel from PyQt6 import QtCore, QtGui, QtWidgets -import typing -from lib.moonrakerComm import MoonWebSocket +logger = logging.getLogger(__name__) class CancelPage(QtWidgets.QWidget): - """Update GUI Page, - retrieves from moonraker available clients and adds functionality - for updating or recovering them - """ + """Displayed when a print is cancelled; offers reprint or ignore.""" request_file_info: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="request_file_info" @@ -23,16 +22,13 @@ class CancelPage(QtWidgets.QWidget): str, name="run_gcode" ) - def __init__(self, parent: QtWidgets.QWidget, ws: MoonWebSocket) -> None: + def __init__(self, parent: QtWidgets.QWidget) -> None: super().__init__(parent) - self.ws: MoonWebSocket = ws self._setupUI() self.filename = "" - - self.reprint_start.connect(self.ws.api.start_print) + self._thumbnail_scan_done: bool = False self.confirm_button.clicked.connect(lambda: self._handle_accept()) - self.refuse_button.clicked.connect(lambda: self._handle_refuse()) self.setAttribute(QtCore.Qt.WidgetAttribute.WA_StyledBackground, True) @@ -52,8 +48,10 @@ def _handle_refuse(self): def on_print_stats_update(self, field: str, value: dict | float | str) -> None: if isinstance(value, str): if "filename" in field: + if value != self.filename: + self._thumbnail_scan_done = False self.filename = value - if self.isVisible: + if self.isVisible(): self.set_file_name(value) def show(self): @@ -91,22 +89,25 @@ def set_pixmap(self, pixmap: QtGui.QPixmap) -> None: def set_file_name(self, file_name: str) -> None: self.cf_file_name.setText(file_name) - def _show_screen_thumbnail(self, dict): - try: - thumbnails = dict["thumbnail_images"] + def _show_screen_thumbnail(self, metadata: dict | None) -> None: + """Display the largest thumbnail from file metadata. - last_thumb = QtGui.QPixmap.fromImage(thumbnails[-1]) - - if last_thumb.isNull(): - last_thumb = QtGui.QPixmap( - "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" - ) - except Exception as e: - print(e) - last_thumb = QtGui.QPixmap( - "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" - ) - self.set_pixmap(last_thumb) + ``thumbnail_images`` values are pre-loaded ``QImage`` + objects produced by ``Files._process_metadata``. + """ + fallback = QtGui.QPixmap( + "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" + ) + thumbnails = metadata.get("thumbnail_images", []) if metadata else [] + if not thumbnails: + self.set_pixmap(fallback) + return + + last_thumb = thumbnails[-1] + if isinstance(last_thumb, QtGui.QImage) and not last_thumb.isNull(): + self.set_pixmap(QtGui.QPixmap.fromImage(last_thumb)) + else: + self.set_pixmap(fallback) def _setupUI(self) -> None: """Setup widget ui""" @@ -119,11 +120,9 @@ def _setupUI(self) -> None: sizePolicy.setHeightForWidth(self.sizePolicy().hasHeightForWidth()) self.setSizePolicy(sizePolicy) self.setObjectName("cancelPage") - self.setStyleSheet( - """#cancelPage { + self.setStyleSheet("""#cancelPage { background-image: url(:/background/media/1st_background.png); - }""" - ) + }""") self.setMinimumSize(QtCore.QSize(800, 480)) self.setMaximumSize(QtCore.QSize(800, 480)) self.setLayoutDirection(QtCore.Qt.LayoutDirection.LeftToRight) diff --git a/BlocksScreen/lib/panels/widgets/confirmPage.py b/BlocksScreen/lib/panels/widgets/confirmPage.py index 0f35ba39..d759f0ea 100644 --- a/BlocksScreen/lib/panels/widgets/confirmPage.py +++ b/BlocksScreen/lib/panels/widgets/confirmPage.py @@ -1,3 +1,4 @@ +import logging import os import typing @@ -8,8 +9,12 @@ from lib.utils.icon_button import IconButton from PyQt6 import QtCore, QtGui, QtWidgets +logger = logging.getLogger(__name__) + class ConfirmWidget(QtWidgets.QWidget): + """Widget displayed when a user selects a file to print.""" + on_accept: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="on_accept" ) @@ -26,7 +31,6 @@ def __init__(self, parent) -> None: self.setMouseTracking(True) self.setAttribute(QtCore.Qt.WidgetAttribute.WA_AcceptTouchEvents, True) self.thumbnail: QtGui.QImage = self._blocksthumbnail - self._thumbnails: typing.List = [] self.directory = "gcodes" self.filename = "" self.confirm_button.clicked.connect( @@ -39,32 +43,34 @@ def __init__(self, parent) -> None: lambda: self.on_delete.emit(self.filename, self.directory) ) - @QtCore.pyqtSlot(str, dict, name="on_show_widget") - def on_show_widget(self, text: str, filedata: dict | None = None) -> None: - """Handle widget show""" - if not filedata: - return + @QtCore.pyqtSlot(str, object, name="on_show_widget") + def on_show_widget(self, text: str, metadata: dict | None = None) -> None: + """Handle widget show.""" directory = os.path.dirname(text) filename = os.path.basename(text) self.directory = directory self.filename = filename self.cf_file_name.setText(self.filename) - self._thumbnails = filedata.get("thumbnail_images", []) - if self._thumbnails: - _biggest_thumbnail = self._thumbnails[-1] # Show last which is biggest - self.thumbnail = QtGui.QImage(_biggest_thumbnail) - else: + if metadata is None: self.thumbnail = self._blocksthumbnail - _total_filament = filedata.get("filament_weight_total") - _estimated_time = filedata.get("estimated_time") - if isinstance(_estimated_time, str): - seconds = 0 - else: - seconds = _estimated_time + self.cf_info_tf.setText("Total Filament: loading...") + self.cf_info_tr.setText("Slicer time: loading...") + self.update() + return + self._update_metadata_labels(metadata) + self.update() + + def _update_metadata_labels(self, metadata: dict) -> None: + """Update thumbnail and text labels from metadata.""" + self._apply_thumbnail(metadata) + raw_weight = metadata.get("filament_weight_total", 0) + _total_filament: float | str = raw_weight if raw_weight > 0 else 0 + seconds = metadata.get("estimated_time", 0) + seconds = seconds if seconds > 0 else 0 days, hours, minutes, _ = helper_methods.estimate_print_time(seconds) if seconds <= 0: - time_str = "??" + time_str = "Unknown" elif seconds < 60: time_str = "less than 1 minute" else: @@ -83,9 +89,39 @@ def on_show_widget(self, text: str, filedata: dict | None = None) -> None: _total_filament = str("%.2f" % _total_filament) + "g" filament_label = f"Total Filament: {_total_filament}" time_label = f"Slicer time: {time_str}" - self.cf_info_tf.setText(f"{filament_label}") - self.cf_info_tr.setText(f"{time_label}") - self.repaint() + self.cf_info_tf.setText(filament_label) + self.cf_info_tr.setText(time_label) + + def _apply_thumbnail(self, metadata: dict) -> None: + """Set self.thumbnail from metadata, falling back to the logo.""" + thumbnails = metadata.get("thumbnail_images", []) + if thumbnails: + last = thumbnails[-1] + if isinstance(last, QtGui.QImage) and not last.isNull(): + self.thumbnail = last + return + self.thumbnail = self._blocksthumbnail + + @QtCore.pyqtSlot(dict, name="on_fileinfo") + def on_fileinfo(self, metadata: dict) -> None: + """Update thumbnail and metadata labels when new data arrives.""" + if not metadata or not self.filename: + return + incoming = metadata.get("filename", "") + current = ( + f"{self.directory}/{self.filename}" if self.directory else self.filename + ) + # Also accept bare-filename match for USB files: Moonraker may strip the + # USB directory prefix from the returned filename. + is_usb_bare_match = ( + incoming == self.filename + and self.directory.startswith("USB-") + and incoming == os.path.basename(incoming) + ) + if incoming != current and not is_usb_bare_match: + return + self._update_metadata_labels(metadata) + self.update() def estimate_print_time(self, seconds: int) -> list: """Convert time in seconds format to days, hours, minutes, seconds. @@ -142,8 +178,8 @@ def paintEvent(self, event: QtGui.QPaintEvent) -> None: def showEvent(self, a0: QtGui.QShowEvent) -> None: """Re-implemented method, Handle widget show event""" - if not self.thumbnail: - self.cf_thumbnail.close() + if self.thumbnail.isNull(): + self.cf_thumbnail.hide() return super().showEvent(a0) def _setupUI(self) -> None: @@ -252,7 +288,6 @@ def _setupUI(self) -> None: "icon_pixmap", QtGui.QPixmap(":/dialog/media/btn_icons/yes.svg") ) self.confirm_button.setText("Print") - # 2. Align buttons to the right self.cf_confirm_layout.addWidget( self.confirm_button, 0, QtCore.Qt.AlignmentFlag.AlignCenter ) @@ -266,7 +301,6 @@ def _setupUI(self) -> None: "icon_pixmap", QtGui.QPixmap(":/ui/media/btn_icons/garbage-icon.svg") ) self.delete_file_button.setText("Delete") - # 2. Align buttons to the right self.cf_confirm_layout.addWidget( self.delete_file_button, 0, QtCore.Qt.AlignmentFlag.AlignCenter ) diff --git a/BlocksScreen/lib/panels/widgets/connectionPage.py b/BlocksScreen/lib/panels/widgets/connectionPage.py index fcf6e8d3..83f0a54e 100644 --- a/BlocksScreen/lib/panels/widgets/connectionPage.py +++ b/BlocksScreen/lib/panels/widgets/connectionPage.py @@ -138,8 +138,6 @@ def __init__(self, parent: QtWidgets.QWidget, ws: MoonWebSocket, /) -> None: self.update_page_button.clicked.connect(self._on_update_page_clicked) self.wifi_button.clicked.connect(self.wifi_button_clicked.emit) - self.installEventFilter(self.parent()) - def _apply_shutdown_guard(self, state: ConnectionState, context: str) -> bool: if ( self._state == ConnectionState.KLIPPER_SHUTDOWN diff --git a/BlocksScreen/lib/panels/widgets/inputshaperPage.py b/BlocksScreen/lib/panels/widgets/inputshaperPage.py index ead82cfb..dd890fb5 100644 --- a/BlocksScreen/lib/panels/widgets/inputshaperPage.py +++ b/BlocksScreen/lib/panels/widgets/inputshaperPage.py @@ -1,11 +1,11 @@ +import typing + from lib.utils.blocks_button import BlocksCustomButton from lib.utils.blocks_frame import BlocksCustomFrame from lib.utils.icon_button import IconButton from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem from PyQt6 import QtCore, QtGui, QtWidgets -import typing - class InputShaperPage(QtWidgets.QWidget): """Update GUI Page, @@ -24,6 +24,7 @@ def __init__(self, parent=None) -> None: else: super().__init__() self._setupUI() + self.currentItem: ListItem | None = None self.selected_item: ListItem | None = None self.ongoing_update: bool = False self.type_dict: dict = {} @@ -96,21 +97,26 @@ def on_item_clicked(self, item: ListItem) -> None: if not current_info: return - self.vib_label.setText(str("%.0f" % current_info.get("vibration", "N/A")) + "%") + _vib = current_info.get("vibration") + self.vib_label.setText(f"{float(_vib):.0f}%" if _vib is not None else "N/A%") + _accel = current_info.get("max_accel") self.sug_accel_label.setText( - str("%.0f" % current_info.get("max_accel", "N/A")) + "mm/s²" + f"{float(_accel):.0f}mm/s²" if _accel is not None else "N/Amm/s²" ) self.action_btn.show() def handle_ism_confirm(self) -> None: + """Apply the selected input shaper type to the printer and save the config.""" + if self.currentItem is None: + return current_info = self.type_dict.get(self.currentItem.text, {}) frequency = current_info.get("frequency", "N/A") - if self.type_dict["Axis"] == "x": + if self.type_dict.get("Axis") == "x": self.run_gcode_signal.emit( f"SET_INPUT_SHAPER SHAPER_TYPE_X={self.currentItem.text} SHAPER_FREQ_X={frequency}" ) - elif self.type_dict["Axis"] == "y": + elif self.type_dict.get("Axis") == "y": self.run_gcode_signal.emit( f"SET_INPUT_SHAPER SHAPER_TYPE_Y={self.currentItem.text} SHAPER_FREQ_Y={frequency}" ) @@ -138,7 +144,8 @@ def _setupUI(self) -> None: font_id = QtGui.QFontDatabase.addApplicationFont( ":/font/media/fonts for text/Momcake-Bold.ttf" ) - font_family = QtGui.QFontDatabase.applicationFontFamilies(font_id)[0] + _families = QtGui.QFontDatabase.applicationFontFamilies(font_id) + font_family = _families[0] if _families else "" sizePolicy = QtWidgets.QSizePolicy( QtWidgets.QSizePolicy.Policy.MinimumExpanding, QtWidgets.QSizePolicy.Policy.MinimumExpanding, diff --git a/BlocksScreen/lib/panels/widgets/sensorWidget.py b/BlocksScreen/lib/panels/widgets/sensorWidget.py index c479e5dc..8c4ee2c0 100644 --- a/BlocksScreen/lib/panels/widgets/sensorWidget.py +++ b/BlocksScreen/lib/panels/widgets/sensorWidget.py @@ -104,6 +104,13 @@ def change_fil_sensor_state(self, state: FilamentState): self.filament_state = SensorWidget.FilamentState(not state.value) self.update() + def set_filament_state(self, state: FilamentState) -> None: + """Set filament state directly without inversion.""" + if not isinstance(state, SensorWidget.FilamentState): + return + self.filament_state = state + self.update() + def toggle_button_state(self, state: ToggleAnimatedButton.State) -> None: """Called when the Klipper firmware reports an update to the filament sensor state""" self.toggle_button.setDisabled(False) diff --git a/BlocksScreen/lib/panels/widgets/sensorsPanel.py b/BlocksScreen/lib/panels/widgets/sensorsPanel.py index df63cfb5..6ef78dba 100644 --- a/BlocksScreen/lib/panels/widgets/sensorsPanel.py +++ b/BlocksScreen/lib/panels/widgets/sensorsPanel.py @@ -11,15 +11,12 @@ class SensorsWindow(QtWidgets.QWidget): run_gcode_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="run_gcode" ) - change_fil_sensor_state: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - SensorWidget.FilamentState, name="change_fil_sensor_state" - ) request_back: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( name="request_back" ) def __init__(self, parent): - super(SensorsWindow, self).__init__(parent) + super().__init__(parent) self.model = EntryListModel() self.entry_delegate = EntryDelegate() self.sensor_tracking_widget = {} @@ -32,11 +29,15 @@ def __init__(self, parent): self.fs_back_button.clicked.connect(self.request_back) def reset_view_model(self) -> None: - """Clears items from ListView - (Resets `QAbstractListModel` by clearing entries) - """ + """Clears items from ListView and removes existing sensor widgets.""" self.model.clear() self.entry_delegate.clear() + for widget in self.sensor_tracking_widget.values(): + self.info_box_layout.removeWidget(widget) + widget.deleteLater() + self.sensor_tracking_widget.clear() + self.sensor_list.clear() + self.current_widget = None @QtCore.pyqtSlot(dict, name="handle_available_fil_sensors") def handle_available_fil_sensors(self, sensors: dict) -> None: @@ -46,7 +47,7 @@ def handle_available_fil_sensors(self, sensors: dict) -> None: self.reset_view_model() filtered_sensors = [ sensor - for sensor in sensors.keys() + for sensor in sensors if sensor.startswith( ("filament_switch_sensor", "filament_motion_sensor", "cutter_sensor") ) @@ -63,14 +64,20 @@ def handle_available_fil_sensors(self, sensors: dict) -> None: def handle_fil_state_change( self, sensor_name: str, parameter: str, value: bool ) -> None: - """Handle Klipper signals for filament sensor changes""" + """Handle Klipper signals for filament sensor changes.""" _item = self.sensor_tracking_widget.get(sensor_name) - if _item: - if parameter == "filament_detected": - state = SensorWidget.FilamentState(not value) - _item.change_fil_sensor_state(state) - elif parameter == "enabled": - _item.toggle_button_state(SensorWidget.SensorState(value)) + if not _item: + return + if parameter == "filament_detected": + # filament_detected=True means filament IS present + state = ( + SensorWidget.FilamentState.PRESENT + if value + else SensorWidget.FilamentState.MISSING + ) + _item.set_filament_state(state) + elif parameter == "enabled": + _item.toggle_button_state(SensorWidget.SensorState(value)) def showEvent(self, event: QtGui.QShowEvent | None) -> None: """Re-add clients to update list""" @@ -108,7 +115,8 @@ def create_sensor_widget(self, name: str) -> SensorWidget: else: _item_widget.show() self.current_widget = _item_widget - name_id = str(name).split(" ")[1] + _parts = str(name).split(" ", 1) + name_id = _parts[1] if len(_parts) > 1 else _parts[0] item = ListItem( text=name_id, right_text="", @@ -133,7 +141,8 @@ def _setupUi(self) -> None: font_id = QtGui.QFontDatabase.addApplicationFont( ":/font/media/fonts for text/Momcake-Bold.ttf" ) - font_family = QtGui.QFontDatabase.applicationFontFamilies(font_id)[0] + _families = QtGui.QFontDatabase.applicationFontFamilies(font_id) + font_family = _families[0] if _families else "" sizePolicy = QtWidgets.QSizePolicy( QtWidgets.QSizePolicy.Policy.MinimumExpanding, QtWidgets.QSizePolicy.Policy.MinimumExpanding, diff --git a/BlocksScreen/lib/panels/widgets/troubleshootPage.py b/BlocksScreen/lib/panels/widgets/troubleshootPage.py index 0c327ac7..8673f045 100644 --- a/BlocksScreen/lib/panels/widgets/troubleshootPage.py +++ b/BlocksScreen/lib/panels/widgets/troubleshootPage.py @@ -1,6 +1,5 @@ -from PyQt6 import QtCore, QtGui, QtWidgets - from lib.utils.icon_button import IconButton +from PyQt6 import QtCore, QtGui, QtWidgets class TroubleshootPage(QtWidgets.QDialog): @@ -9,14 +8,12 @@ def __init__( parent: QtWidgets.QWidget, ) -> None: super().__init__(parent) - self.setStyleSheet( - """ + self.setStyleSheet(""" #troubleshoot_page { background-image: url(:/background/media/1st_background.png); border: none; } - """ - ) + """) self.setWindowFlags( QtCore.Qt.WindowType.Popup | QtCore.Qt.WindowType.FramelessWindowHint ) diff --git a/BlocksScreen/lib/printer.py b/BlocksScreen/lib/printer.py index 20c7f1b7..94232bff 100644 --- a/BlocksScreen/lib/printer.py +++ b/BlocksScreen/lib/printer.py @@ -120,7 +120,7 @@ class Printer(QtCore.QObject): current_loaded_file_metadata: str = "" def __init__(self, parent: QtCore.QObject, ws: MoonWebSocket, /) -> None: - super(Printer, self).__init__(parent) + super().__init__(parent) self.ws = ws self.active_extruder_name: str = "" @@ -162,6 +162,18 @@ def clear_printer_objs(self) -> None: self.printer_busy = False self.current_loaded_file = "" self.current_loaded_file_metadata = "" + _heater_attributes: dict = { + "current_temperature": 0.0, + "target_temperature": 0.0, + "can_extrude": False, + } + self.heaters_object = { + "extruder": _heater_attributes.copy(), + "bed": _heater_attributes.copy(), + } + self.active_extruder_name = "" + self.available_filament_sensors = {} + self.has_chamber = False def __inject_callback( self, object_type: str, callback: typing.Callable[[dict, str], None] @@ -281,7 +293,7 @@ def get_config(self, section_name: str) -> dict: return _config[0].get(section_name, {}) def search_config_list( - self, search_list: list[str], _objects: typing.Optional[list] = None + self, search_list: list[str], _objects: list | None = None ) -> list: """ Search a list of printer objects recursively @@ -355,7 +367,7 @@ def _gcode_response(self, report: list) -> None: self.gcode_response.emit(report) def _webhook_printcore_updated(self, value: dict): - self.on_printcore_update[dict].emit(value) + self.on_printcore_update.emit(value) def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: """Sends an event type according to the received state @@ -365,7 +377,7 @@ def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: value (dict): _description_ name (str, optional): _description_. Defaults to "". """ - if "state" in value.keys() and "state_message" in value.keys(): + if "state" in value and "state_message" in value: self.webhooks_update.emit(value["state"], value["state_message"]) logger.debug("Webhooks message received") _state: str = value["state"] @@ -380,7 +392,7 @@ def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: event = _event_callback(value["state"], value["state_message"]) instance = QtWidgets.QApplication.instance() if instance is not None and isinstance(event, QtCore.QEvent): - instance.sendEvent(self.parent(), event) + instance.postEvent(self.parent(), event) else: raise TypeError("QApplication.instance is None type.") except Exception as e: @@ -419,60 +431,60 @@ def _mmu_object_updated(self, value: dict, name: str = "mmu") -> None: # i only putted the most relevant ones, there are some other parameters that can be added later if needed def _gcode_move_object_updated(self, value: dict, name: str = "gcode_move") -> None: - if "speed_factor" in value.keys(): + if "speed_factor" in value: self.gcode_move_update[str, float].emit( "speed_factor", value["speed_factor"] ) - if "speed" in value.keys(): + if "speed" in value: self.gcode_move_update[str, float].emit("speed", value["speed"]) - if "extrude_factor" in value.keys(): + if "extrude_factor" in value: self.gcode_move_update[str, float].emit( "extruder_factor", value["extrude_factor"] ) - if "absolute_coordinates" in value.keys(): + if "absolute_coordinates" in value: self.gcode_move_update[str, bool].emit( "absolute_coordinates", value["absolute_coordinates"] ) - if "absolute_extrude" in value.keys(): + if "absolute_extrude" in value: self.gcode_move_update[str, bool].emit( "absolute_extrude", value["absolute_extrude"] ) - if "homing_origin" in value.keys(): + if "homing_origin" in value: self.gcode_move_update[str, list].emit( "homing_origin", value["homing_origin"] ) - if "position" in value.keys(): + if "position" in value: self.gcode_move_update[str, list].emit("position", value["position"]) - if "gcode_position" in value.keys(): + if "gcode_position" in value: self.gcode_move_update[str, list].emit( "gcode_position", value["gcode_position"] ) def _toolhead_object_updated(self, values: dict, name: str = "toolhead") -> None: - if "homed_axes" in values.keys(): + if "homed_axes" in values: self.toolhead_update[str, str].emit("homed_axes", values["homed_axes"]) - if "print_time" in values.keys(): + if "print_time" in values: self.toolhead_update[str, float].emit("print_time", values["print_time"]) - if "estimated_print_time" in values.keys(): + if "estimated_print_time" in values: self.toolhead_update[str, float].emit( "estimated_print_time", values["estimated_print_time"] ) - if "extruder" in values.keys(): + if "extruder" in values: self.toolhead_update[str, str].emit("extruder", values["extruder"]) self.active_extruder_name = values["extruder"] - if "position" in values.keys(): + if "position" in values: self.toolhead_update[str, list].emit("position", values["position"]) - if "max_velocity" in values.keys(): + if "max_velocity" in values: self.toolhead_update[str, float].emit( "max_velocity", values["max_velocity"] ) - if "max_accel" in values.keys(): + if "max_accel" in values: self.toolhead_update[str, float].emit("max_accel", values["max_accel"]) - if "max_accel_to_decel" in values.keys(): + if "max_accel_to_decel" in values: self.toolhead_update[str, float].emit( "max_accel_to_decel", values["max_accel_to_decel"] ) - if "square_corner_velocity" in values.keys(): + if "square_corner_velocity" in values: self.toolhead_update[str, float].emit( "square_corner_velocity", values["square_corner_velocity"] ) @@ -480,73 +492,76 @@ def _toolhead_object_updated(self, values: dict, name: str = "toolhead") -> None def _extruder_object_updated( self, value: dict, extruder_name: str = "extruder" ) -> None: - if "temperature" in value.keys(): + """Handle extruder object updates and emit corresponding signals.""" + if extruder_name not in self.heaters_object: + self.heaters_object[extruder_name] = {} + if "temperature" in value: self.extruder_update.emit( extruder_name, "temperature", value["temperature"] ) self.heaters_object[f"{extruder_name}"]["actual_temperature"] = value[ "temperature" ] - if "target" in value.keys(): + if "target" in value: self.extruder_update.emit(extruder_name, "target", value["target"]) self.heaters_object[f"{extruder_name}"]["target_temperature"] = value[ "target" ] - if "can_extrude" in value.keys(): + if "can_extrude" in value: self.heaters_object[f"{extruder_name}"]["can_extrude"] = value[ "can_extrude" ] - if "power" in value.keys(): + if "power" in value: self.extruder_update.emit(extruder_name, "power", value["power"]) - if "pressure_advance" in value.keys(): + if "pressure_advance" in value: self.extruder_update.emit( extruder_name, "pressure_advance", value["pressure_advance"] ) - if "smooth_time" in value.keys(): + if "smooth_time" in value: self.extruder_update.emit( extruder_name, "smooth_time", value["smooth_time"] ) - if "can_extrude" in value.keys(): + if "can_extrude" in value: pass def _heater_bed_object_updated( self, value: dict, heater_name: str = "heater_bed" ) -> None: - if "temperature" in value.keys(): + if "temperature" in value: self.heater_bed_update.emit( heater_name, "temperature", value["temperature"] ) self.heaters_object["bed"]["actual_temperature"] = value["temperature"] - if "target" in value.keys(): + if "target" in value: self.heater_bed_update.emit(heater_name, "target", value["target"]) self.heaters_object["bed"]["target_temperature"] = value["target"] - if "power" in value.keys(): + if "power" in value: self.heater_bed_update.emit(heater_name, "power", value["power"]) def _fan_object_updated(self, value: dict, fan_name: str = "fan") -> None: - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit("fan", "speed", value["speed"]) - if "rpm" in value.keys(): + if "rpm" in value: self.fan_update[str, str, int].emit("fan", "rpm", value["rpm"]) def _fan_generic_object_updated(self, value: dict, fan_name: str = "") -> None: _names = ["fan_generic", fan_name] object_name = " ".join(_names) - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit( object_name, "speed", value.get("speed") ) - if "rpm" in value.keys(): + if "rpm" in value: self.fan_update[str, str, int].emit(object_name, "rpm", value.get("rpm")) def _controller_fan_object_updated(self, value: dict, fan_name: str = "") -> None: _names = ["controller_fan", fan_name] object_name = " ".join(_names) - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit( object_name, "speed", value.get("speed") ) - elif "rpm" in value.keys(): + elif "rpm" in value: self.fan_update[str, str, int].emit(object_name, "rpm", value.get("rpm")) def _heater_fan_object_updated(self, value: dict, fan_name: str = "") -> None: @@ -556,20 +571,20 @@ def _heater_fan_object_updated(self, value: dict, fan_name: str = "") -> None: # object_name = " ".join(_names) def _z_tilt_object_updated(self, value: dict, name: str = "") -> None: - if value["applied"]: + if value.get("applied"): self.z_tilt_update[str, bool].emit("applied", value["applied"]) def _idle_timeout_object_updated( self, value: dict, name: str = "idle_timeout" ) -> None: - if "state" in value.keys(): + if "state" in value: self.idle_timeout_update[str, str].emit("state", value["state"]) if "printing" in value["state"]: self.printer_busy = True elif self.printing_state != "printing" and value["state"] != "printing": # It's also busy if the printer is printing or paused self.printer_busy = False - if "printing_time" in value.keys(): + if "printing_time" in value: self.idle_timeout_update[str, float].emit( "printing_time", value["printing_time"] ) @@ -577,11 +592,11 @@ def _idle_timeout_object_updated( def _virtual_sdcard_object_updated( self, values: dict, name: str = "virtual_sdcard" ) -> None: - if "progress" in values.keys(): + if "progress" in values: self.virtual_sdcard_update[str, float].emit("progress", values["progress"]) - if "is_active" in values.keys(): + if "is_active" in values: self.virtual_sdcard_update[str, bool].emit("is_active", values["is_active"]) - if "file_position" in values.keys(): + if "file_position" in values: self.virtual_sdcard_update[str, float].emit( "file_position", float(values["file_position"]) ) @@ -595,6 +610,8 @@ def send_print_event(self, event: str): Raises: TypeError: Thrown when QApplication is None """ + if not event: + return _print_state_upper = event[0].upper() _print_state_call = f"{_print_state_upper}{event[1:]}" if hasattr(events, f"Print{_print_state_call}"): @@ -603,15 +620,16 @@ def send_print_event(self, event: str): _print_state_call, f"Print{_print_state_call}", ) - _event_callback: QtCore.QEvent = getattr( - events, f"Print{_print_state_call}" - ) + _event_callback = getattr(events, f"Print{_print_state_call}") if callable(_event_callback): try: instance = QtWidgets.QApplication.instance() - if instance: - instance.postEvent(self.window(), _event_callback) - else: + # Printer is a QObject, not QWidget — use parent() + # to reach the MainWindow (which has the event handler). + target = self.parent() + if instance and target: + instance.postEvent(target, _event_callback()) + elif not instance: raise TypeError("QApplication.instance expected non None value") except Exception as e: logger.info( @@ -621,24 +639,24 @@ def send_print_event(self, event: str): def _print_stats_object_updated( self, values: dict, name: str = "print_stats" ) -> None: - if "filename" in values.keys(): + if "filename" in values: self.print_stats_update[str, str].emit("filename", values["filename"]) self.print_file_loaded = True - if "total_duration" in values.keys(): + if "total_duration" in values: self.print_stats_update[str, float].emit( "total_duration", values["total_duration"] ) - if "print_duration" in values.keys(): + if "print_duration" in values: self.print_stats_update[str, float].emit( "print_duration", values["print_duration"] ) - if "filament_used" in values.keys(): + if "filament_used" in values: self.print_stats_update[str, float].emit( "filament_used", values["filament_used"] ) - if "state" in values.keys(): + if "state" in values: self.print_stats_update[str, str].emit("state", values["state"]) - self.printing_state = values.get("state", None) + self.printing_state = values.get("state") or "" if not self.printing_state: return self.send_print_event(self.printing_state) @@ -649,33 +667,33 @@ def _print_stats_object_updated( self.print_file_loaded = True if values["state"] == "printing" or values["state"] == "pause": self.printing = True - if "message" in values.keys(): + if "message" in values: self.print_stats_update[str, str].emit("message", values["message"]) - if "info" in values.keys(): + if "info" in values: self.print_stats_update[str, dict].emit("info", values["info"]) def _display_status_object_updated( self, values: dict, name: str = "display_status" ) -> None: - if "message" in values.keys(): + if "message" in values: self.display_update[str, str].emit("message", values["message"]) - if "progress" in values.keys(): + if "progress" in values: self.display_update[str, float].emit("progress", values["progress"]) def _temperature_sensor_object_updated( self, values: dict, temperature_sensor_name: str ) -> None: - if "temperature" in values.keys(): + if "temperature" in values: self.sensor_update.emit( temperature_sensor_name, "temperature", values["temperature"] ) - if "measured_min_temp" in values.keys(): + if "measured_min_temp" in values: self.sensor_update.emit( temperature_sensor_name, "measured_min_temp", values["measured_min_temp"], ) - if "measured_max_temp" in values.keys(): + if "measured_max_temp" in values: self.sensor_update.emit( temperature_sensor_name, "measured_max_temp", @@ -701,19 +719,19 @@ def _temperature_fan_object_updated( ) -> None: _names = ["temperature_fan", temperature_fan_name] object_name = " ".join(_names) - if "speed" in values.keys(): + if "speed" in values: self.temperature_fan_update.emit( object_name, "speed", values["speed"], ) - if "temperature" in values.keys(): + if "temperature" in values: self.temperature_fan_update.emit( object_name, "temperature", values["temperature"], ) - if "target" in values.keys(): + if "target" in values: self.temperature_fan_update.emit( object_name, "target", @@ -723,14 +741,14 @@ def _temperature_fan_object_updated( def _filament_switch_sensor_object_updated( self, values: dict, filament_switch_name: str ) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_switch_sensor_update.emit( filament_switch_name, "filament_detected", values["filament_detected"], ) self.available_filament_sensors.update({f"{filament_switch_name}": values}) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_switch_sensor_update.emit( filament_switch_name, "enabled", values["enabled"] ) @@ -739,7 +757,7 @@ def _filament_switch_sensor_object_updated( def _filament_motion_sensor_object_updated( self, values: dict, filament_motion_name: str ) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_motion_sensor_update.emit( filament_motion_name, "filament_detected", @@ -749,18 +767,18 @@ def _filament_motion_sensor_object_updated( {f"{filament_motion_name}": values["filament_detected"]} ) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_motion_sensor_update.emit( filament_motion_name, "enabled", values["enabled"] ) self.available_filament_sensors.update({f"{filament_motion_name}": values}) def _cutter_sensor_object_updated(self, values: dict, cutter_name: str) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_switch_sensor_update.emit( cutter_name, "filament_detected", values["filament_detected"] ) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_switch_sensor_update.emit( cutter_name, "enabled", values["enabled"] ) @@ -768,7 +786,7 @@ def _cutter_sensor_object_updated(self, values: dict, cutter_name: str) -> None: self.available_filament_sensors.update({f"{cutter_name}": values}) def _output_pin_object_updated(self, values: dict, output_pin_name: str) -> None: - if "value" in values.keys(): + if "value" in values: self.output_pin_update.emit(output_pin_name, "value", values["value"]) def _bed_mesh_object_updated(self, values: dict, name: str = "bed_mesh") -> None: @@ -785,17 +803,17 @@ def _configfile_object_updated( self, values: dict, name: str = "configfile" ) -> None: self.configfile.update(values) - if "config" in values.keys(): + if "config" in values: self.printer_config.emit(values["config"]) - if "settings" in values.keys(): + if "settings" in values: # TODO ... - if "save_config_pending" in values.keys(): + if "save_config_pending" in values: self.save_config_pending.emit() - if "save_config_pending_items" in values.keys(): + if "save_config_pending_items" in values: # TODO ... - if "warnings" in values.keys(): + if "warnings" in values: # TODO ... diff --git a/BlocksScreen/screensaver.py b/BlocksScreen/screensaver.py index de02ba02..20cda0cf 100644 --- a/BlocksScreen/screensaver.py +++ b/BlocksScreen/screensaver.py @@ -3,15 +3,22 @@ class ScreenSaver(QtCore.QObject): + """Screensaver that uses X11 DPMS to blank the display after inactivity.""" + timer = QtCore.QTimer() - dpms_off_timeout = helper_methods.get_dpms_timeouts().get("off_timeout") - dpms_suspend_timeout = helper_methods.get_dpms_timeouts().get("suspend_timeout") - dpms_standby_timeout = helper_methods.get_dpms_timeouts().get("standby_timeout") touch_blocked: bool = False + _dpms_available: bool = hasattr(helper_methods, "get_dpms_timeouts") def __init__(self, parent) -> None: super().__init__() + dpms_timeouts = ( + helper_methods.get_dpms_timeouts() if self._dpms_available else {} + ) + self.dpms_off_timeout = dpms_timeouts.get("off_timeout") + self.dpms_suspend_timeout = dpms_timeouts.get("suspend_timeout") + self.dpms_standby_timeout = dpms_timeouts.get("standby_timeout") + self.screensaver_config = parent.config.get_section( "screensaver", fallback=None ) @@ -29,9 +36,11 @@ def __init__(self, parent) -> None: self.timer.start() def eventFilter(self, object, event) -> bool: - """Filter touch events considering DPMS Screen state""" + """Filter touch events considering DPMS screen state.""" + if not self._dpms_available: + return False - if event.type() in ( # Block Touch Filter and Wake Touch Filter + if event.type() in ( QtCore.QEvent.Type.TouchBegin, QtCore.QEvent.Type.TouchUpdate, QtCore.QEvent.Type.TouchEnd, @@ -52,14 +61,15 @@ def eventFilter(self, object, event) -> bool: self.touch_blocked = False helper_methods.set_dpms_mode(helper_methods.DPMSState.ON) self.timer.start() - return True # filter out the event, block touch events on the application + return True else: self.timer.stop() self.timer.start() return False def check_dpms(self) -> None: - """Checks the X11 extension dpms for the status of the screen""" + """Blank the display via DPMS standby.""" self.touch_blocked = True - helper_methods.set_dpms_mode(helper_methods.DPMSState.STANDBY) + if self._dpms_available: + helper_methods.set_dpms_mode(helper_methods.DPMSState.STANDBY) self.timer.stop() diff --git a/tests/network/test_network_ui.py b/tests/network/test_network_ui.py index 466ec14a..a60a330a 100644 --- a/tests/network/test_network_ui.py +++ b/tests/network/test_network_ui.py @@ -693,7 +693,7 @@ def test_transient_mismatch_retries(self, win, qapp): message="not compatible with device", error_code="nm_error", ) - with patch("BlocksScreen.lib.panels.networkWindow.QTimer") as mock_timer: + with patch("BlocksScreen.lib.panels.networkWindow.QtCore.QTimer") as mock_timer: w._on_operation_complete(result) mock_timer.singleShot.assert_called_once() # Loading should still be visible — retry is pending @@ -740,7 +740,7 @@ def test_wifi_on_with_saved_networks_starts_connect(self, win): ) ] nm.saved_networks = saved - with patch("BlocksScreen.lib.panels.networkWindow.QTimer") as mock_timer: + with patch("BlocksScreen.lib.panels.networkWindow.QtCore.QTimer") as mock_timer: w._handle_wifi_toggle(True) mock_timer.singleShot.assert_called() assert w._pending_operation == PendingOperation.WIFI_ON