diff --git a/docs/api/pylabrobot.capabilities.rst b/docs/api/pylabrobot.capabilities.rst
index 666ce292e20..99be84cc2da 100644
--- a/docs/api/pylabrobot.capabilities.rst
+++ b/docs/api/pylabrobot.capabilities.rst
@@ -192,6 +192,38 @@ Barcode Scanning
BarcodeScannerBackend
+Rack Reading
+------------
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.rack_reader
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackReader
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackReaderBackend
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.standard
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackScanEntry
+ RackScanResult
+
+
Microscopy
----------
diff --git a/docs/api/pylabrobot.micronic.rst b/docs/api/pylabrobot.micronic.rst
new file mode 100644
index 00000000000..712b80f98aa
--- /dev/null
+++ b/docs/api/pylabrobot.micronic.rst
@@ -0,0 +1,68 @@
+.. currentmodule:: pylabrobot.micronic
+
+pylabrobot.micronic package
+===========================
+
+Micronic integrations built on the rack-reading capability.
+
+Device
+------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.code_reader
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicCodeReader
+
+
+Driver
+------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.driver
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicCodeReaderDriver
+
+.. currentmodule:: pylabrobot.micronic.code_reader.errors
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicError
+
+
+Scanners
+--------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.scanner
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ Scanner
+ TwainScanner
+ SaneScanner
+
+
+Capabilities
+------------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.rack_reading_backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicCodeReaderRackReadingBackend
diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst
index 53a04635937..5f46dd6be3f 100644
--- a/docs/api/pylabrobot.rst
+++ b/docs/api/pylabrobot.rst
@@ -41,6 +41,7 @@ Manufacturers
pylabrobot.inheco
pylabrobot.liconic
pylabrobot.mettler_toledo
+ pylabrobot.micronic
pylabrobot.molecular_devices
pylabrobot.opentrons
pylabrobot.qinstruments
diff --git a/docs/user_guide/capabilities/index.md b/docs/user_guide/capabilities/index.md
index 2a091c83f64..fb4e0f7ddad 100644
--- a/docs/user_guide/capabilities/index.md
+++ b/docs/user_guide/capabilities/index.md
@@ -55,6 +55,7 @@ loading-tray
pumping
weighing
barcode-scanning
+rack-reading
microscopy
automated-retrieval
absorbance
diff --git a/docs/user_guide/capabilities/rack-reading.ipynb b/docs/user_guide/capabilities/rack-reading.ipynb
new file mode 100644
index 00000000000..66b0d0323ae
--- /dev/null
+++ b/docs/user_guide/capabilities/rack-reading.ipynb
@@ -0,0 +1,107 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Rack Reading\n",
+ "\n",
+ "The `rack_reading` capability standardizes rack-scale code readers that decode a tube rack and\n",
+ "return structured per-position scan results.\n",
+ "\n",
+ "Unlike one-at-a-time code reads, rack reading is job-oriented and returns the full decoded rack map.\n",
+ "\n",
+ "## Walkthrough"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {},
+ "source": [
+ "from pylabrobot.capabilities.rack_reading import RackReader\n",
+ "from pylabrobot.capabilities.rack_reading.chatterbox import RackReaderChatterboxBackend\n",
+ "from pylabrobot.resources import ResourceHolder, TubeRack, create_ordered_items_2d\n",
+ "\n",
+ "reader = RackReader(backend=RackReaderChatterboxBackend())\n",
+ "await reader._on_setup()"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "metadata": {},
+ "source": [
+ "rack = TubeRack(\n",
+ " name=\"rack\",\n",
+ " size_x=85.0,\n",
+ " size_y=127.0,\n",
+ " size_z=20.0,\n",
+ " ordered_items=create_ordered_items_2d(\n",
+ " ResourceHolder,\n",
+ " num_items_x=12,\n",
+ " num_items_y=8,\n",
+ " dx=0,\n",
+ " dy=0,\n",
+ " dz=0,\n",
+ " item_dx=9.0,\n",
+ " item_dy=9.0,\n",
+ " size_x=9.0,\n",
+ " size_y=9.0,\n",
+ " size_z=20.0,\n",
+ " ),\n",
+ ")\n",
+ "\n",
+ "result = await reader.scan_rack(rack=rack, timeout=60.0, poll_interval=1.0)\n",
+ "print(result.rack_id)\n",
+ "print(result.entries[0].position, result.entries[0].tube_id)"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "metadata": {},
+ "source": [
+ "rack_id = await reader.scan_rack_id(timeout=60.0, poll_interval=1.0)\n",
+ "print(rack_id)"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Public API\n",
+ "\n",
+ "- `scan_rack(rack, timeout, poll_interval)` — scan a `TubeRack` and return a `RackScanResult`. The\n",
+ " backend validates that the rack shape matches what the hardware supports.\n",
+ "- `scan_rack_id(timeout, poll_interval)` — read just the rack barcode (no per-position decoding)\n",
+ " and return the rack identifier.\n",
+ "\n",
+ "## Supported hardware\n",
+ "\n",
+ "```{supported-devices} rack reading\n",
+ "```\n",
+ "\n",
+ "## API reference\n",
+ "\n",
+ "See {class}`~pylabrobot.capabilities.rack_reading.rack_reader.RackReader` and {class}`~pylabrobot.capabilities.rack_reading.backend.RackReaderBackend`."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "name": "python",
+ "version": "3.12.0"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md
index bb273f350cf..3f1ad03ea17 100644
--- a/docs/user_guide/index.md
+++ b/docs/user_guide/index.md
@@ -40,6 +40,7 @@ inheco/index
liconic/index
mettler_toledo/index
molecular_devices/index
+micronic/index
opentrons/index
qinstruments/index
thermo_fisher/index
diff --git a/docs/user_guide/machines.md b/docs/user_guide/machines.md
index dc057d79b15..db5a5fbcebb 100644
--- a/docs/user_guide/machines.md
+++ b/docs/user_guide/machines.md
@@ -188,6 +188,12 @@ tr > td:nth-child(5) { width: 15%; }
|--------------|---------|-------------|--------|
| Mettler Toledo | WXS205SDU | Full | [PLR](02_analytical/scales/mettler-toledo-WXS205SDU.ipynb) / [OEM](https://www.mt.com/us/en/home/products/Industrial_Weighing_Solutions/high-precision-weigh-sensors/weigh-module-wxs205sdu-15-11121008.html) |
+### Rack Readers
+
+| Manufacturer | Machine | Features | PLR-Support | Links |
+|--------------|---------|----------|-------------|--------|
+| Micronic | Direct local scanner + serial control | rack reading | Basics | [PLR](https://docs.pylabrobot.org/user_guide/micronic/index.html) / [OEM](https://www.micronic.com/products/code-reader/) |
+
---
## Understanding the Tables
diff --git a/docs/user_guide/micronic/code_reader/hello-world.ipynb b/docs/user_guide/micronic/code_reader/hello-world.ipynb
new file mode 100644
index 00000000000..314f1cb2da7
--- /dev/null
+++ b/docs/user_guide/micronic/code_reader/hello-world.ipynb
@@ -0,0 +1,46 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "# Micronic\n\nPyLabRobot includes `v1b1` Micronic integrations built on the generic\n`rack_reading` capability.\n\n`MicronicCodeReader` controls the local hardware directly. The caller picks a\n`Scanner` to acquire the rack image (`TwainScanner` on Windows, `SaneScanner` on\nLinux, or a custom `Scanner` subclass for any other acquisition stack). The driver\nhandles image acquisition and side-rack serial barcode reads; the rack-reading\nbackend decodes tube DataMatrix codes locally and returns a standard\n`RackScanResult`. It does not call Micronic Code Reader or IO Monitor, and\nPyLabRobot does not package any scanner helper executable.\n\n## Supported operations\n\nRack reading (large scanner that decodes 96 tubes plus the side rack barcode):\n\n- `rack_reading.scan_rack(rack)` to trigger image acquisition, decode all 96 tube\n positions, read the side rack barcode, and return a `RackScanResult`. The device\n currently supports 8x12 racks; passing a different shape raises `MicronicError`.\n- `rack_reading.scan_rack_id()` for a rack-barcode-only read on the side reader\n\n## Hardware example\n\nThe operator is responsible for installing any OS-level scanner bridge\n(`twain_scan`, `scanimage`, or custom scanner integration), the PLR serial extra\n(`pylabrobot[serial]`), and the local Python decode dependencies in the runtime\nenvironment."
+ },
+ {
+ "cell_type": "code",
+ "metadata": {},
+ "source": "from pylabrobot.micronic import MicronicCodeReader, TwainScanner\nfrom pylabrobot.resources import ResourceHolder, TubeRack, create_ordered_items_2d\n\nrack = TubeRack(\n name=\"micronic_96_well_rack\",\n size_x=85.0,\n size_y=127.0,\n size_z=20.0,\n ordered_items=create_ordered_items_2d(\n ResourceHolder,\n num_items_x=12,\n num_items_y=8,\n dx=0,\n dy=0,\n dz=0,\n item_dx=9.0,\n item_dy=9.0,\n size_x=9.0,\n size_y=9.0,\n size_z=20.0,\n ),\n)\n\nreader = MicronicCodeReader(\n scanner=TwainScanner(\n twain_scanner_path=r\"C:\\Tools\\twain_scan.exe\",\n twain_source=\"AVA6PlusG\",\n ),\n serial_port=\"COM4\",\n image_dir=r\"C:\\ProgramData\\PyLabRobot\\micronic-images\",\n keep_images=True,\n)\nawait reader.setup()\n\ntry:\n rack_result = await reader.rack_reading.scan_rack(\n rack=rack, timeout=90.0, poll_interval=1.0\n )\n print(rack_result.rack_id)\n print(len([entry for entry in rack_result.entries if entry.tube_id]))\n\n rack_id = await reader.rack_reading.scan_rack_id(timeout=5.0, poll_interval=0.5)\n print(rack_id)\nfinally:\n await reader.stop()",
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "On Ubuntu/Linux, use `SaneScanner` if the scanner is exposed by a SANE backend:"
+ },
+ {
+ "cell_type": "code",
+ "metadata": {},
+ "source": "from pylabrobot.micronic import MicronicCodeReader, SaneScanner\n\nreader = MicronicCodeReader(\n scanner=SaneScanner(sane_device=\"avision:libusb:001:004\"),\n serial_port=\"/dev/ttyUSB0\",\n)",
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "For any other acquisition stack, subclass `Scanner` and implement `acquire` to\nwrite a rack image to the given path.\n\n## Notes\n\n- `scan_rack` reads the rack ID and every tube barcode, so it typically takes\n tens of seconds. `scan_rack_id` only reads the rack barcode and completes in a\n few seconds.\n- TWAIN is a Windows scanner-driver API. PyLabRobot does not ship a TWAIN\n bridge binary and does not install one for you; pass `twain_scanner_path` to\n `TwainScanner`, set `MICRONIC_TWAIN_SCANNER_PATH`, or put a local helper\n named `twain_scan` / `twain_scan.exe` on `PATH`.\n- Ubuntu/Linux scanner control uses SANE `scanimage` (via `SaneScanner`).\n PyLabRobot does not install SANE or vendor scanner drivers. Rack-ID reads use\n `pylabrobot.io.Serial`, which is installed through the `pylabrobot[serial]`\n extra.\n- Image decoding imports `pillow`, `opencv-python-headless`, `numpy`, and\n `zxing-cpp` at runtime. Install them in the environment that runs PyLabRobot."
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "name": "python",
+ "version": "3.12.0"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docs/user_guide/micronic/index.md b/docs/user_guide/micronic/index.md
new file mode 100644
index 00000000000..b1a7a4a55c7
--- /dev/null
+++ b/docs/user_guide/micronic/index.md
@@ -0,0 +1,7 @@
+# Micronic
+
+```{toctree}
+:maxdepth: 1
+
+code_reader/hello-world
+```
diff --git a/pylabrobot/capabilities/__init__.py b/pylabrobot/capabilities/__init__.py
index ddf84ebd973..75c3c674762 100644
--- a/pylabrobot/capabilities/__init__.py
+++ b/pylabrobot/capabilities/__init__.py
@@ -1 +1,7 @@
from .capability import Capability, CapabilityBackend, need_capability_ready
+from .rack_reading import (
+ RackReader,
+ RackReaderBackend,
+ RackScanEntry,
+ RackScanResult,
+)
diff --git a/pylabrobot/capabilities/rack_reading/__init__.py b/pylabrobot/capabilities/rack_reading/__init__.py
new file mode 100644
index 00000000000..9ef4c9936fa
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/__init__.py
@@ -0,0 +1,4 @@
+from .backend import RackReaderBackend
+from .chatterbox import RackReaderChatterboxBackend
+from .rack_reader import RackReader
+from .standard import RackScanEntry, RackScanResult
diff --git a/pylabrobot/capabilities/rack_reading/backend.py b/pylabrobot/capabilities/rack_reading/backend.py
new file mode 100644
index 00000000000..d8657504443
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/backend.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from abc import ABCMeta, abstractmethod
+
+from pylabrobot.capabilities.capability import CapabilityBackend
+from pylabrobot.resources.tube_rack import TubeRack
+
+from .standard import RackScanResult
+
+
+class RackReaderBackend(CapabilityBackend, metaclass=ABCMeta):
+ """Abstract backend for rack readers that decode position-indexed rack contents."""
+
+ @abstractmethod
+ async def scan_rack(self, rack: TubeRack, timeout: float, poll_interval: float) -> RackScanResult:
+ """Scan ``rack`` and return its decoded contents."""
+
+ @abstractmethod
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ """Read the rack barcode only and return the rack identifier."""
diff --git a/pylabrobot/capabilities/rack_reading/chatterbox.py b/pylabrobot/capabilities/rack_reading/chatterbox.py
new file mode 100644
index 00000000000..f380ff667b9
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/chatterbox.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+from pylabrobot.resources.barcode import Barcode
+from pylabrobot.resources.tube_rack import TubeRack
+
+from .backend import RackReaderBackend
+from .standard import RackScanEntry, RackScanResult
+
+
+class RackReaderChatterboxBackend(RackReaderBackend):
+ """Device-free rack-reading backend for tests and examples."""
+
+ async def scan_rack(self, rack: TubeRack, timeout: float, poll_interval: float) -> RackScanResult:
+ del rack, timeout, poll_interval
+ return RackScanResult(
+ rack_id="CHATTERBOX",
+ entries=[
+ RackScanEntry(
+ position="A1",
+ tube_id="SIMULATED",
+ status="OK",
+ barcode=Barcode(data="SIMULATED", symbology="DataMatrix", position_on_resource="bottom"),
+ ),
+ ],
+ rack_barcode=Barcode(
+ data="CHATTERBOX", symbology="Code 128 (Subset B and C)", position_on_resource="right"
+ ),
+ )
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ del timeout, poll_interval
+ return "CHATTERBOX"
diff --git a/pylabrobot/capabilities/rack_reading/rack_reader.py b/pylabrobot/capabilities/rack_reading/rack_reader.py
new file mode 100644
index 00000000000..0241e9aec56
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/rack_reader.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+from pylabrobot.capabilities.capability import Capability, need_capability_ready
+from pylabrobot.resources.tube_rack import TubeRack
+
+from .backend import RackReaderBackend
+from .standard import RackScanResult
+
+
+class RackReader(Capability):
+ """Rack-reading capability."""
+
+ def __init__(self, backend: RackReaderBackend):
+ super().__init__(backend=backend)
+ self.backend: RackReaderBackend = backend
+
+ @need_capability_ready
+ async def scan_rack(
+ self,
+ rack: TubeRack,
+ timeout: float = 60.0,
+ poll_interval: float = 1.0,
+ ) -> RackScanResult:
+ return await self.backend.scan_rack(rack=rack, timeout=timeout, poll_interval=poll_interval)
+
+ @need_capability_ready
+ async def scan_rack_id(
+ self,
+ timeout: float = 60.0,
+ poll_interval: float = 1.0,
+ ) -> str:
+ return await self.backend.scan_rack_id(timeout=timeout, poll_interval=poll_interval)
diff --git a/pylabrobot/capabilities/rack_reading/rack_reader_tests.py b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py
new file mode 100644
index 00000000000..a05ffd51ffc
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py
@@ -0,0 +1,66 @@
+import unittest
+from unittest.mock import AsyncMock, MagicMock
+
+from pylabrobot.capabilities.rack_reading.rack_reader import RackReader
+from pylabrobot.capabilities.rack_reading.standard import RackScanEntry, RackScanResult
+from pylabrobot.resources.barcode import Barcode
+from pylabrobot.resources.tube_rack import TubeRack
+
+SCAN_RESULT = RackScanResult(
+ rack_id="5500135415",
+ entries=[
+ RackScanEntry(
+ position="A1",
+ tube_id="7518613629",
+ status="OK",
+ barcode=Barcode(data="7518613629", symbology="DataMatrix", position_on_resource="bottom"),
+ ),
+ ],
+ rack_barcode=Barcode(
+ data="5500135415", symbology="Code 128 (Subset B and C)", position_on_resource="right"
+ ),
+)
+
+
+def _make_backend() -> MagicMock:
+ backend = MagicMock()
+ backend.scan_rack = AsyncMock(return_value=SCAN_RESULT)
+ backend.scan_rack_id = AsyncMock(return_value=SCAN_RESULT.rack_id)
+ backend._on_setup = AsyncMock()
+ backend._on_stop = AsyncMock()
+ return backend
+
+
+class TestRackReader(unittest.IsolatedAsyncioTestCase):
+ async def test_scan_rack_delegates_to_backend(self):
+ backend = _make_backend()
+ rack = MagicMock(spec=TubeRack)
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ result = await reader.scan_rack(rack=rack, timeout=1.0, poll_interval=0.01)
+
+ self.assertEqual(result, SCAN_RESULT)
+ backend.scan_rack.assert_awaited_once_with(rack=rack, timeout=1.0, poll_interval=0.01)
+
+ async def test_scan_rack_id_delegates_to_backend(self):
+ backend = _make_backend()
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ rack_id = await reader.scan_rack_id(timeout=1.0, poll_interval=0.01)
+
+ self.assertEqual(rack_id, SCAN_RESULT.rack_id)
+ backend.scan_rack_id.assert_awaited_once_with(timeout=1.0, poll_interval=0.01)
+
+ async def test_requires_setup(self):
+ backend = _make_backend()
+ rack = MagicMock(spec=TubeRack)
+ reader = RackReader(backend=backend)
+
+ with self.assertRaises(RuntimeError):
+ await reader.scan_rack(rack=rack)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/pylabrobot/capabilities/rack_reading/standard.py b/pylabrobot/capabilities/rack_reading/standard.py
new file mode 100644
index 00000000000..021c500f379
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/standard.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Literal, Optional
+
+from pylabrobot.resources.barcode import Barcode
+
+
+@dataclass
+class RackScanEntry:
+ """One decoded rack position."""
+
+ position: str
+ tube_id: Optional[str]
+ status: Literal["OK", "NOREAD"]
+ barcode: Optional[Barcode] = None
+
+
+@dataclass
+class RackScanResult:
+ """A decoded rack scan."""
+
+ rack_id: str
+ entries: list[RackScanEntry]
+ rack_barcode: Optional[Barcode] = None
diff --git a/pylabrobot/hamilton/liquid_handlers/star/star.py b/pylabrobot/hamilton/liquid_handlers/star/star.py
index 67a46b54549..4b106de1375 100644
--- a/pylabrobot/hamilton/liquid_handlers/star/star.py
+++ b/pylabrobot/hamilton/liquid_handlers/star/star.py
@@ -12,7 +12,7 @@
from pylabrobot.device import Device
from pylabrobot.resources import Coordinate
from pylabrobot.resources.hamilton import HamiltonDeck, STARDeck, STARLetDeck
-from pylabrobot.resources.hamilton.hamilton_decks import HamiltonCoreGrippers, HamiltonSTARDeck
+from pylabrobot.resources.hamilton.hamilton_decks import HamiltonCoreGrippers
from .chatterbox import STARChatterboxDriver
from .core import CoreGripper
diff --git a/pylabrobot/micronic/__init__.py b/pylabrobot/micronic/__init__.py
new file mode 100644
index 00000000000..2c37b8b19b9
--- /dev/null
+++ b/pylabrobot/micronic/__init__.py
@@ -0,0 +1,9 @@
+from pylabrobot.micronic.code_reader import (
+ MicronicCodeReader,
+ MicronicCodeReaderDriver,
+ MicronicCodeReaderRackReadingBackend,
+ MicronicError,
+ SaneScanner,
+ Scanner,
+ TwainScanner,
+)
diff --git a/pylabrobot/micronic/code_reader/__init__.py b/pylabrobot/micronic/code_reader/__init__.py
new file mode 100644
index 00000000000..0f67135b25a
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/__init__.py
@@ -0,0 +1,7 @@
+from pylabrobot.micronic.code_reader.code_reader import MicronicCodeReader
+from pylabrobot.micronic.code_reader.driver import MicronicCodeReaderDriver
+from pylabrobot.micronic.code_reader.errors import MicronicError
+from pylabrobot.micronic.code_reader.rack_reading_backend import (
+ MicronicCodeReaderRackReadingBackend,
+)
+from pylabrobot.micronic.code_reader.scanner import SaneScanner, Scanner, TwainScanner
diff --git a/pylabrobot/micronic/code_reader/code_reader.py b/pylabrobot/micronic/code_reader/code_reader.py
new file mode 100644
index 00000000000..1ecd7dfb507
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/code_reader.py
@@ -0,0 +1,41 @@
+"""Micronic Code Reader device."""
+
+from __future__ import annotations
+
+from typing import Optional
+
+from pylabrobot.capabilities.rack_reading import RackReader
+from pylabrobot.device import Device
+
+from .driver import MicronicCodeReaderDriver
+from .rack_reading_backend import MicronicCodeReaderRackReadingBackend
+from .scanner import Scanner
+
+
+class MicronicCodeReader(Device):
+ """Micronic rack reader device.
+
+ The rack-reading capability is driven by ``MicronicCodeReaderDriver``.
+ """
+
+ def __init__(
+ self,
+ scanner: Scanner,
+ serial_port: str,
+ image_dir: Optional[str] = None,
+ scanner_timeout: float = 90.0,
+ serial_timeout_ms: int = 2500,
+ keep_images: bool = False,
+ ):
+ driver = MicronicCodeReaderDriver(
+ scanner=scanner,
+ serial_port=serial_port,
+ image_dir=image_dir,
+ scanner_timeout_ms=int(scanner_timeout * 1000),
+ serial_timeout_ms=serial_timeout_ms,
+ keep_images=keep_images,
+ )
+ super().__init__(driver=driver)
+ self.driver: MicronicCodeReaderDriver = driver
+ self.rack_reading = RackReader(backend=MicronicCodeReaderRackReadingBackend(driver))
+ self._capabilities = [self.rack_reading]
diff --git a/pylabrobot/micronic/code_reader/driver.py b/pylabrobot/micronic/code_reader/driver.py
new file mode 100644
index 00000000000..5b6dc3f1dd7
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/driver.py
@@ -0,0 +1,434 @@
+"""Hardware driver for the Micronic rack scanner.
+
+This driver does not call Micronic Code Reader or IO Monitor. It owns the local
+scanner path directly:
+
+- acquire a rack image through a caller-supplied :class:`Scanner`,
+- read barcodes through the side serial barcode reader, and
+- expose acquisition metadata for the rack-reading backend.
+"""
+
+from __future__ import annotations
+
+import re
+import tempfile
+import time
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Iterable, Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.device import Driver
+from pylabrobot.io.serial import Serial
+
+from .errors import MicronicError
+from .scanner import Scanner
+
+ROWS = "ABCDEFGH"
+COLS = 12
+RACK_ROWS = 8
+RACK_COLS = 12
+
+
+@dataclass(frozen=True)
+class DecodeResult:
+ tube_id: str
+ method: str
+
+
+class MicronicCodeReaderDriver(Driver):
+ """Driver that controls the Micronic scanner without the OEM app."""
+
+ def __init__(
+ self,
+ scanner: Scanner,
+ serial_port: str,
+ image_dir: Optional[str] = None,
+ scanner_timeout_ms: int = 90000,
+ serial_timeout_ms: int = 2500,
+ keep_images: bool = False,
+ ):
+ super().__init__()
+ self.scanner = scanner
+ self.image_dir = (
+ Path(image_dir) if image_dir else Path(tempfile.gettempdir()) / "pylabrobot-micronic"
+ )
+ self.scanner_timeout_ms = scanner_timeout_ms
+ self.serial_timeout_ms = serial_timeout_ms
+ self.keep_images = keep_images
+ self.io = Serial(
+ human_readable_device_name="Micronic rack ID reader",
+ port=serial_port,
+ baudrate=9600,
+ bytesize=7,
+ parity="E",
+ stopbits=1,
+ timeout=0.1,
+ write_timeout=1.0,
+ )
+ self.last_image_path: Optional[Path] = None
+ self.last_scan_metadata: dict[str, object] = {}
+ self.last_decode_metadata: dict[str, object] = {}
+
+ async def setup(self, backend_params: Optional[BackendParams] = None):
+ del backend_params
+ self.image_dir.mkdir(parents=True, exist_ok=True)
+ await self.io.setup()
+
+ async def stop(self):
+ await self.io.stop()
+
+ def serialize(self) -> dict:
+ return {
+ **super().serialize(),
+ "image_dir": str(self.image_dir),
+ "scanner_timeout_ms": self.scanner_timeout_ms,
+ "serial_timeout_ms": self.serial_timeout_ms,
+ "keep_images": self.keep_images,
+ }
+
+ async def read_barcode(self) -> str:
+ deadline = time.monotonic() + self.serial_timeout_ms / 1000
+ chunks: list[bytes] = []
+ try:
+ await self.io.reset_input_buffer()
+ await self.io.write(b"\r\n")
+ while time.monotonic() < deadline:
+ value = await self.io.read(1)
+ if value:
+ chunks.append(value)
+ if value in {b"\r", b"\n"}:
+ break
+ except Exception as exc:
+ raise MicronicError(
+ "Rack ID serial read failed. Install the PLR serial extra with "
+ "`pip install pylabrobot[serial]` and verify the serial port: "
+ f"{exc}"
+ ) from exc
+ text = b"".join(chunks).decode("utf-8", errors="ignore")
+ match = re.search(r"\d{6,}", text)
+ return match.group(0) if match else "NOREAD"
+
+ def acquire_image(self) -> Path:
+ self.image_dir.mkdir(parents=True, exist_ok=True)
+ image_path = (
+ self.image_dir
+ / f"micronic_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{self.scanner.image_extension}"
+ )
+ self.last_scan_metadata = self.scanner.acquire(image_path, self.scanner_timeout_ms)
+ self.last_image_path = image_path
+ return image_path
+
+ def release_image(self, image_path: Path) -> None:
+ if not self.keep_images:
+ try:
+ image_path.unlink()
+ self.last_image_path = None
+ except OSError:
+ pass
+
+
+def decode_image(image_path: Path) -> tuple[dict[str, DecodeResult], dict[str, object]]:
+ cv2, np, zxingcpp, Image, ImageOps = import_decode_dependencies()
+ with Image.open(image_path) as loaded_image:
+ image = loaded_image.convert("L")
+ full_results = zxingcpp.read_barcodes(
+ image,
+ formats=zxingcpp.BarcodeFormat.DataMatrix,
+ try_rotate=True,
+ try_downscale=True,
+ try_invert=True,
+ )
+
+ detected: list[tuple[float, float, str]] = []
+ for result in full_results:
+ if not is_tube_id(result.text):
+ continue
+ corners = [
+ result.position.top_left,
+ result.position.top_right,
+ result.position.bottom_right,
+ result.position.bottom_left,
+ ]
+ detected.append(
+ (
+ sum(corner.x for corner in corners) / 4,
+ sum(corner.y for corner in corners) / 4,
+ result.text,
+ )
+ )
+
+ if len(detected) < 24:
+ raise MicronicError(f"Only {len(detected)} DataMatrix codes were found in the full image.")
+
+ xs = fitted_axis(cluster_axis([item[0] for item in detected], RACK_ROWS, 90), RACK_ROWS)
+ ys = fitted_axis(cluster_axis([item[1] for item in detected], RACK_COLS, 90), RACK_COLS)
+ x_pitch = abs(xs[-1] - xs[0]) / (RACK_ROWS - 1)
+ y_pitch = abs(ys[-1] - ys[0]) / (RACK_COLS - 1)
+
+ decoded: dict[str, DecodeResult] = {}
+ for x, y, tube_id in detected:
+ scan_col = min(range(RACK_ROWS), key=lambda index: abs(xs[index] - x))
+ scan_row = min(range(RACK_COLS), key=lambda index: abs(ys[index] - y))
+ if abs(xs[scan_col] - x) > x_pitch * 0.45 or abs(ys[scan_row] - y) > y_pitch * 0.45:
+ continue
+ decoded[rack_position(scan_row, scan_col)] = DecodeResult(tube_id=tube_id, method="full-image")
+
+ for scan_row in range(RACK_COLS):
+ for scan_col in range(RACK_ROWS):
+ position = rack_position(scan_row, scan_col)
+ if position in decoded:
+ continue
+ crop_result = decode_well_crop(
+ image,
+ xs[scan_col],
+ ys[scan_row],
+ cv2,
+ np,
+ zxingcpp,
+ Image,
+ ImageOps,
+ )
+ if crop_result:
+ decoded[position] = crop_result
+
+ duplicate_ids = find_duplicate_ids(decoded)
+ if duplicate_ids:
+ raise MicronicError(
+ f"Duplicate tube IDs decoded from more than one well: {', '.join(duplicate_ids)}"
+ )
+
+ metadata = {
+ "imageSize": image.size,
+ "fullImageDecoded": len(detected),
+ "gridX": [round(value, 1) for value in xs],
+ "gridY": [round(value, 1) for value in ys],
+ "decodedWells": len(decoded),
+ "missing": [position for position in iter_positions() if position not in decoded],
+ }
+ return decoded, metadata
+
+
+def import_decode_dependencies():
+ try:
+ import cv2 # type: ignore
+ import numpy as np # type: ignore
+ import zxingcpp # type: ignore
+ from PIL import Image, ImageOps # type: ignore
+ except ImportError as exc:
+ raise MicronicError(
+ "Micronic decode dependencies are missing. Install pillow, "
+ "opencv-python-headless, numpy, and zxing-cpp."
+ ) from exc
+ return cv2, np, zxingcpp, Image, ImageOps
+
+
+def cluster_axis(values: list[float], expected_count: int, tolerance: float) -> list[float]:
+ if not values:
+ raise MicronicError("No decoded barcode positions are available for grid calibration.")
+
+ clusters: list[list[float]] = []
+ for value in sorted(values):
+ if not clusters:
+ clusters.append([value])
+ continue
+ mean = sum(clusters[-1]) / len(clusters[-1])
+ if abs(value - mean) > tolerance:
+ clusters.append([value])
+ else:
+ clusters[-1].append(value)
+
+ means = [sum(cluster) / len(cluster) for cluster in clusters]
+ if len(means) == expected_count:
+ return means
+ if len(means) >= 2:
+ return [
+ means[0] + index * (means[-1] - means[0]) / (expected_count - 1)
+ for index in range(expected_count)
+ ]
+ raise MicronicError(
+ f"Could not fit {expected_count} grid clusters from {len(values)} decoded positions."
+ )
+
+
+def fitted_axis(means: list[float], expected_count: int) -> list[float]:
+ return [
+ means[0] + index * (means[-1] - means[0]) / (expected_count - 1)
+ for index in range(expected_count)
+ ]
+
+
+def rack_position(scan_row: int, scan_col: int) -> str:
+ return f"{ROWS[RACK_ROWS - 1 - scan_col]}{RACK_COLS - scan_row}"
+
+
+def iter_positions() -> Iterable[str]:
+ for row in ROWS:
+ for column in range(1, COLS + 1):
+ yield f"{row}{column}"
+
+
+def is_tube_id(value: object) -> bool:
+ return isinstance(value, str) and value.isdigit() and len(value) == 10
+
+
+def decode_well_crop(
+ image, center_x, center_y, cv2, np, zxingcpp, Image, ImageOps
+) -> Optional[DecodeResult]:
+ for size in [150, 160, 180, 200, 220, 240]:
+ crop = centered_crop(image, center_x, center_y, size)
+ decoded = decode_pil_variants(crop, zxingcpp, ImageOps)
+ if decoded:
+ return DecodeResult(tube_id=decoded, method=f"crop-{size}")
+
+ for size in [100, 120, 140, 160]:
+ crop = centered_crop(image, center_x, center_y, size)
+ decoded = decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps)
+ if decoded:
+ return DecodeResult(tube_id=decoded, method=f"perspective-{size}")
+
+ return None
+
+
+def centered_crop(image, center_x: float, center_y: float, size: int):
+ half = size / 2
+ return image.crop(
+ (
+ int(round(center_x - half)),
+ int(round(center_y - half)),
+ int(round(center_x + half)),
+ int(round(center_y + half)),
+ )
+ )
+
+
+def decode_pil_variants(crop, zxingcpp, ImageOps) -> Optional[str]:
+ for variant in [crop, ImageOps.autocontrast(crop), ImageOps.equalize(crop)]:
+ decoded = decode_with_zxing(variant, zxingcpp, ImageOps)
+ if decoded:
+ return decoded
+ return None
+
+
+def decode_with_zxing(image, zxingcpp, ImageOps) -> Optional[str]:
+ binarizers = [
+ zxingcpp.Binarizer.LocalAverage,
+ zxingcpp.Binarizer.GlobalHistogram,
+ zxingcpp.Binarizer.FixedThreshold,
+ ]
+ for scale in [1, 2, 3, 4]:
+ scaled = image if scale == 1 else image.resize((image.width * scale, image.height * scale))
+ for invert in [False, True]:
+ candidate = ImageOps.invert(scaled) if invert else scaled
+ for border in [0, 20, 50]:
+ padded = ImageOps.expand(candidate, border=border, fill=255) if border else candidate
+ for binarizer in binarizers:
+ for pure in [False, True]:
+ results = zxingcpp.read_barcodes(
+ padded,
+ formats=zxingcpp.BarcodeFormat.DataMatrix,
+ try_rotate=True,
+ try_downscale=False,
+ try_invert=True,
+ binarizer=binarizer,
+ is_pure=pure,
+ )
+ for result in results:
+ if is_tube_id(result.text):
+ return str(result.text)
+ return None
+
+
+def order_box(points, np):
+ points = np.array(points, dtype=np.float32)
+ sums = points.sum(axis=1)
+ diffs = np.diff(points, axis=1).ravel()
+ return np.array(
+ [
+ points[np.argmin(sums)],
+ points[np.argmin(diffs)],
+ points[np.argmax(sums)],
+ points[np.argmax(diffs)],
+ ],
+ dtype=np.float32,
+ )
+
+
+def decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps) -> Optional[str]:
+ crop_array = np.array(crop)
+ for threshold in [30, 40, 50, 60, 70, 80, 90, 100, 120, 140]:
+ mask = (crop_array < threshold).astype(np.uint8) * 255
+ for candidate_mask in candidate_masks(mask, cv2, np):
+ if not candidate_mask.any():
+ continue
+ points = np.column_stack(np.where(candidate_mask > 0))[:, ::-1].astype(np.float32)
+ if len(points) < 40:
+ continue
+ rect = cv2.minAreaRect(points)
+ (rect_x, rect_y), (rect_w, rect_h), _angle = rect
+ if rect_w < 25 or rect_h < 25 or rect_w > crop.width * 0.9 or rect_h > crop.height * 0.9:
+ continue
+ if max(rect_w, rect_h) / max(1, min(rect_w, rect_h)) > 2:
+ continue
+
+ box = cv2.boxPoints(rect)
+ center = np.array([rect_x, rect_y], dtype=np.float32)
+ for margin in [0.9, 1.0, 1.1, 1.2, 1.35]:
+ source = order_box((box - center) * margin + center, np)
+ for output_size in [60, 80, 100, 120, 160]:
+ destination = np.array(
+ [
+ [0, 0],
+ [output_size - 1, 0],
+ [output_size - 1, output_size - 1],
+ [0, output_size - 1],
+ ],
+ dtype=np.float32,
+ )
+ matrix = cv2.getPerspectiveTransform(source, destination)
+ warped = cv2.warpPerspective(
+ crop_array, matrix, (output_size, output_size), borderValue=255
+ )
+ for mode_array in perspective_variants(warped, threshold, cv2, Image, ImageOps):
+ decoded = decode_with_zxing(mode_array, zxingcpp, ImageOps)
+ if decoded:
+ return decoded
+ return None
+
+
+def candidate_masks(mask, cv2, np):
+ yield mask
+ number, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8)
+ combined = np.zeros_like(mask)
+ size = mask.shape[0]
+ for index in range(1, number):
+ _x, _y, width, height, area = stats[index]
+ center_x, center_y = centroids[index]
+ if area < 15 or width < 8 or height < 8:
+ continue
+ if abs(center_x - size / 2) > size * 0.33 or abs(center_y - size / 2) > size * 0.33:
+ continue
+ if width > size * 0.85 or height > size * 0.85:
+ continue
+ combined[labels == index] = 255
+ yield combined
+
+
+def perspective_variants(warped, threshold: int, cv2, Image, ImageOps):
+ yield Image.fromarray(warped)
+ yield ImageOps.autocontrast(Image.fromarray(warped))
+ _, binary = cv2.threshold(warped, min(220, threshold + 70), 255, cv2.THRESH_BINARY)
+ yield Image.fromarray(binary)
+ yield Image.fromarray(255 - binary)
+
+
+def find_duplicate_ids(decoded: dict[str, DecodeResult]) -> list[str]:
+ seen: dict[str, str] = {}
+ duplicates: list[str] = []
+ for position, result in decoded.items():
+ previous = seen.get(result.tube_id)
+ if previous and previous != position:
+ duplicates.append(result.tube_id)
+ seen[result.tube_id] = position
+ return sorted(set(duplicates))
diff --git a/pylabrobot/micronic/code_reader/errors.py b/pylabrobot/micronic/code_reader/errors.py
new file mode 100644
index 00000000000..ac8d9529598
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/errors.py
@@ -0,0 +1,2 @@
+class MicronicError(Exception):
+ """Raised when Micronic driver or scanner operations fail."""
diff --git a/pylabrobot/micronic/code_reader/micronic_tests.py b/pylabrobot/micronic/code_reader/micronic_tests.py
new file mode 100644
index 00000000000..f98c11e0ff3
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/micronic_tests.py
@@ -0,0 +1,357 @@
+import os
+import tempfile
+import unittest
+from pathlib import Path
+from typing import cast
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from pylabrobot.micronic import MicronicCodeReader, SaneScanner, TwainScanner
+from pylabrobot.micronic.code_reader.driver import (
+ DecodeResult,
+ MicronicCodeReaderDriver,
+ MicronicError,
+)
+from pylabrobot.micronic.code_reader.rack_reading_backend import (
+ MicronicCodeReaderRackReadingBackend,
+)
+from pylabrobot.resources.tube_rack import TubeRack
+
+
+def _rack(num_items_x: int = 12, num_items_y: int = 8, num_items: int = 96) -> TubeRack:
+ rack = MagicMock(spec=TubeRack)
+ rack.num_items_x = num_items_x
+ rack.num_items_y = num_items_y
+ rack.num_items = num_items
+ return rack
+
+
+def _mock_scanner(image_extension: str = "bmp") -> MagicMock:
+ scanner = MagicMock()
+ scanner.image_extension = image_extension
+ scanner.acquire = MagicMock(return_value={"source": "test"})
+ return scanner
+
+
+class TestScannerClasses(unittest.IsolatedAsyncioTestCase):
+ def test_sane_scanner_invokes_scanimage(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ output_path = Path(image_dir) / "rack.tiff"
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.scanner.shutil.which",
+ return_value="/usr/bin/scanimage",
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.scanner._run_scan_command",
+ return_value={"source": "sane"},
+ ) as run_scan_command,
+ ):
+ scanner = SaneScanner(sane_device="avision:libusb:001:004")
+ metadata = scanner.acquire(output_path, timeout_ms=1000)
+
+ self.assertEqual(metadata["source"], "sane")
+ self.assertEqual(scanner.image_extension, "tiff")
+ run_scan_command.assert_called_once_with(
+ [
+ "/usr/bin/scanimage",
+ "--device-name",
+ "avision:libusb:001:004",
+ "--format=tiff",
+ "--output-file",
+ str(output_path),
+ ],
+ output_path,
+ 1000,
+ source="sane",
+ )
+
+ def test_sane_scanner_raises_when_scanimage_missing(self):
+ with patch("pylabrobot.micronic.code_reader.scanner.shutil.which", return_value=None):
+ with self.assertRaises(MicronicError):
+ SaneScanner()
+
+ def test_twain_scanner_resolves_path_from_env(self):
+ with (
+ patch.dict(os.environ, {"MICRONIC_TWAIN_SCANNER_PATH": "/opt/twain_scan"}, clear=False),
+ patch("pylabrobot.micronic.code_reader.scanner.shutil.which", return_value=None),
+ ):
+ scanner = TwainScanner()
+ self.assertEqual(scanner.twain_scanner_path, "/opt/twain_scan")
+
+ def test_twain_scanner_raises_when_helper_missing(self):
+ with (
+ patch.dict(os.environ, {}, clear=True),
+ patch("pylabrobot.micronic.code_reader.scanner.shutil.which", return_value=None),
+ ):
+ with self.assertRaises(MicronicError):
+ TwainScanner()
+
+ def test_twain_scanner_acquire_runs_helper(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ output_path = Path(image_dir) / "rack.bmp"
+ with patch(
+ "pylabrobot.micronic.code_reader.scanner._run_scan_command",
+ return_value={"source": "twain"},
+ ) as run_scan_command:
+ scanner = TwainScanner(twain_scanner_path="/opt/twain_scan", twain_source="AVA6PlusG")
+ scanner.acquire(output_path, timeout_ms=1000)
+
+ run_scan_command.assert_called_once_with(
+ ["/opt/twain_scan", str(output_path), "AVA6PlusG", "1000"],
+ output_path,
+ 1000,
+ source="twain",
+ )
+
+
+class TestMicronicCodeReaderDriver(unittest.IsolatedAsyncioTestCase):
+ def test_acquire_image_runs_scanner_and_tracks_metadata(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ scanner = _mock_scanner()
+ driver = MicronicCodeReaderDriver(
+ scanner=scanner,
+ serial_port="/dev/ttyUSB0",
+ image_dir=image_dir,
+ keep_images=True,
+ )
+ image_path = driver.acquire_image()
+
+ self.assertEqual(driver.last_image_path, image_path)
+ self.assertEqual(driver.last_scan_metadata, {"source": "test"})
+ scanner.acquire.assert_called_once()
+ self.assertTrue(image_path.name.startswith("micronic_"))
+ self.assertEqual(image_path.suffix, ".bmp")
+
+ async def test_read_barcode_uses_plr_serial(self):
+ instances: list[object] = []
+
+ class FakeSerial:
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+ self.reads = iter([b"9", b"5", b"0", b"0", b"0", b"1", b"7", b"7", b"2", b"2", b"\r"])
+ self.calls: list[str] = []
+ instances.append(self)
+
+ async def setup(self):
+ self.calls.append("setup")
+
+ async def reset_input_buffer(self):
+ self.calls.append("reset_input_buffer")
+
+ async def write(self, data: bytes):
+ self.calls.append(f"write:{data!r}")
+
+ async def read(self, num_bytes: int = 1) -> bytes:
+ self.calls.append(f"read:{num_bytes}")
+ return next(self.reads)
+
+ async def stop(self):
+ self.calls.append("stop")
+
+ with patch("pylabrobot.micronic.code_reader.driver.Serial", FakeSerial):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ await driver.setup()
+ try:
+ rack_id = await driver.read_barcode()
+ finally:
+ await driver.stop()
+
+ self.assertEqual(len(instances), 1)
+ fake_serial = cast(FakeSerial, instances[0])
+ self.assertEqual(rack_id, "9500017722")
+ self.assertEqual(fake_serial.kwargs["port"], "/dev/ttyUSB0")
+ self.assertEqual(fake_serial.kwargs["bytesize"], 7)
+ self.assertEqual(fake_serial.kwargs["parity"], "E")
+ self.assertIn("setup", fake_serial.calls)
+ self.assertIn("reset_input_buffer", fake_serial.calls)
+ self.assertIn("write:b'\\r\\n'", fake_serial.calls)
+ self.assertEqual(fake_serial.calls[-1], "stop")
+
+
+class TestMicronicCodeReaderRackReadingBackend(unittest.IsolatedAsyncioTestCase):
+ async def test_scan_rack_populates_standard_rack_result(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ scanner = _mock_scanner()
+ driver = MicronicCodeReaderDriver(
+ scanner=scanner,
+ serial_port="/dev/ttyUSB0",
+ image_dir=image_dir,
+ keep_images=True,
+ )
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+ decoded = {
+ "A1": DecodeResult(tube_id="1111111111", method="test"),
+ "A2": DecodeResult(tube_id="2222222222", method="test"),
+ }
+ with (
+ patch.object(driver, "read_barcode", AsyncMock(return_value="9500017722")) as read_barcode,
+ patch(
+ "pylabrobot.micronic.code_reader.rack_reading_backend.decode_image",
+ return_value=(decoded, {"decodedWells": 2}),
+ ) as decode_image_mock,
+ ):
+ result = await backend.scan_rack(_rack(num_items=2), timeout=1.0, poll_interval=0.0)
+
+ self.assertEqual(result.rack_id, "9500017722")
+ self.assertIsNotNone(result.rack_barcode)
+ self.assertEqual(result.rack_barcode.data, "9500017722")
+ self.assertEqual(result.rack_barcode.symbology, "Code 128 (Subset B and C)")
+ self.assertEqual(result.entries[0].position, "A1")
+ self.assertEqual(result.entries[0].tube_id, "1111111111")
+ self.assertIsNotNone(result.entries[0].barcode)
+ self.assertEqual(result.entries[0].barcode.data, "1111111111")
+ self.assertEqual(result.entries[0].barcode.symbology, "DataMatrix")
+ self.assertEqual(result.entries[1].tube_id, "2222222222")
+ self.assertEqual(driver.last_scan_metadata, {"source": "test"})
+ self.assertEqual(driver.last_decode_metadata, {"decodedWells": 2})
+ scanner.acquire.assert_called_once()
+ read_barcode.assert_awaited_once()
+ decode_image_mock.assert_called_once()
+
+ async def test_reader_can_scan_twice(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ scanner = _mock_scanner()
+ reader = MicronicCodeReader(
+ scanner=scanner,
+ serial_port="/dev/ttyUSB0",
+ image_dir=image_dir,
+ keep_images=True,
+ )
+ decoded = {"A1": DecodeResult(tube_id="1111111111", method="test")}
+ with (
+ patch.object(reader.driver.io, "setup", AsyncMock()),
+ patch.object(reader.driver.io, "stop", AsyncMock()),
+ patch.object(reader.driver, "read_barcode", AsyncMock(return_value="9500017722")),
+ patch(
+ "pylabrobot.micronic.code_reader.rack_reading_backend.decode_image",
+ return_value=(decoded, {"decodedWells": 1}),
+ ),
+ ):
+ await reader.setup()
+ first = await reader.rack_reading.scan_rack(
+ rack=_rack(num_items=1), timeout=1.0, poll_interval=0.0
+ )
+ second = await reader.rack_reading.scan_rack(
+ rack=_rack(num_items=1), timeout=1.0, poll_interval=0.0
+ )
+
+ self.assertEqual(first.rack_id, "9500017722")
+ self.assertEqual(second.rack_id, "9500017722")
+ self.assertEqual(scanner.acquire.call_count, 2)
+
+ async def test_backend_rejects_mismatched_rack_shape(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+ with self.assertRaises(MicronicError):
+ await backend.scan_rack(_rack(num_items_x=6, num_items_y=4), timeout=1.0, poll_interval=0.0)
+
+ async def test_backend_rejects_concurrent_scan(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+ await backend._scan_lock.acquire()
+ try:
+ with self.assertRaises(MicronicError):
+ await backend.scan_rack(_rack(), timeout=1.0, poll_interval=0.0)
+ finally:
+ backend._scan_lock.release()
+
+ async def test_scan_rack_times_out(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+
+ async def slow(rack):
+ del rack
+ import asyncio
+
+ await asyncio.sleep(1)
+ return MagicMock()
+
+ with patch.object(backend, "_scan_rack", slow):
+ with self.assertRaises(TimeoutError):
+ await backend.scan_rack(rack=_rack(), timeout=0.01, poll_interval=0.0)
+
+ async def test_timeout_keeps_scan_lock_until_blocking_scan_finishes(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+
+ def slow_blocking_scan(rack_id, expected_well_count):
+ del rack_id, expected_well_count
+ import time
+
+ time.sleep(0.05)
+ return MagicMock()
+
+ with (
+ patch.object(driver, "read_barcode", AsyncMock(return_value="9500017722")),
+ patch.object(backend, "_scan_rack_blocking", side_effect=slow_blocking_scan),
+ ):
+ with self.assertRaises(TimeoutError):
+ await backend.scan_rack(rack=_rack(), timeout=0.01, poll_interval=0.0)
+ with self.assertRaises(MicronicError):
+ await backend.scan_rack(rack=_rack(), timeout=0.01, poll_interval=0.0)
+
+ import asyncio
+
+ await asyncio.sleep(0.08)
+ self.assertFalse(backend._scan_lock.locked())
+
+ async def test_scan_rack_propagates_micronic_error(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+ with self.assertRaises(MicronicError):
+ await backend.scan_rack(
+ rack=_rack(num_items_x=6, num_items_y=4), timeout=1.0, poll_interval=0.0
+ )
+
+ async def test_scan_rack_id_reads_barcode(self):
+ driver = MicronicCodeReaderDriver(scanner=_mock_scanner(), serial_port="/dev/ttyUSB0")
+ backend = MicronicCodeReaderRackReadingBackend(driver=driver)
+ with patch.object(driver, "read_barcode", AsyncMock(return_value="9500017722")) as read_barcode:
+ rack_id = await backend.scan_rack_id(timeout=5.0, poll_interval=0.5)
+
+ self.assertEqual(rack_id, "9500017722")
+ read_barcode.assert_awaited_once_with()
+
+
+class TestMicronicCodeReader(unittest.IsolatedAsyncioTestCase):
+ async def test_device_exposes_rack_reading_only(self):
+ reader = MicronicCodeReader(
+ scanner=_mock_scanner(),
+ serial_port="/dev/ttyUSB0",
+ scanner_timeout=12.0,
+ )
+ with (
+ patch.object(reader.driver.io, "setup", AsyncMock()),
+ patch.object(reader.driver.io, "stop", AsyncMock()),
+ ):
+ await reader.setup()
+ try:
+ self.assertIn(reader.rack_reading, reader._capabilities)
+ self.assertFalse(hasattr(reader, "barcode_scanning"))
+ with patch.object(
+ reader.rack_reading,
+ "scan_rack",
+ return_value=MagicMock(rack_id="9500017722"),
+ ) as scan_rack:
+ result = await reader.rack_reading.scan_rack(
+ rack=_rack(),
+ timeout=12.0,
+ poll_interval=1.0,
+ )
+ finally:
+ await reader.stop()
+
+ self.assertEqual(result.rack_id, "9500017722")
+ scan_rack.assert_called_once()
+
+ async def test_frontend_uses_driver(self):
+ reader = MicronicCodeReader(
+ scanner=_mock_scanner(),
+ serial_port="/dev/ttyUSB0",
+ )
+ self.assertIsInstance(reader.driver, MicronicCodeReaderDriver)
+ self.assertFalse(hasattr(reader, "barcode_scanning"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/pylabrobot/micronic/code_reader/rack_reading_backend.py b/pylabrobot/micronic/code_reader/rack_reading_backend.py
new file mode 100644
index 00000000000..6c02d453b94
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/rack_reading_backend.py
@@ -0,0 +1,118 @@
+"""Rack-reading backend for the Micronic driver."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+from pylabrobot.capabilities.rack_reading import RackReaderBackend, RackScanEntry, RackScanResult
+from pylabrobot.resources.barcode import Barcode
+from pylabrobot.resources.tube_rack import TubeRack
+
+from .driver import RACK_COLS, RACK_ROWS, MicronicCodeReaderDriver, decode_image, iter_positions
+from .errors import MicronicError
+
+logger = logging.getLogger(__name__)
+
+
+class MicronicCodeReaderRackReadingBackend(RackReaderBackend):
+ """Rack-reading backend for the Micronic code reader."""
+
+ def __init__(self, driver: MicronicCodeReaderDriver):
+ super().__init__()
+ self.driver = driver
+ self._scan_lock = asyncio.Lock()
+
+ @staticmethod
+ def _validate_rack(rack: TubeRack) -> None:
+ if rack.num_items_x != RACK_COLS or rack.num_items_y != RACK_ROWS:
+ raise MicronicError(
+ f"Micronic code reader only supports {RACK_ROWS}x{RACK_COLS} racks; "
+ f"got {rack.num_items_y}x{rack.num_items_x}."
+ )
+
+ async def scan_rack(self, rack: TubeRack, timeout: float, poll_interval: float) -> RackScanResult:
+ del poll_interval
+ return await asyncio.wait_for(self._scan_rack(rack), timeout=timeout)
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ del poll_interval
+ return await asyncio.wait_for(self.driver.read_barcode(), timeout=timeout)
+
+ async def _scan_rack(self, rack: TubeRack) -> RackScanResult:
+ self._validate_rack(rack)
+ if self._scan_lock.locked():
+ raise MicronicError("Micronic rack scan is already in progress.")
+ await self._scan_lock.acquire()
+ release_lock = True
+ try:
+ rack_id = await self.driver.read_barcode()
+ loop = asyncio.get_running_loop()
+ scan_future = loop.run_in_executor(None, self._scan_rack_blocking, rack_id, rack.num_items)
+ try:
+ return await asyncio.shield(scan_future)
+ except asyncio.CancelledError:
+ release_lock = False
+ scan_future.add_done_callback(self._finish_cancelled_scan)
+ raise
+ finally:
+ if release_lock:
+ self._release_scan_lock()
+
+ def _finish_cancelled_scan(self, future) -> None:
+ try:
+ future.exception()
+ except asyncio.CancelledError:
+ pass
+ self._release_scan_lock()
+
+ def _release_scan_lock(self) -> None:
+ if self._scan_lock.locked():
+ self._scan_lock.release()
+
+ def _scan_rack_blocking(self, rack_id: str, expected_well_count: int) -> RackScanResult:
+ image_path = self.driver.acquire_image()
+
+ try:
+ decoded, self.driver.last_decode_metadata = decode_image(image_path)
+ if len(decoded) < expected_well_count:
+ missing = ", ".join(position for position in iter_positions() if position not in decoded)
+ raise MicronicError(
+ f"Micronic decode found {len(decoded)} wells; expected at least "
+ f"{expected_well_count}. Missing: {missing}"
+ )
+
+ for position, result in decoded.items():
+ logger.debug("Micronic decoded %s via %s", position, result.method)
+
+ entries = [
+ RackScanEntry(
+ position=position,
+ tube_id=decoded[position].tube_id if position in decoded else None,
+ status="OK" if position in decoded else "NOREAD",
+ barcode=(
+ Barcode(
+ data=decoded[position].tube_id,
+ symbology="DataMatrix",
+ position_on_resource="bottom",
+ )
+ if position in decoded
+ else None
+ ),
+ )
+ for position in iter_positions()
+ ]
+
+ return RackScanResult(
+ rack_id=rack_id,
+ entries=entries,
+ rack_barcode=Barcode(
+ data=rack_id,
+ symbology="Code 128 (Subset B and C)",
+ position_on_resource="right",
+ )
+ if rack_id != "NOREAD"
+ else None,
+ )
+ finally:
+ self.driver.release_image(image_path)
diff --git a/pylabrobot/micronic/code_reader/scanner.py b/pylabrobot/micronic/code_reader/scanner.py
new file mode 100644
index 00000000000..5a4f79224bc
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/scanner.py
@@ -0,0 +1,115 @@
+"""Scanner classes that acquire a rack image for the Micronic driver."""
+
+from __future__ import annotations
+
+import os
+import shutil
+import subprocess # nosec B404 - local scanner helper execution is the interface.
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Optional, Sequence
+
+from .errors import MicronicError
+
+
+class Scanner(ABC):
+ """Abstract scanner that writes a rack image to disk on demand."""
+
+ image_extension: str
+
+ @abstractmethod
+ def acquire(self, output_path: Path, timeout_ms: int) -> dict[str, object]:
+ """Write a rack image to ``output_path`` and return acquisition metadata."""
+
+
+class TwainScanner(Scanner):
+ """Windows TWAIN scanner driven by an operator-installed helper executable.
+
+ Resolves the helper path from (in order): the ``twain_scanner_path`` argument,
+ the ``MICRONIC_TWAIN_SCANNER_PATH`` environment variable, or ``twain_scan`` /
+ ``twain_scan.exe`` on PATH. Raises ``MicronicError`` if none resolve.
+ """
+
+ image_extension = "bmp"
+
+ def __init__(
+ self,
+ twain_scanner_path: Optional[str] = None,
+ twain_source: str = "AVA6PlusG",
+ ):
+ resolved = twain_scanner_path or _resolve_twain_scanner_path()
+ if resolved is None:
+ raise MicronicError(
+ "No TWAIN helper was found. Pass twain_scanner_path, set "
+ "MICRONIC_TWAIN_SCANNER_PATH, or put twain_scan on PATH."
+ )
+ self.twain_scanner_path = resolved
+ self.twain_source = twain_source
+
+ def acquire(self, output_path: Path, timeout_ms: int) -> dict[str, object]:
+ command = [self.twain_scanner_path, str(output_path), self.twain_source, str(timeout_ms)]
+ return _run_scan_command(command, output_path, timeout_ms, source="twain")
+
+
+class SaneScanner(Scanner):
+ """Linux SANE scanner driven through the ``scanimage`` CLI."""
+
+ image_extension = "tiff"
+
+ def __init__(
+ self,
+ sane_device: Optional[str] = None,
+ scanimage_path: Optional[str] = None,
+ ):
+ resolved = scanimage_path or shutil.which("scanimage")
+ if resolved is None:
+ raise MicronicError("scanimage was not found on PATH. Install SANE or pass scanimage_path.")
+ self.scanimage_path = resolved
+ self.sane_device = sane_device
+
+ def acquire(self, output_path: Path, timeout_ms: int) -> dict[str, object]:
+ command = [self.scanimage_path]
+ if self.sane_device:
+ command.extend(["--device-name", self.sane_device])
+ command.extend(["--format=tiff", "--output-file", str(output_path)])
+ return _run_scan_command(command, output_path, timeout_ms, source="sane")
+
+
+def _run_scan_command(
+ command: Sequence[str],
+ output_path: Path,
+ timeout_ms: int,
+ source: str,
+) -> dict[str, object]:
+ try:
+ completed = subprocess.run( # nosec B603 - operator-configured command, shell=False.
+ list(command),
+ check=False,
+ capture_output=True,
+ text=True,
+ timeout=(timeout_ms / 1000) + 15,
+ )
+ except FileNotFoundError as exc:
+ raise MicronicError(f"Scan command was not found: {command[0]}") from exc
+
+ if completed.returncode != 0:
+ raise MicronicError(
+ "Scan command failed with exit code "
+ f"{completed.returncode}: {completed.stderr.strip() or completed.stdout.strip()}"
+ )
+ if not output_path.exists():
+ raise MicronicError(f"Scan command did not create image: {output_path}")
+ return {
+ "stdout": completed.stdout.strip(),
+ "stderr": completed.stderr.strip(),
+ "source": source,
+ "command": list(command),
+ }
+
+
+def _resolve_twain_scanner_path() -> Optional[str]:
+ return (
+ os.environ.get("MICRONIC_TWAIN_SCANNER_PATH")
+ or shutil.which("twain_scan.exe")
+ or shutil.which("twain_scan")
+ )