From bc70842ba3e0314fac148c4a1457bc4bcc5a20ca Mon Sep 17 00:00:00 2001 From: Domen Tabernik Date: Fri, 29 May 2026 14:56:04 +0200 Subject: [PATCH] Handle Portainer endpoints without snapshots ### Motivation - Portainer LTS responses can omit the `Snapshots` / `DockerSnapshotRaw` fields which caused `parse_endpoints` to raise exceptions during initialization. - Make endpoint parsing resilient and fall back to the Portainer endpoint proxy for container lists when snapshots are not present. ### Description - Add container name normalization helpers: `_container_name` and `_containers_by_name` to robustly extract Docker names. - Add `_snapshot_containers` to safely extract `Snapshots` -> `DockerSnapshotRaw` -> `Containers` when present and update `parse_endpoints` to use it. - Add `_fetch_endpoint_containers` and update `init` to call the Docker-listing proxy endpoint (`/api/endpoints//docker/containers/json?all=true`) when snapshots are missing. - Add unit tests in `test_logs.py` covering missing snapshots, legacy snapshot parsing, and the fallback container fetch. ### Testing - Ran `python3 -m unittest test_logs.py` which executed 3 tests and passed (`OK`). - Ran `python3 -m py_compile logs.py test_logs.py` which succeeded with no syntax errors. --- logs.py | 91 +++++++++++++++++++++++++++++++++++++++++++++------- test_logs.py | 63 ++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 11 deletions(-) create mode 100644 test_logs.py diff --git a/logs.py b/logs.py index 6153d2f..342af04 100644 --- a/logs.py +++ b/logs.py @@ -3,25 +3,91 @@ import asyncio import httpx + +class PortainerAPIError(RuntimeError): + pass + + +def _portainer_headers(token): + return {'X-API-Key': token} + + +def _raise_for_portainer_error(response, action): + if response.ok: + return + + detail = response.text.strip()[:200] + message = f'Portainer API failed while {action}: HTTP {response.status_code}' + if response.status_code in (401, 403): + message += '; check that PORTAINER_TOKEN is a valid API key for this Portainer user' + if detail: + message += f' ({detail})' + raise PortainerAPIError(message) + + +def _container_name(container): + """Return the Docker container name used by the rest of the UI.""" + names = container.get('Names') or [] + if not names: + return None + + name = names[0] + if name.startswith('/'): + return name[1:] + return name + + +def _containers_by_name(containers): + return { + name: container + for container in containers or [] + if (name := _container_name(container)) + } + + +def _snapshot_containers(endpoint): + """Extract containers from the endpoint snapshot when Portainer includes it.""" + for snapshot in endpoint.get('Snapshots') or []: + docker_snapshot = snapshot.get('DockerSnapshotRaw') or {} + if 'Containers' in docker_snapshot: + return docker_snapshot['Containers'] + return None + + def parse_endpoints(content): servers = {} for endpoint in json.loads(content): id = endpoint['Id'] name = endpoint['Name'] + containers = _snapshot_containers(endpoint) - assert len(endpoint['Snapshots']) == 1 - containers = { c['Names'][0][1:]: c for c in endpoint['Snapshots'][0]['DockerSnapshotRaw']['Containers'] } - - servers[name] = id, containers + servers[name] = id, _containers_by_name(containers) return servers + +def _fetch_endpoint_containers(base_url, token, endpoint_id): + url = f'{base_url}/api/endpoints/{endpoint_id}/docker/containers/json' + r = requests.get(url, headers=_portainer_headers(token), params={'all': 'true'}, timeout=60) + _raise_for_portainer_error(r, f'listing containers for endpoint {endpoint_id}') + return r.json() + + def init(base_url, token): url = base_url + '/api/endpoints' - headers = {'X-API-Key': token} - r = requests.get(url, headers=headers) - if r.ok: - return parse_endpoints(r.content) - return None + r = requests.get(url, headers=_portainer_headers(token), timeout=60) + _raise_for_portainer_error(r, 'listing endpoints') + + endpoints = r.json() + servers = {} + for endpoint in endpoints: + id = endpoint['Id'] + name = endpoint['Name'] + containers = _snapshot_containers(endpoint) + if containers is None: + containers = _fetch_endpoint_containers(base_url, token, id) + servers[name] = id, _containers_by_name(containers) + return servers + def parse_log(content): lines = [] @@ -33,11 +99,12 @@ def parse_log(content): lines.append((type, message)) return lines + def fetch_logs(base_url, token, endpoint, containers, limit=None): params = { 'stdout': 1, 'stderr': 1 } if limit is not None: params['tail'] = limit - headers = { 'X-API-Key': token } + headers = _portainer_headers(token) url = f'{base_url}/api/endpoints/{endpoint}/docker/containers/%s/logs' try: @@ -55,6 +122,7 @@ def fetch_logs(base_url, token, endpoint, containers, limit=None): return [parse_log(r.content) if r.status_code == 200 else None for r in responses] + def _format_line(line): id, message = line message = message.strip() @@ -62,6 +130,7 @@ def _format_line(line): return f'{message}' return f'{message}' + def format_html(lines): return '''
%s
''' % '\n'.join(map(_format_line, lines)) \ No newline at end of file +
%s
''' % '\n'.join(map(_format_line, lines)) diff --git a/test_logs.py b/test_logs.py new file mode 100644 index 0000000..6196780 --- /dev/null +++ b/test_logs.py @@ -0,0 +1,63 @@ +import unittest +from unittest.mock import Mock, patch + +import logs + + +class LogsPortainerEndpointTests(unittest.TestCase): + def test_parse_endpoints_accepts_missing_snapshots(self): + servers = logs.parse_endpoints(b'[{"Id": 7, "Name": "node-a"}]') + + self.assertEqual(servers, {'node-a': (7, {})}) + + def test_parse_endpoints_reads_legacy_snapshot_containers(self): + servers = logs.parse_endpoints(b'''[ + { + "Id": 7, + "Name": "node-a", + "Snapshots": [ + { + "DockerSnapshotRaw": { + "Containers": [ + {"Id": "abc", "Names": ["/alice"]}, + {"Id": "def", "Names": ["bob"]} + ] + } + } + ] + } + ]''') + + self.assertEqual(set(servers['node-a'][1]), {'alice', 'bob'}) + self.assertEqual(servers['node-a'][1]['alice']['Id'], 'abc') + + @patch('logs.requests.get') + def test_init_fetches_containers_when_endpoint_snapshots_are_not_returned(self, get): + endpoints_response = Mock(ok=True) + endpoints_response.json.return_value = [{'Id': 7, 'Name': 'node-a'}] + containers_response = Mock(ok=True) + containers_response.json.return_value = [{'Id': 'abc', 'Names': ['/alice']}] + get.side_effect = [endpoints_response, containers_response] + + servers = logs.init('https://portainer.example', 'token') + + self.assertEqual(servers['node-a'][0], 7) + self.assertEqual(servers['node-a'][1]['alice']['Id'], 'abc') + get.assert_any_call( + 'https://portainer.example/api/endpoints/7/docker/containers/json', + headers={'X-API-Key': 'token'}, + params={'all': 'true'}, + timeout=60, + ) + + @patch('logs.requests.get') + def test_init_reports_authentication_failures(self, get): + response = Mock(ok=False, status_code=401, text='Invalid API key') + get.return_value = response + + with self.assertRaisesRegex(logs.PortainerAPIError, 'PORTAINER_TOKEN is a valid API key'): + logs.init('https://portainer.example', 'expired-token') + + +if __name__ == '__main__': + unittest.main()