fix(notification): prevent out-of-memory when computing digest notifications (#16691) #16735
T0t1n wants to merge 1 commit into
…cations Read the notification stream range in paginated batches and keep only the events matching the digest, instead of loading the whole range in memory. Bound the retained content by its cumulative byte size, configurable via notification_manager:max_digest_content_size.
Codecov Report
❌ Patch coverage is
Additional details and impacted files

| Coverage Diff | master | #16735 | +/- |
|---|---|---|---|
| Coverage | 23.97% | 24.06% | +0.09% |
| Files | 3268 | 3268 | |
| Lines | 173078 | 173930 | +852 |
| Branches | 42892 | 43354 | +462 |
| Hits | 41492 | 41862 | +370 |
| Misses | 131586 | 132068 | +482 |
Pull request overview
This PR addresses digest-notification OOMs by changing notification-range reads from “load everything then filter” to a paginated, callback-driven scan that filters/accumulates incrementally and enforces a max retained-byte budget per digest.
Changes:
- Refactors notification-range fetching to be batch/pagination based with an early-stop callback and per-entry stored byte-size metadata.
- Updates digest computation to retain only matching trigger events while scanning the range, with truncation when a configurable byte cap is reached.
- Adds unit coverage for pagination semantics (exclusive cursor, no dup/skip across ms-seq boundaries, early-stop) and digest truncation behavior.
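
The paginated scan described above can be illustrated with a minimal sketch. It is not the PR's implementation: it replaces Redis with an in-memory array, and `StreamEntry`, `compareIds`, `readPage`, and `scanRange` are hypothetical names chosen for illustration. The only behaviour it tries to mirror is the exclusive `ms-seq` cursor, the page size, and the early-stop callback.

```typescript
// Hypothetical in-memory stand-in for a Redis stream; ids use the `ms-seq` format.
type StreamEntry = { id: string; payload: string };

// Order two `ms-seq` ids: by millisecond part first, then by sequence part.
const compareIds = (a: string, b: string): number => {
  const pa = a.split('-');
  const pb = b.split('-');
  const msDiff = Number(pa[0]) - Number(pb[0]);
  return msDiff !== 0 ? msDiff : Number(pa[1]) - Number(pb[1]);
};

// Mimics a bounded range read: the first read is inclusive from startMs,
// later reads only return entries strictly after the cursor (exclusive cursor).
const readPage = (
  entries: StreamEntry[],
  cursor: string | null,
  startMs: number,
  endMs: number,
  count: number,
): StreamEntry[] => entries
  .filter((e) => {
    const ms = Number(e.id.split('-')[0]);
    const afterCursor = cursor === null ? ms >= startMs : compareIds(e.id, cursor) > 0;
    return afterCursor && ms <= endMs;
  })
  .slice(0, count);

// Scan the range page by page; the callback may return true to stop early.
const scanRange = (
  entries: StreamEntry[],
  startMs: number,
  endMs: number,
  count: number,
  onBatch: (page: StreamEntry[]) => boolean | void,
): void => {
  let cursor: string | null = null;
  for (;;) {
    const page = readPage(entries, cursor, startMs, endMs, count);
    const last = page[page.length - 1];
    if (last === undefined) return; // empty page: range exhausted
    if (onBatch(page) === true) return; // caller asked to stop
    if (page.length < count) return; // short page: nothing left to read
    cursor = last.id; // advance on the full ms-seq id
  }
};

// Usage: three entries share the same millisecond and straddle a page boundary.
const sampleEntries: StreamEntry[] = [
  { id: '1000-0', payload: 'a' },
  { id: '1000-1', payload: 'b' },
  { id: '1000-2', payload: 'c' },
  { id: '1001-0', payload: 'd' },
  { id: '2000-0', payload: 'out of range' },
];
const seenIds: string[] = [];
scanRange(sampleEntries, 1000, 1500, 2, (page) => { page.forEach((e) => seenIds.push(e.id)); });
// seenIds now holds 1000-0, 1000-1, 1000-2, 1001-0, each exactly once
```

Because the cursor is the full id rather than the timestamp alone, `1000-2` is picked up by the second page even though it shares its millisecond with the last entry of the first page.
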
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| opencti-platform/opencti-graphql/src/manager/notificationManager.ts | Adds byte-capped incremental digest content collection and truncation warning. |
| opencti-platform/opencti-graphql/src/database/redis-stream.ts | Implements paginated XRANGE range reads and computes per-entry stored byte size. |
| opencti-platform/opencti-graphql/src/database/stream/stream-utils.ts | Introduces SizedNotifEvent and updates the raw client contract to callback-based range reads. |
| opencti-platform/opencti-graphql/src/database/stream/stream-handler.ts | Adapts fetchRangeNotifications to the new callback-based API. |
| opencti-platform/opencti-graphql/tests/01-unit/database/redis-stream-notifications-test.ts | Adds unit tests for range pagination, cursor semantics, filtering, and early-stop. |
| opencti-platform/opencti-graphql/tests/01-unit/manager/digest-notification-test.ts | Adds unit tests for digest content collection across batches and byte-budget truncation. |
      callback: (events: Array<SizedNotifEvent<T>>) => Promise<boolean | void> | boolean | void,
    ): Promise<void> => {
      const client = getClientBase();
      const endId = `${end.getTime()}`;

    export const DEFAULT_MAX_DIGEST_CONTENT_SIZE = 500 * 1024 * 1024; // 500 MB
    const MAX_DIGEST_CONTENT_SIZE = conf.get('notification_manager:max_digest_content_size') || DEFAULT_MAX_DIGEST_CONTENT_SIZE;

    const content: Array<KnowledgeNotificationEvent> = [];
    let byteSize = 0;
    let truncated = false;
    await fetchRangeNotifications<KnowledgeNotificationEvent>(fromDate, toDate, (events) => {
      for (let i = 0; i < events.length; i += 1) {
        const { event: notification, byteSize: notificationSize } = events[i];
        if (triggerIds.includes(notification.notification_id)) {

    afterEach(() => {
      vi.clearAllMocks();
    });
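
The digest excerpt above is cut off before the budget check, so the following is only a sketch of how a byte-capped collector could work. `createDigestCollector`, `SizedEvent`, and `DigestEvent` are hypothetical names, and the policy shown (stop before adding the first matching event that would exceed the cap, and flag the result as truncated) is an assumption rather than the PR's confirmed behaviour.

```typescript
// Hypothetical shapes, loosely modelled on the excerpt above.
type DigestEvent = { notification_id: string; message: string };
type SizedEvent<T> = { event: T; byteSize: number };
type DigestResult = { content: DigestEvent[]; byteSize: number; truncated: boolean };

// Builds a batch callback that keeps only events matching the digest triggers
// and stops once the cumulative byte size would exceed maxBytes (assumed policy).
const createDigestCollector = (triggerIds: string[], maxBytes: number) => {
  const wanted = new Set(triggerIds);
  const result: DigestResult = { content: [], byteSize: 0, truncated: false };
  const onBatch = (events: SizedEvent<DigestEvent>[]): boolean => {
    for (let i = 0; i < events.length; i += 1) {
      const { event, byteSize } = events[i];
      // Non-matching events are dropped immediately and do not count against the budget.
      if (wanted.has(event.notification_id)) {
        if (result.byteSize + byteSize > maxBytes) {
          result.truncated = true;
          return true; // ask the range reader to stop early
        }
        result.content.push(event);
        result.byteSize += byteSize;
      }
    }
    return false; // keep reading
  };
  return { result, onBatch };
};

// Usage with two batches and a 100-byte budget.
const collector = createDigestCollector(['n1'], 100);
const firstStop = collector.onBatch([
  { event: { notification_id: 'n1', message: 'kept' }, byteSize: 40 },
  { event: { notification_id: 'other', message: 'ignored' }, byteSize: 1000 },
]);
const secondStop = collector.onBatch([
  { event: { notification_id: 'n1', message: 'kept' }, byteSize: 50 },
  { event: { notification_id: 'n1', message: 'over budget' }, byteSize: 30 },
]);
```

Returning `true` from the callback ties the budget to the early-stop contract of the range reader, so no further pages are fetched once the digest is full.
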
Proposed changes
- `rawFetchRangeNotifications` now paginates the `XRANGE` (exclusive `ms-seq` cursor + `COUNT`) and hands each batch to the caller, which filters and transforms it on the fly. Before, the whole range of the digest period was loaded and mapped before being filtered, which caused out-of-memory on large ranges.
- The retained digest content is bounded by its cumulative byte size, configurable via `notification_manager:max_digest_content_size` (bytes, default 500 MB).

Related issues
How to test this PR
Checklist
Further comments
The range read advances on the full `ms-seq` stream id with an exclusive cursor, so events sharing the same timestamp across batch boundaries are read exactly once (no duplicate, no skip). Termination and the cursor use the raw `XRANGE` result (before the live filter), so a batch containing only non-live entries does not stop the iteration. Both are covered by unit tests.
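
The point about termination can be sketched as follows. This is an illustration under assumptions, not the PR's code: `RawEntry`, `makeFetcher`, and `scanLive` are hypothetical, the `live` flag stands in for whatever the live filter checks, and skipping the callback for a batch with no live entries is a choice made for this sketch.

```typescript
// Hypothetical raw entry; `live` stands in for the result of the live filter.
type RawEntry = { id: string; live: boolean };

// In-memory page fetcher: returns up to `count` entries strictly after `cursor`.
const makeFetcher = (entries: RawEntry[]) => (cursor: string | null, count: number): RawEntry[] => {
  const start = cursor === null ? 0 : entries.findIndex((e) => e.id === cursor) + 1;
  return entries.slice(start, start + count);
};

// Cursor and termination are driven by the raw page, never by the filtered one.
const scanLive = (
  fetchPage: (cursor: string | null, count: number) => RawEntry[],
  count: number,
  onBatch: (liveEntries: RawEntry[]) => boolean | void,
): void => {
  let cursor: string | null = null;
  for (;;) {
    const raw = fetchPage(cursor, count);
    const last = raw[raw.length - 1];
    if (last === undefined) return; // raw page empty: nothing left
    const liveEntries = raw.filter((e) => e.live);
    // Assumption for this sketch: an all-filtered batch is not passed to the callback.
    if (liveEntries.length > 0 && onBatch(liveEntries) === true) return;
    if (raw.length < count) return; // short raw page: end of range
    cursor = last.id; // advance past non-live entries too
  }
};

// Usage: the middle page contains only non-live entries.
const rawEntries: RawEntry[] = [
  { id: '1-0', live: true },
  { id: '1-1', live: true },
  { id: '2-0', live: false },
  { id: '2-1', live: false },
  { id: '3-0', live: true },
];
const deliveredIds: string[] = [];
scanLive(makeFetcher(rawEntries), 2, (batch) => { batch.forEach((e) => deliveredIds.push(e.id)); });
// deliveredIds now holds 1-0, 1-1, 3-0: the all-filtered middle page did not end the scan
```

Had the loop tested the filtered batch length instead of the raw one, the scan would have stopped at the second page and `3-0` would never have been delivered.
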