Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 142 additions & 12 deletions custom_components/roborock_custom_map/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from __future__ import annotations

from datetime import datetime
from datetime import datetime, timezone
import io
import logging
import os

from PIL import Image, UnidentifiedImageError
from PIL import Image
from roborock.devices.traits.v1.home import HomeTrait
from roborock.devices.traits.v1.map_content import MapContent

Expand Down Expand Up @@ -122,12 +123,56 @@ def __init__(
self._raw_image_size: tuple[int, int] | None = None

self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._reload_time = dt_util.utcnow()

@property
def is_selected(self) -> bool:
"""Return if this map is the currently selected map."""
return self.map_flag == self.coordinator.properties_api.maps.current_map

@property
def image_last_updated(self) -> datetime | None:
"""Return the time the image was last updated, dynamically busting caches."""
base_dt = self._reload_time
coord_dt = self.coordinator.last_home_update
if coord_dt is not None:
base_dt = max(base_dt, coord_dt)
Comment on lines +134 to +139

# Lightly scan custom image paths to find the current mtime dynamically
try:
for path in self._get_candidate_paths():
if os.path.isfile(path):
mtime = os.path.getmtime(path)
custom_dt = datetime.fromtimestamp(mtime, tz=timezone.utc)
return max(base_dt, custom_dt)
except Exception:
pass
Comment on lines +141 to +149

return base_dt
Comment on lines +133 to +151

def _get_candidate_paths(self) -> list[str]:
"""Return the list of candidate paths for the custom map image."""
config_dir = self.hass.config.config_dir
search_dirs = [
os.path.join(config_dir, "www"),
os.path.join(config_dir, "media"),
"/media",
config_dir,
]
basenames = [
f"roborock_custom_map_{self.map_flag}_hide_rugs",
f"roborock_custom_map_{self.map_flag}",
"roborock_custom_map_hide_rugs",
"roborock_custom_map",
]
extensions = [".webp", ".png"]
return [
os.path.join(sdir, f"{base}{ext}")
for sdir in search_dirs
for base in basenames
for ext in extensions
]

@property
def _map_content(self) -> MapContent | None:
if self._home_trait.home_map_content and (
Expand All @@ -151,6 +196,11 @@ async def async_added_to_hass(self) -> None:
)
)

# Populate initial map info if already loaded
if (map_content := self._map_content) is not None:
self.cached_map = map_content.image_content
self._raw_image_size = _png_dimensions(self.cached_map)

self.async_write_ha_state()

def _handle_rotation_changed(self) -> None:
Expand All @@ -176,10 +226,93 @@ def _handle_coordinator_update(self) -> None:

super()._handle_coordinator_update()

def _rotate_image(self, raw: bytes, rotation: int) -> bytes:
"""Rotate image in executor thread."""
def _load_custom_image(self, target_size: tuple[int, int]) -> tuple[Image.Image, str] | tuple[None, None]:
"""Find, load, and resize a custom map image from the filesystem on demand."""
paths = self._get_candidate_paths()

for path in paths:
if os.path.isfile(path):
try:
_LOGGER.info("Loading custom map background from %s", path)
img = Image.open(path)
img.load()
img = img.convert("RGBA")
if img.size != target_size:
img = img.resize(target_size, Image.Resampling.LANCZOS)
return img, path
Comment on lines +237 to +242
except Exception as err:
_LOGGER.error("Failed to load custom map image from %s: %s", path, err)

return None, None

def _remove_carpet_pattern(self, img: Image.Image) -> Image.Image:
"""Filter out the grey and colored carpet checkerboard patterns from the foreground map."""
try:
img = img.convert("RGBA")
import numpy as np
img_arr = np.array(img)
Comment on lines +251 to +253
r = img_arr[:, :, 0].astype(int)
g = img_arr[:, :, 1].astype(int)
b = img_arr[:, :, 2].astype(int)
a = img_arr[:, :, 3].astype(int)

# Grey condition: R, G, B components are very close to each other
max_val = np.maximum(np.maximum(r, g), b)
min_val = np.minimum(np.minimum(r, g), b)
is_grey = (max_val - min_val < 15)

# Green carpet condition: (169, 247, 169)
is_green_carpet = (np.abs(r - 169) < 15) & (np.abs(g - 247) < 15) & (np.abs(b - 169) < 15)

# Sage/teal carpet condition: (101, 181, 170)
is_sage_carpet = (np.abs(r - 101) < 15) & (np.abs(g - 181) < 15) & (np.abs(b - 170) < 15)

# Combine all carpet detections
is_carpet = is_grey | is_green_carpet | is_sage_carpet

# Protect pure white outlines/paths
is_not_white = (r < 240) | (g < 240) | (b < 240)

# Protect dark black outlines
is_not_black = (r > 40) | (g > 40) | (b > 40)

# Make carpet pattern pixels transparent
carpet_mask = is_carpet & is_not_white & is_not_black & (a > 0)
img_arr[carpet_mask] = [0, 0, 0, 0]
return Image.fromarray(img_arr)
except Exception as err:
_LOGGER.error("Error filtering carpet pattern: %s", err)
return img
Comment on lines +283 to +285

def _process_image(self, raw: bytes, rotation: int) -> bytes:
"""Overlay custom map image and rotate, stateless on demand."""
img = Image.open(io.BytesIO(raw))
img = img.rotate(rotation, expand=True)
target_size = img.size

custom_img, custom_path = self._load_custom_image(target_size)

# Check if we should filter out carpets based on the file name indicator
filter_carpet = False
if custom_path:
filename = os.path.basename(custom_path).lower()
name_part, _ = os.path.splitext(filename)
if name_part.endswith("_hide_rugs"):
filter_carpet = True

if custom_img is not None:
try:
if filter_carpet:
img = self._remove_carpet_pattern(img)
else:
img = img.convert("RGBA")

# Simple and fast alpha composite overlay!
img = Image.alpha_composite(custom_img, img)
except Exception as err:
_LOGGER.error("Error overlaying custom map image: %s", err)

if rotation != 0:
img = img.rotate(rotation, expand=True)

out = io.BytesIO()
img.save(out, format="PNG")
Expand All @@ -206,23 +339,20 @@ def _get_rotation(self) -> int:
return rotation

async def async_image(self) -> bytes | None:
"""Get the image (with optional rotation)."""
"""Get the image (with optional rotation and custom overlay)."""
if (map_content := self._map_content) is None:
raise HomeAssistantError("Map flag not found in coordinator maps")

raw = map_content.image_content
rotation = self._get_rotation()

if rotation == DEFAULT_MAP_ROTATION:
return raw

try:
return await self.hass.async_add_executor_job(
self._rotate_image, raw, rotation
self._process_image, raw, rotation
)
Comment on lines 349 to 352
Comment on lines 346 to 352
except (OSError, UnidentifiedImageError) as err:
except Exception as err:
_LOGGER.debug(
"Failed to rotate Roborock map image: %s, returning original image",
"Failed to process Roborock map image: %s, returning original image",
err,
)
return raw
Comment on lines +353 to 358
Comment on lines +353 to 358
Expand Down