From 2ec05916b61b66cf80c828e4c85458911880e18b Mon Sep 17 00:00:00 2001 From: ishiharas <43359029+ishiharas@users.noreply.github.com> Date: Mon, 1 Jun 2026 14:09:48 +0200 Subject: [PATCH] Add custom map overlay support --- .../roborock_custom_map/image.py | 154 ++++++++++++++++-- 1 file changed, 142 insertions(+), 12 deletions(-) diff --git a/custom_components/roborock_custom_map/image.py b/custom_components/roborock_custom_map/image.py index 051dd0c..e9360ff 100644 --- a/custom_components/roborock_custom_map/image.py +++ b/custom_components/roborock_custom_map/image.py @@ -2,11 +2,12 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timezone import io import logging +import os -from PIL import Image, UnidentifiedImageError +from PIL import Image from roborock.devices.traits.v1.home import HomeTrait from roborock.devices.traits.v1.map_content import MapContent @@ -121,12 +122,56 @@ def __init__( self._raw_image_size: tuple[int, int] | None = None self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._reload_time = dt_util.utcnow() @property def is_selected(self) -> bool: """Return if this map is the currently selected map.""" return self.map_flag == self.coordinator.properties_api.maps.current_map + @property + def image_last_updated(self) -> datetime | None: + """Return the time the image was last updated, dynamically busting caches.""" + base_dt = self._reload_time + coord_dt = self.coordinator.last_home_update + if coord_dt is not None: + base_dt = max(base_dt, coord_dt) + + # Lightly scan custom image paths to find the current mtime dynamically + try: + for path in self._get_candidate_paths(): + if os.path.isfile(path): + mtime = os.path.getmtime(path) + custom_dt = datetime.fromtimestamp(mtime, tz=timezone.utc) + return max(base_dt, custom_dt) + except Exception: + pass + + return base_dt + + def _get_candidate_paths(self) -> list[str]: + """Return the list of candidate paths for the custom map image.""" + config_dir = self.hass.config.config_dir + search_dirs = [ + os.path.join(config_dir, "www"), + os.path.join(config_dir, "media"), + "/media", + config_dir, + ] + basenames = [ + f"roborock_custom_map_{self.map_flag}_hide_rugs", + f"roborock_custom_map_{self.map_flag}", + "roborock_custom_map_hide_rugs", + "roborock_custom_map", + ] + extensions = [".webp", ".png"] + return [ + os.path.join(sdir, f"{base}{ext}") + for sdir in search_dirs + for base in basenames + for ext in extensions + ] + @property def _map_content(self) -> MapContent | None: if self._home_trait.home_map_content and ( @@ -150,6 +195,11 @@ async def async_added_to_hass(self) -> None: ) ) + # Populate initial map info if already loaded + if (map_content := self._map_content) is not None: + self.cached_map = map_content.image_content + self._raw_image_size = _png_dimensions(self.cached_map) + self.async_write_ha_state() def _handle_rotation_changed(self) -> None: @@ -169,10 +219,93 @@ def _handle_coordinator_update(self) -> None: super()._handle_coordinator_update() - def _rotate_image(self, raw: bytes, rotation: int) -> bytes: - """Rotate image in executor thread.""" + def _load_custom_image(self, target_size: tuple[int, int]) -> tuple[Image.Image, str] | tuple[None, None]: + """Find, load, and resize a custom map image from the filesystem on demand.""" + paths = self._get_candidate_paths() + + for path in paths: + if os.path.isfile(path): + try: + _LOGGER.info("Loading custom map background from %s", path) + img = Image.open(path) + img.load() + img = img.convert("RGBA") + if img.size != target_size: + img = img.resize(target_size, Image.Resampling.LANCZOS) + return img, path + except Exception as err: + _LOGGER.error("Failed to load custom map image from %s: %s", path, err) + + return None, None + + def _remove_carpet_pattern(self, img: Image.Image) -> Image.Image: + """Filter out the grey and colored carpet checkerboard patterns from the foreground map.""" + try: + img = img.convert("RGBA") + import numpy as np + img_arr = np.array(img) + r = img_arr[:, :, 0].astype(int) + g = img_arr[:, :, 1].astype(int) + b = img_arr[:, :, 2].astype(int) + a = img_arr[:, :, 3].astype(int) + + # Grey condition: R, G, B components are very close to each other + max_val = np.maximum(np.maximum(r, g), b) + min_val = np.minimum(np.minimum(r, g), b) + is_grey = (max_val - min_val < 15) + + # Green carpet condition: (169, 247, 169) + is_green_carpet = (np.abs(r - 169) < 15) & (np.abs(g - 247) < 15) & (np.abs(b - 169) < 15) + + # Sage/teal carpet condition: (101, 181, 170) + is_sage_carpet = (np.abs(r - 101) < 15) & (np.abs(g - 181) < 15) & (np.abs(b - 170) < 15) + + # Combine all carpet detections + is_carpet = is_grey | is_green_carpet | is_sage_carpet + + # Protect pure white outlines/paths + is_not_white = (r < 240) | (g < 240) | (b < 240) + + # Protect dark black outlines + is_not_black = (r > 40) | (g > 40) | (b > 40) + + # Make carpet pattern pixels transparent + carpet_mask = is_carpet & is_not_white & is_not_black & (a > 0) + img_arr[carpet_mask] = [0, 0, 0, 0] + return Image.fromarray(img_arr) + except Exception as err: + _LOGGER.error("Error filtering carpet pattern: %s", err) + return img + + def _process_image(self, raw: bytes, rotation: int) -> bytes: + """Overlay custom map image and rotate, stateless on demand.""" img = Image.open(io.BytesIO(raw)) - img = img.rotate(rotation, expand=True) + target_size = img.size + + custom_img, custom_path = self._load_custom_image(target_size) + + # Check if we should filter out carpets based on the file name indicator + filter_carpet = False + if custom_path: + filename = os.path.basename(custom_path).lower() + name_part, _ = os.path.splitext(filename) + if name_part.endswith("_hide_rugs"): + filter_carpet = True + + if custom_img is not None: + try: + if filter_carpet: + img = self._remove_carpet_pattern(img) + else: + img = img.convert("RGBA") + + # Simple and fast alpha composite overlay! + img = Image.alpha_composite(custom_img, img) + except Exception as err: + _LOGGER.error("Error overlaying custom map image: %s", err) + + if rotation != 0: + img = img.rotate(rotation, expand=True) out = io.BytesIO() img.save(out, format="PNG") @@ -199,23 +332,20 @@ def _get_rotation(self) -> int: return rotation async def async_image(self) -> bytes | None: - """Get the image (with optional rotation).""" + """Get the image (with optional rotation and custom overlay).""" if (map_content := self._map_content) is None: raise HomeAssistantError("Map flag not found in coordinator maps") raw = map_content.image_content rotation = self._get_rotation() - if rotation == DEFAULT_MAP_ROTATION: - return raw - try: return await self.hass.async_add_executor_job( - self._rotate_image, raw, rotation + self._process_image, raw, rotation ) - except (OSError, UnidentifiedImageError) as err: + except Exception as err: _LOGGER.debug( - "Failed to rotate Roborock map image: %s, returning original image", + "Failed to process Roborock map image: %s, returning original image", err, ) return raw