diff --git a/.autover/changes/durable-payload-overflow.json b/.autover/changes/durable-payload-overflow.json new file mode 100644 index 000000000..d4f5ec52f --- /dev/null +++ b/.autover/changes/durable-payload-overflow.json @@ -0,0 +1,12 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.DurableExecution", + "Type": "Patch", + "ChangelogMessages": [ + "Handle large results (over the 256 KB per-operation checkpoint limit) for `ParallelAsync`, `MapAsync`, and `RunInChildContextAsync`. When a Flat concurrent operation's summary or a child context's result exceeds the limit, the inline result is stripped from the checkpoint and the operation is flagged with `ReplayChildren`; on a later replay the unit/child bodies are re-executed to recover the stripped values without re-checkpointing the already-terminal parent. Plain `StepAsync` and `InvokeAsync` results are unaffected, matching the Python/Java/JS SDKs.", + "Enforce the `CheckpointBatcher` request byte limit: the batcher pre-flushes before an operation update would push a batch over the byte (or count) cap, and sends an oversized update on its own." + ] + } + ] +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs index 800d55bcf..1937f6312 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs @@ -33,6 +33,36 @@ internal sealed class CheckpointBatcher : IAsyncDisposable private Exception? _terminalError; private int _disposed; + // Per-update wire-footprint estimate constants. Deliberate over-estimates: + // flushing slightly early is safe, flushing late risks a request-too-large. + private const int PerOpEnvelopeOverheadBytes = 512; + private const int StackFrameOverheadBytes = 8; + + /// + /// Cheap UTF-8 byte estimate of one update's wire footprint — variable string + /// fields plus a fixed envelope. No JSON is produced (AOT-safe). Payload is + /// counted at 2x because it is already-serialized JSON re-escaped as a string + /// value, which roughly doubles for escape-heavy content. + /// + private static int EstimateUpdateBytes(SdkOperationUpdate u) + { + var size = PerOpEnvelopeOverheadBytes; + // int arithmetic is safe: payloads are bounded by the 6MB Lambda + // invocation-payload cap, so the 2x multiply can never overflow a 32-bit int. + if (u.Payload != null) size += System.Text.Encoding.UTF8.GetByteCount(u.Payload) * 2; + size += ByteCount(u.Id) + ByteCount(u.ParentId) + ByteCount(u.Name); + if (u.Error != null) + { + size += ByteCount(u.Error.ErrorType) + ByteCount(u.Error.ErrorMessage) + ByteCount(u.Error.ErrorData); + if (u.Error.StackTrace != null) + foreach (var line in u.Error.StackTrace) + size += ByteCount(line) + StackFrameOverheadBytes; + } + return size; + } + + private static int ByteCount(string? s) => s == null ? 0 : System.Text.Encoding.UTF8.GetByteCount(s); + public CheckpointBatcher( string? initialCheckpointToken, Func, CancellationToken, Task> flushAsync, @@ -113,25 +143,43 @@ public async ValueTask DisposeAsync() private async Task RunWorkerAsync(CancellationToken shutdownToken) { - // TODO: also enforce _config.MaxBatchBytes here. Today we only cap by - // operation count; an item whose serialized size pushes the batch over - // ~750 KB will be sent and rejected service-side. See CheckpointBatcherConfig. - var batch = new List(_config.MaxBatchOperations); + // Both caps are enforced: before adding an item that would push the batch + // over MaxBatchOperations OR MaxBatchBytes, the current batch is flushed. + // A lone item already over the byte cap is sent by itself (never loops). + // The byte accumulator is seeded with a fixed reserve covering the request + // prefix (checkpoint token + ARN + array framing) that the per-update + // estimate does not include. + const int RequestEnvelopeReserveBytes = 4 * 1024; + var batch = new PendingBatch(_config.MaxBatchOperations); + + async Task AddItemAsync(BatchItem item) + { + var itemBytes = EstimateUpdateBytes(item.Update); + if (batch.Count > 0 && + (batch.Count + 1 > _config.MaxBatchOperations || + RequestEnvelopeReserveBytes + batch.Bytes + itemBytes > _config.MaxBatchBytes)) + { + await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false); + batch.Clear(); + } + + batch.Add(item); + + // Lone item already over the cap: send it alone, do not loop. + if (batch.Count == 1 && + RequestEnvelopeReserveBytes + batch.Bytes > _config.MaxBatchBytes) + { + await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false); + batch.Clear(); + } + } try { while (await _channel.Reader.WaitToReadAsync(shutdownToken).ConfigureAwait(false)) { - // Drain everything currently queued. while (_channel.Reader.TryRead(out var item)) - { - batch.Add(item); - if (batch.Count >= _config.MaxBatchOperations) - { - await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false); - batch.Clear(); - } - } + await AddItemAsync(item).ConfigureAwait(false); // Optionally wait for late arrivals to coalesce into one batch. if (_config.FlushInterval > TimeSpan.Zero && batch.Count > 0) @@ -143,14 +191,7 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken) while (await _channel.Reader.WaitToReadAsync(windowCts.Token).ConfigureAwait(false)) { while (_channel.Reader.TryRead(out var item)) - { - batch.Add(item); - if (batch.Count >= _config.MaxBatchOperations) - { - await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false); - batch.Clear(); - } - } + await AddItemAsync(item).ConfigureAwait(false); } } catch (OperationCanceledException) when (!shutdownToken.IsCancellationRequested) @@ -161,7 +202,7 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken) if (batch.Count > 0) { - await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false); + await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false); batch.Clear(); } } @@ -179,9 +220,9 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken) } finally { - // Anything left in the channel after the worker exits — fail it. + // Anything left in the batch/channel after the worker exits — fail it. var failure = Volatile.Read(ref _terminalError) ?? new ObjectDisposedException(nameof(CheckpointBatcher)); - foreach (var leftover in batch) + foreach (var leftover in batch.Items) leftover.Completion.TrySetException(failure); while (_channel.Reader.TryRead(out var item)) item.Completion.TrySetException(failure); @@ -214,5 +255,17 @@ private async Task FlushBatchAsync(IReadOnlyList batch, CancellationT } } + /// Accumulates a batch plus its estimated byte footprint so the two + /// never drift across the worker's add/flush/clear sites. + private sealed class PendingBatch + { + public readonly List Items; + public long Bytes; + public PendingBatch(int capacity) { Items = new List(capacity); } + public int Count => Items.Count; + public void Add(BatchItem item) { Items.Add(item); Bytes += EstimateUpdateBytes(item.Update); } + public void Clear() { Items.Clear(); Bytes = 0; } + } + private readonly record struct BatchItem(SdkOperationUpdate Update, TaskCompletionSource Completion); } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs index 88913e868..81dc85d45 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs @@ -22,15 +22,13 @@ internal sealed class CheckpointBatcherConfig public int MaxBatchOperations { get; init; } = 200; /// - /// Maximum batch size in bytes. Service-side limit is ~750 KB. + /// Maximum batch size in bytes. Service-side request limit is ~750 KB. /// /// - /// TODO: not enforced today. The worker only checks ; - /// a single oversized item (or a batch whose serialized size exceeds 750 KB) - /// will be sent to the service and rejected there. Wire this in alongside - /// the async-flush operations (Map / Parallel / child-context) since those - /// are the scenarios that can actually fill a batch — today every batch is - /// 1 item with = Zero, so the gap is latent. + /// Enforced by the worker: it flushes the current batch before adding an item + /// that would push the estimated request size over this cap, and sends a lone + /// item that already exceeds the cap by itself. The per-update estimate plus a + /// fixed request-prefix reserve approximate the real wire size conservatively. /// - internal int MaxBatchBytes { get; init; } = 750 * 1024; + public int MaxBatchBytes { get; init; } = 750 * 1024; } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs index eeccf0032..5dd4671b1 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs @@ -5,6 +5,7 @@ using System.Text; using Amazon.Lambda; using Amazon.Lambda.Core; +using SdkContextOptions = Amazon.Lambda.Model.ContextOptions; using SdkErrorObject = Amazon.Lambda.Model.ErrorObject; using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; @@ -24,6 +25,10 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// and throw . /// SUCCEEDED: return cached deserialized result; user func is /// NOT re-executed. +/// SUCCEEDED (overflow): ReplayChildren=true + empty +/// payload (the result was too large to checkpoint inline) → re-run the +/// user func to recover the large result value; terminal checkpoints +/// (SUCCEED/FAIL) are suppressed since the op is already terminal. /// FAILED: throw with the /// recorded error; if is /// set, the mapped exception is thrown instead. @@ -45,6 +50,8 @@ internal sealed class ChildContextOperation : DurableOperation private readonly WorkflowCancellation _workflowCancellation; private readonly CancellationToken _cooperativeBailToken; private readonly bool _isVirtual; + // Set once on overflow-replay re-execution; never reset. + private bool _suppressTerminalCheckpoint; public ChildContextOperation( string operationId, @@ -105,6 +112,14 @@ protected override Task ReplayAsync(Operation existing, CancellationToken can switch (existing.Status) { case OperationStatuses.Succeeded: + // Overflow: the result was too large to checkpoint inline + // (ReplayChildren=true, empty payload). Re-run the body to recover + // the value; the body's inner ops replay from their own + // checkpoints. Do NOT re-emit the (already terminal) SUCCEED. + if (existing.ContextDetails?.ReplayChildren == true) + { + return ExecuteFuncNoCheckpoint(cancellationToken); + } // Side-effecting code runs at most once: replay returns the // cached result without invoking the user func. return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result)); @@ -127,6 +142,12 @@ protected override Task ReplayAsync(Operation existing, CancellationToken can } } + private Task ExecuteFuncNoCheckpoint(CancellationToken cancellationToken) + { + _suppressTerminalCheckpoint = true; + return ExecuteFunc(cancellationToken); + } + private async Task ExecuteFunc(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -180,8 +201,11 @@ private async Task ExecuteFunc(CancellationToken cancellationToken) { // Virtual branches suppress the FAIL checkpoint but still propagate // the exception — the orchestrator records the failure inline on the - // parent payload. - if (!_isVirtual) + // parent payload. Overflow-replay re-execution also suppresses it: the + // op is already terminal (SUCCEEDED) in the store, so re-emitting a + // FAIL would corrupt that record (mirrors ReplayChildrenAsync, which + // never re-checkpoints). The exception still propagates below. + if (!_isVirtual && !_suppressTerminalCheckpoint) { await EnqueueAsync(new SdkOperationUpdate { @@ -205,8 +229,17 @@ await EnqueueAsync(new SdkOperationUpdate // Virtual branches suppress the SUCCEED checkpoint; the orchestrator // serializes the result inline on the parent payload instead. - if (!_isVirtual) + // _suppressTerminalCheckpoint is set on overflow replay re-execution: the + // child is already terminal in the store, so we re-run only to recover the + // in-memory value and must NOT re-emit a SUCCEED. + if (!_isVirtual && !_suppressTerminalCheckpoint) { + var serialized = SerializeResult(result); + // Overflow: result too large to checkpoint inline. Emit an empty + // payload + ReplayChildren so replay re-executes this body to recover + // the value (mirrors the concurrent-operation overflow strategy). + var overflow = Encoding.UTF8.GetByteCount(serialized) > DurableConstants.MaxOperationCheckpointBytes; + await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, @@ -215,7 +248,10 @@ await EnqueueAsync(new SdkOperationUpdate Action = OperationAction.SUCCEED, SubType = _config?.SubType, Name = Name, - Payload = SerializeResult(result) + Payload = overflow ? string.Empty : serialized, + ContextOptions = overflow + ? new SdkContextOptions { ReplayChildren = true } + : null }, cancellationToken); } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs index d15361b44..33ba97965 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs @@ -6,6 +6,7 @@ using System.Text.Json; using Amazon.Lambda; using Amazon.Lambda.Core; +using SdkContextOptions = Amazon.Lambda.Model.ContextOptions; using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; namespace Amazon.Lambda.DurableExecution.Internal; @@ -139,8 +140,23 @@ await EnqueueAsync(new SdkOperationUpdate protected override Task> ReplayAsync(Operation existing, CancellationToken cancellationToken) { + // Overflow replay: the parent was checkpointed with a stripped summary and + // ReplayChildren=true because the inline results exceeded the checkpoint + // limit. Re-execute ONLY the units the frozen summary marks SUCCEEDED or + // FAILED to recover their stripped result VALUE / Error; units marked + // STARTED (short-circuited, never dispatched) are skipped. Per-unit status + // and completion reason stay authoritative from the frozen summary, and the + // parent — already terminal — is NOT re-checkpointed. + var replayChildren = existing.ContextDetails?.ReplayChildren == true + && (existing.Status == OperationStatuses.Succeeded + || existing.Status == OperationStatuses.Failed); + switch (existing.Status) { + case OperationStatuses.Succeeded when replayChildren: + case OperationStatuses.Failed when replayChildren: + return ReplayChildrenAsync(existing, cancellationToken); + case OperationStatuses.Succeeded: // The parent always checkpoints as SUCCEED — even when // CompletionReason is FailureToleranceExceeded. Reconstruct @@ -380,6 +396,97 @@ void SignalShortCircuit() return result; } + /// + /// Overflow-replay path. The parent was checkpointed with a stripped summary + /// (per-unit Index/Name/Status retained; Result/Error dropped) and + /// ReplayChildren=true. Re-executes ONLY the units the frozen summary + /// marks SUCCEEDED or FAILED — to recover their stripped result value / error + /// — and skips units marked STARTED so their bodies do not re-run. Per-unit + /// status and the completion reason come from the frozen summary (authoritative), + /// not from this run's outcomes; the parent is NOT re-checkpointed. + /// + private async Task> ReplayChildrenAsync(Operation frozen, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var summary = ParseSummary(frozen.ContextDetails?.Result); + var unitCount = UnitCount; + + var items = new List>(unitCount); + for (var i = 0; i < unitCount; i++) + { + var (unitName, _) = GetUnit(i); + var summaryEntry = summary?.Units.FirstOrDefault(b => b.Index == i); + + // Frozen per-unit status is authoritative. + var status = summaryEntry != null + ? DeserializeStatus(summaryEntry.Status) + : BatchItemStatus.Started; + + // Same unit-name drift check as ReconstructFromCheckpoints: code must + // not change the order or name of concurrent units between deployments. + var checkpointedName = summaryEntry?.Name; + if (checkpointedName != null && unitName != null && checkpointedName != unitName) + { + throw new NonDeterministicExecutionException( + $"Non-deterministic execution detected for {OperationNoun.ToLowerInvariant()} unit {i} of operation " + + $"'{Name ?? OperationId}': expected name '{unitName}' but found '{checkpointedName}' " + + $"from a previous invocation. Code must not change the order or name of concurrent " + + $"units between deployments."); + } + var resolvedName = checkpointedName ?? unitName; + + T? unitResult = default; + DurableExecutionException? unitError = null; + + // Re-execute only completed units to recover the stripped value/error. + // STARTED units were short-circuited (never dispatched) originally — + // do NOT run their bodies, so there are no spurious side effects. + if (status == BatchItemStatus.Succeeded || status == BatchItemStatus.Failed) + { + var outcome = await RunSingleUnitAsync(i, cancellationToken).ConfigureAwait(false); + if (status == BatchItemStatus.Succeeded) + { + unitResult = outcome.Result; + } + else + { + // Frozen status is authoritative. If a unit frozen as Failed + // re-executes to success here (non-deterministic body), it stays + // Failed but Error stays null — the original error was stripped on + // overflow and only returns if the body re-throws. Recovering a + // frozen-Succeeded unit's value is the common, supported case. + unitError = outcome.Error; + } + } + + items.Add(new BatchItem + { + Index = i, + Name = resolvedName, + Status = status, + Result = unitResult, + Error = unitError + }); + } + + // Completion reason is pinned from the frozen summary; fall back to + // recomputing only if the summary is absent/corrupt. + var completionReason = summary != null + ? DeserializeCompletionReason(summary.CompletionReason) + : ComputeCompletionReason(items, unitCount); + + var result = new BatchResult(items, completionReason); + + // No re-checkpoint: the parent is already terminal in state. + if (completionReason == CompletionReason.FailureToleranceExceeded) + { + throw BuildException(result); + } + + return result; + } + private async Task RunUnitAsync( int index, UnitOutcome[] slots, @@ -494,6 +601,84 @@ private async Task RunUnitAsync( } } + /// + /// Builds and runs a single unit's and + /// maps the result/exception to a . Shared by the + /// concurrent dispatch loop () and the overflow + /// ReplayChildren path (). Per-unit graceful + /// failures are captured as ; workflow-level + /// and parent-token-cancellation exceptions propagate. + /// + private async Task RunSingleUnitAsync(int index, CancellationToken cancellationToken) + { + var (unitName, unitFunc) = GetUnit(index); + var childOpId = OperationIdGenerator.HashOperationId($"{OperationId}-{index + 1}"); + + var childOp = new ChildContextOperation( + childOpId, + unitName, + OperationId, + unitFunc, + new ChildContextConfig { SubType = ChildSubType }, + Serializer, + ChildContextFactory, + State, + Termination, + _workflowCancellation, + DurableExecutionArn, + Batcher, + isVirtual: _isVirtual); + + try + { + var result = await childOp.ExecuteAsync(cancellationToken).ConfigureAwait(false); + return new UnitOutcome { Status = BatchItemStatus.Succeeded, Result = result }; + } + catch (ChildContextException ex) + { + return new UnitOutcome { Status = BatchItemStatus.Failed, Error = ex }; + } + catch (DurableExecutionException) + { + // E.g. NonDeterministicExecutionException — these are not "unit + // failed gracefully" but workflow-level problems. Surface them: + // re-throw out of the operation (the orchestrator's outer flow + // handles it). + throw; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Parent-token cancellation: per cross-cutting decision Q10, OCE + // escapes unwrapped. Don't write a slot — Task.WhenAll observes + // this and the orchestrator re-throws after settling. + throw; + } + catch (OperationCanceledException ex) + { + // Unit-internal cancellation that is NOT tied to the parent token + // (e.g. the unit's own CancellationTokenSource fired). Treat it as + // a normal per-unit failure rather than killing the operation as + // cancelled. + var wrapped = new ChildContextException(ex.Message, ex) + { + SubType = ChildSubType, + ErrorType = ex.GetType().FullName + }; + return new UnitOutcome { Status = BatchItemStatus.Failed, Error = wrapped }; + } + catch (Exception ex) + { + // Wrap unexpected exceptions as ChildContextException — they're + // per-unit failures from the user's POV. + var wrapped = new ChildContextException(ex.Message, ex) + { + SubType = ChildSubType, + ErrorType = ex.GetType().FullName + }; + return new UnitOutcome { Status = BatchItemStatus.Failed, Error = wrapped }; + } + } + private CompletionReason ComputeCompletionReason(IReadOnlyList> items, int totalCount) { var succeeded = 0; @@ -526,42 +711,51 @@ private async Task CheckpointParentResultAsync( CompletionReason completionReason, CancellationToken cancellationToken) { - var summary = new BatchSummary - { - CompletionReason = SerializeCompletionReason(completionReason), - Units = new List(result.All.Count) - }; - for (var i = 0; i < result.All.Count; i++) + // Local builder: includeInline=true writes per-unit Result/Error inline + // (Flat only); includeInline=false writes the minimal index/name/status + // map (the shape Nested always uses, and the Flat overflow fallback). + BatchSummary BuildSummary(bool includeInline) { - var item = result.All[i]; - var unit = new BatchUnitSummary + var s = new BatchSummary { - Index = item.Index, - Name = item.Name, - Status = SerializeStatus(item.Status) + CompletionReason = SerializeCompletionReason(completionReason), + Units = new List(result.All.Count) }; - - // Flat (virtual) units emit no child checkpoint, so their per-unit - // result/error has nowhere to live except inline on this summary. - // Nested units leave these null — they're read from each child's own - // CONTEXT checkpoint on replay. - if (_isVirtual) + for (var i = 0; i < result.All.Count; i++) { - if (item.Status == BatchItemStatus.Succeeded) + var item = result.All[i]; + var unit = new BatchUnitSummary { - unit.Result = SerializeResult(item.Result); - } - else if (item.Status == BatchItemStatus.Failed && item.Error != null) + Index = item.Index, + Name = item.Name, + Status = SerializeStatus(item.Status) + }; + if (includeInline && _isVirtual) { - unit.Error = ErrorObject.FromException(item.Error); + if (item.Status == BatchItemStatus.Succeeded) + unit.Result = SerializeResult(item.Result); + else if (item.Status == BatchItemStatus.Failed && item.Error != null) + unit.Error = ErrorObject.FromException(item.Error); } + s.Units.Add(unit); } - - summary.Units.Add(unit); + return s; } + var summary = BuildSummary(includeInline: true); var payload = JsonSerializer.Serialize(summary, BatchJsonContext.Default.BatchSummary); + // Flat overflow: the inline per-unit results pushed the summary over the + // checkpoint limit. Re-emit a stripped summary (statuses only) and flag + // ReplayChildren so replay reconstructs the values by re-executing units. + var overflow = _isVirtual + && Encoding.UTF8.GetByteCount(payload) > DurableConstants.MaxOperationCheckpointBytes; + if (overflow) + { + summary = BuildSummary(includeInline: false); + payload = JsonSerializer.Serialize(summary, BatchJsonContext.Default.BatchSummary); + } + // Always checkpoint as SUCCEED — even when FailureToleranceExceeded. // The completion reason lives inside the payload, matching the wire // format of the Python/JS/Java SDKs. The exception is thrown SDK-side @@ -573,7 +767,10 @@ await EnqueueAsync(new SdkOperationUpdate Action = OperationAction.SUCCEED, SubType = ParentSubType, Name = Name, - Payload = payload + Payload = payload, + ContextOptions = overflow + ? new SdkContextOptions { ReplayChildren = true } + : null }, cancellationToken); } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableConstants.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableConstants.cs new file mode 100644 index 000000000..99225a6fa --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableConstants.cs @@ -0,0 +1,20 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Size limits for durable-execution payload overflow handling. These are the +/// SDK's chosen overflow *trigger* thresholds for cross-SDK parity (Python/Java +/// use the same 256 KB), not the AWSSDK.Lambda hard field caps (those are 6 MB). +/// +internal static class DurableConstants +{ + /// + /// Serialized-payload byte length above which a concurrent/child-context + /// operation switches to the ReplayChildren overflow strategy: + /// strip the inline result from the checkpoint and reconstruct on replay by + /// re-executing the unit/child bodies. 256 KB (262,144 bytes). + /// + internal const int MaxOperationCheckpointBytes = 256 * 1024; +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs index ca358a46b..412dd2111 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs @@ -143,6 +143,14 @@ public sealed class ContextDetails /// Error from the child context, if any. [JsonPropertyName("Error")] public ErrorObject? Error { get; set; } + + /// + /// When true on a completed CONTEXT operation, the operation's result + /// was too large to checkpoint inline; per-unit/child state is reconstructed + /// on replay by re-executing the children rather than read from this payload. + /// + [JsonPropertyName("ReplayChildren")] + public bool? ReplayChildren { get; set; } } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs index a38dda31b..d787a529b 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs @@ -161,7 +161,8 @@ private static Operation MapFromSdkOperation(SdkOperation sdkOp) ContextDetails = sdkOp.ContextDetails != null ? new ContextDetails { Result = sdkOp.ContextDetails.Result, - Error = MapError(sdkOp.ContextDetails.Error) + Error = MapError(sdkOp.ContextDetails.Error), + ReplayChildren = sdkOp.ContextDetails.ReplayChildren } : null, CallbackDetails = sdkOp.CallbackDetails != null ? new CallbackDetails { @@ -177,6 +178,9 @@ private static Operation MapFromSdkOperation(SdkOperation sdkOp) }; } + /// Test-only access to . + internal static Operation MapFromSdkOperationForTest(SdkOperation sdkOp) => MapFromSdkOperation(sdkOp); + /// /// Maps an SDK into the /// internal . Carries every field the wire object diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelFlatOverflowTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelFlatOverflowTest.cs new file mode 100644 index 000000000..21db02c6f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelFlatOverflowTest.cs @@ -0,0 +1,161 @@ +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class ParallelFlatOverflowTest +{ + private readonly ITestOutputHelper _output; + public ParallelFlatOverflowTest(ITestOutputHelper output) => _output = output; + + /// + /// Reproduces the deterministic operation ID the SDK assigns. Branch op ids + /// are SHA-256(parentOpId + "-" + (index+1)); inner-op ids nest the same way + /// under the branch op id. Reproduced locally because OperationIdGenerator is + /// internal to the SDK. + /// + private static string HashOpId(string raw) + { + var bytes = Encoding.UTF8.GetBytes(raw); + var hash = SHA256.HashData(bytes); + var sb = new StringBuilder(hash.Length * 2); + foreach (var b in hash) sb.Append(b.ToString("x2")); + return sb.ToString(); + } + + /// + /// End-to-end exercise of the LARGE-PAYLOAD OVERFLOW + ReplayChildren replay path + /// for a parallel. + /// + /// Three branches each return a deterministic ~150 KB string (~450 KB aggregate), + /// which exceeds the 256 KB checkpoint threshold, so the parallel OVERFLOWS: the SDK + /// checkpoints a STRIPPED summary (no inline results) and sets + /// ContextOptions.ReplayChildren=true on the parent CONTEXT op. + /// + /// The workflow is shaped to actually drive the RECOVERY path (ReplayChildrenAsync): + /// - invoke 1: branches suspend on their in-branch waits -> PENDING. + /// - invoke 2: the parallel re-runs the branches, overflow-checkpoints the parent + /// as SUCCEEDED + ReplayChildren, then suspends on the post-parallel + /// "post-overflow" wait (so the parallel does NOT also return in this invoke). + /// - invoke 3: re-enters the already-terminal SUCCEEDED + ReplayChildren parallel, + /// routing through ReplayChildrenAsync to RE-EXECUTE the branch bodies and + /// recover the stripped values (reading per-unit Status/CompletionReason from the + /// frozen summary, never re-checkpointing). The final result is computed from + /// those recovered values. + /// + /// This test proves the whole path works against the real durable-execution service: + /// 1. The execution SUCCEEDED — proving the overflow checkpoint was accepted AND + /// ReplayChildrenAsync correctly reconstructed the aggregate result. (If the + /// ReplayChildren recovery path were broken, reconstruction would fail and the + /// execution would FAIL/TIME_OUT.) + /// 2. Exactly ONE parent CONTEXT op exists — Flat emits no per-branch CONTEXT. + /// 3. The three "generate" steps succeeded and re-parent to the Parallel op. + /// 4. There were >= 3 InvocationCompleted events (initial PENDING + the resume that + /// overflow-checkpoints the parallel + the post-overflow resume that runs + /// ReplayChildrenAsync) — proving the parallel was re-entered while terminal, so + /// the ReplayChildren recovery path really ran. + /// 5. The FINAL execution result (read via GetExecutionAsync after SUCCEEDED, not + /// the first PENDING invoke response) reports the recovered per-branch lengths + /// ("153600" x3) and first chars ("abc") — proving the large deterministic + /// values were recovered EXACTLY by ReplayChildrenAsync, not lost or defaulted. + /// + [Fact] + public async Task Parallel_Flat_Overflow_ReplaysChildren_AndRecoversLargeResults() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("ParallelFlatOverflowFunction"), + "pflow", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "po1"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // SUCCEEDED alone proves the >256 KB overflow checkpoint was accepted and that + // ReplayChildrenAsync (re-entered on the post-overflow resume) reconstructed the + // result. A broken overflow recovery would FAIL or TIME_OUT here. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The parallel parent is the first root-level operation -> SHA256("1"). + var parentOpId = HashOpId("1"); + var branchOpIds = new[] + { + HashOpId($"{parentOpId}-1"), + HashOpId($"{parentOpId}-2"), + HashOpId($"{parentOpId}-3"), + }; + // Each branch's "generate" step is the 1st inner op under that branch's own id + // space: SHA256("-1"). + var expectedStepIds = branchOpIds.Select(b => HashOpId($"{b}-1")).ToList(); + + // Wait until the parent CONTEXT succeeded and all three branches' inner step + + // wait events are visible. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => + { + var events = h.Events ?? new List(); + if (events.Count(e => e.EventType == EventType.ContextSucceeded) < 1) return false; + if (events.Count(e => e.EventType == EventType.StepSucceeded) < 3) return false; + if (events.Count(e => e.EventType == EventType.WaitSucceeded) < 3) return false; + return true; + }, + TimeSpan.FromSeconds(60)); + var allEvents = history.Events ?? new List(); + + // 2. Exactly ONE CONTEXT operation exists — the parent Parallel op. No + // per-branch CONTEXT events under Flat (even on the overflow path). + var contextStartedIds = allEvents + .Where(e => e.EventType == EventType.ContextStarted) + .Select(e => e.Id) + .Distinct() + .ToList(); + Assert.Equal(new[] { parentOpId }, contextStartedIds); + Assert.Empty(allEvents.Where(e => + e.EventType == EventType.ContextStarted && branchOpIds.Contains(e.Id))); + + // 3. Each branch's "generate" step re-parents to the Parallel op (NOT to its + // virtual branch op), and the three step ids match the per-branch id space. + var generateSteps = allEvents + .Where(e => e.EventType == EventType.StepSucceeded && e.Name == "generate") + .ToList(); + Assert.Equal(3, generateSteps.Count); + Assert.All(generateSteps, e => Assert.Equal(parentOpId, e.ParentId)); + + var observedStepIds = generateSteps.Select(e => e.Id).Distinct().ToList(); + Assert.Equal(3, observedStepIds.Count); + foreach (var expected in expectedStepIds) + { + Assert.Contains(expected, observedStepIds); + } + + // 4. There are at least 3 invocations: the initial PENDING, the resume that + // overflow-checkpoints the parallel and suspends on the post-overflow wait, and + // the post-overflow resume that re-enters the already-terminal parallel and runs + // ReplayChildrenAsync. >= 3 proves the parallel was re-entered while terminal, so + // the ReplayChildren recovery path really ran (>= 2 alone would only prove a + // single suspend/resume cycle). + var invocations = allEvents.Where(e => e.InvocationCompletedDetails != null).ToList(); + Assert.True( + invocations.Count >= 3, + $"Expected >= 3 InvocationCompleted events (initial + overflow-checkpoint resume + post-overflow ReplayChildren resume), got {invocations.Count}"); + + // 5. The FINAL execution result (NOT the first invoke response, which is PENDING + // because the branch waits suspend it) reports the recovered per-branch metadata. + // Each branch produced a 150 KB (153600-byte) string built from its branch char, + // so a correct ReplayChildrenAsync recovery yields lengths "153600,153600,153600" + // and first chars "abc". This proves the large values were recovered EXACTLY by + // the ReplayChildren path, not lost or defaulted. + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Result); + Assert.Contains("\"Lengths\":\"153600,153600,153600\"", execution.Result, StringComparison.OrdinalIgnoreCase); + Assert.Contains("\"FirstChars\":\"abc\"", execution.Result, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Dockerfile new file mode 100644 index 000000000..cc1bdedd3 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Dockerfile @@ -0,0 +1,5 @@ +FROM public.ecr.aws/lambda/dotnet:10 + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Function.cs new file mode 100644 index 000000000..1e66fa411 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/Function.cs @@ -0,0 +1,103 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + // Each branch produces a ~150 KB string. Three branches => ~450 KB of inline + // results, comfortably over the 256 KB checkpoint threshold. This forces the + // FLAT parallel aggregate to OVERFLOW: the SDK checkpoints a stripped summary + // (no inline results) and sets ContextOptions.ReplayChildren=true on the parent + // CONTEXT op, keeping the full result in memory for the current invoke. + private const int BranchPayloadSize = 150 * 1024; // 153600 bytes + + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Three branches run under NestingType.Flat. Each branch generates a LARGE + // (~150 KB) string inside a step, then does an in-branch durable wait. The + // combined ~450 KB aggregate exceeds the 256 KB threshold, so the parallel + // OVERFLOWS: the SDK checkpoints a stripped summary (no inline per-branch + // results) + ReplayChildren=true on the parent CONTEXT op. + // + // To actually exercise the RECOVERY path (ReplayChildrenAsync), the + // already-overflowed parallel must be re-entered on a FRESH invoke while it is + // already terminal (SUCCEEDED + ReplayChildren). The in-branch waits alone are + // NOT enough: the resume invoke that overflow-checkpoints the parallel also + // immediately returns SUCCEEDED, so the parallel goes STARTED -> SUCCEEDED in a + // single invoke and ReplayChildrenAsync is never hit. So we add a durable wait + // AFTER ParallelAsync returns (the "post-overflow" wait below): the overflow + // invoke suspends on that wait, and the NEXT invoke re-enters the already- + // terminal overflowed parallel and routes through ReplayChildrenAsync to + // RE-EXECUTE the branch bodies and recover the stripped values (reading per-unit + // Status/CompletionReason from the frozen summary, never re-checkpointing). + // + // The branch values are built DETERMINISTICALLY from the branch character + // (NOT Guid/random/DateTime). This is critical: the value produced on the + // original execution must be IDENTICAL to the value produced on replay + // re-execution, so the test can prove the large values were recovered exactly + // rather than lost or defaulted. + var batch = await context.ParallelAsync( + new[] + { + new DurableBranch("a", (ctx, _) => BranchAsync(ctx, 'a')), + new DurableBranch("b", (ctx, _) => BranchAsync(ctx, 'b')), + new DurableBranch("c", (ctx, _) => BranchAsync(ctx, 'c')), + }, + name: "fanout", + config: new ParallelConfig { NestingType = NestingType.Flat }); + + // Force another invocation so the already-overflowed parallel is re-entered + // (already SUCCEEDED + ReplayChildren) and replayed via ReplayChildrenAsync, + // which re-executes the branch bodies to recover the stripped >256 KB results. + await context.WaitAsync(TimeSpan.FromSeconds(1), name: "post-overflow"); + + // Compute the verifiable metadata AFTER the post-overflow wait: on the final + // invoke these results come from ReplayChildrenAsync's re-execution, which is + // exactly the recovery we want to prove survives. + var results = batch.GetResults().ToList(); + + // Keep the returned payload SMALL (well under the 6 MB Lambda response + // limit): do NOT echo the ~450 KB back. Instead return verifiable metadata + // proving the large values were recovered on replay: + // - Lengths: comma-joined per-branch result LENGTHS (e.g. "153600,153600,153600") + // - FirstChars: the first character of each recovered branch result, in order + // (e.g. "abc") — confirms each branch's deterministic content survived. + var lengths = string.Join(",", results.Select(r => r.Length)); + var firstChars = string.Concat(results.Select(r => r.Length > 0 ? r[0] : '?')); + + return new TestResult { Status = "completed", Lengths = lengths, FirstChars = firstChars }; + } + + private static async Task BranchAsync(IDurableContext ctx, char branchChar) + { + // Deterministic large payload: same branchChar => same string on original + // execution and on replay re-execution. ~150 KB per branch. + var large = await ctx.StepAsync( + async (_, _) => { await Task.CompletedTask; return new string(branchChar, BranchPayloadSize); }, + name: "generate"); + + // Force a suspend/resume cycle to trigger replay of the (overflowed) parallel. + await ctx.WaitAsync(TimeSpan.FromSeconds(2), name: "boundary"); + + return large; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Lengths { get; set; } public string? FirstChars { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/ParallelFlatOverflowFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/ParallelFlatOverflowFunction.csproj new file mode 100644 index 000000000..f8bf7fd0c --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFlatOverflowFunction/ParallelFlatOverflowFunction.csproj @@ -0,0 +1,18 @@ + + + + net10.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CheckpointBatcherTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CheckpointBatcherTests.cs index effeb5804..d5e91ec37 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CheckpointBatcherTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CheckpointBatcherTests.cs @@ -172,6 +172,58 @@ public async Task EnqueueAsync_AfterDispose_Throws() await Assert.ThrowsAnyAsync(() => batcher.EnqueueAsync(Update("0-step"))); } + private static SdkOperationUpdate UpdateWithPayload(string id, int payloadBytes) => new() + { + Id = id, + Type = "CONTEXT", + Action = "SUCCEED", + Payload = new string('p', payloadBytes) + }; + + [Fact] + public async Task EnqueueAsync_ByteCap_SplitsBatchesByBytes() + { + var batchByteTotals = new List(); + var batcher = new CheckpointBatcher("token-0", + (token, ops, ct) => + { + long sum = 0; + foreach (var o in ops) sum += o.Payload?.Length ?? 0; + batchByteTotals.Add(sum); + return Task.FromResult(token); + }, + new CheckpointBatcherConfig + { + MaxBatchBytes = 10 * 1024, + FlushInterval = TimeSpan.FromMilliseconds(100) + }); + + // Three 6 KB payloads: at most one fits per 10 KB batch with overhead. + var tasks = Enumerable.Range(0, 3) + .Select(i => batcher.EnqueueAsync(UpdateWithPayload($"{i}", 6 * 1024))) + .ToArray(); + await Task.WhenAll(tasks); + await batcher.DrainAsync(); + + Assert.True(batchByteTotals.Count >= 2, "expected the byte cap to split into multiple batches"); + Assert.All(batchByteTotals, total => Assert.True(total <= 10 * 1024)); + } + + [Fact] + public async Task EnqueueAsync_SingleOversizedItem_SentAloneNoLoop() + { + var batches = new List(); + var batcher = new CheckpointBatcher("token-0", + (token, ops, ct) => { batches.Add(ops.Count); return Task.FromResult(token); }, + new CheckpointBatcherConfig { MaxBatchBytes = 4 * 1024 }); + + await batcher.EnqueueAsync(UpdateWithPayload("huge", 50 * 1024)); + await batcher.DrainAsync(); + + Assert.Single(batches); + Assert.Equal(1, batches[0]); + } + [Fact] public async Task CheckpointToken_UpdatesAfterEachFlush() { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs index 8d1d9d591..b8b2e952b 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs @@ -501,6 +501,94 @@ await Assert.ThrowsAsync(() => name: "phase")); } + [Fact] + public async Task RunInChildContextAsync_ResultOverThreshold_EmitsEmptyPayloadAndReplayChildren() + { + var (context, recorder, _, _) = CreateContext(); + var big = new string('y', 300 * 1024); + + var result = await context.RunInChildContextAsync( + async (_, _) => { await Task.Yield(); return big; }, + name: "phase"); + + Assert.Equal(big, result); // in-memory value intact for this invoke + + await recorder.Batcher.DrainAsync(); + + var succeed = recorder.Flushed.Single(o => + o.Type == "CONTEXT" && o.Action == "SUCCEED"); + Assert.Equal(string.Empty, succeed.Payload); + Assert.NotNull(succeed.ContextOptions); + Assert.True(succeed.ContextOptions.ReplayChildren); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayChildren_ReExecutesBodyWithoutRecheckpoint() + { + var childOpId = IdAt(1); // first root-level op + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = childOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + Name = "phase", + // Result == "" matches the overflow emission (string.Empty). + ContextDetails = new ContextDetails { Result = "", ReplayChildren = true } + } + } + }); + + var executed = false; + var result = await context.RunInChildContextAsync( + async (_, _) => { executed = true; await Task.Yield(); return "rebuilt"; }, + name: "phase"); + + Assert.True(executed); + Assert.Equal("rebuilt", result); + + await recorder.Batcher.DrainAsync(); + // Already-terminal child must not be re-checkpointed. + Assert.DoesNotContain(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "SUCCEED"); + } + + [Fact] + public async Task RunInChildContextAsync_ReplayChildren_BodyThrows_DoesNotEmitFailCheckpoint() + { + var childOpId = IdAt(1); // first root-level op + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = childOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + Name = "phase", + // Result == "" matches the overflow emission (string.Empty). + ContextDetails = new ContextDetails { Result = "", ReplayChildren = true } + } + } + }); + + // The op is already terminal (SUCCEEDED). If the overflow re-run body + // throws, the recovery path must NOT re-checkpoint a CONTEXT FAIL over + // the already-SUCCEEDED record — but the exception still propagates. + await Assert.ThrowsAsync(() => + context.RunInChildContextAsync( + async (_, _) => { await Task.Yield(); throw new InvalidOperationException("nondeterministic re-run"); }, + name: "phase")); + + await recorder.Batcher.DrainAsync(); + Assert.DoesNotContain(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "FAIL"); + } + [Fact] public async Task RunInChildContextAsync_SubTypeAndName_PropagateToCheckpoint() { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs index ab649f150..a49b8488e 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs @@ -382,6 +382,26 @@ public async Task GetExecutionStateAsync_MapFromSdkOperation_RoundTripsAllErrorF Assert.Equal(new[] { "at Frame.One()", "at Frame.Two()" }, invError.StackTrace!); } + [Fact] + public void MapFromSdkOperation_CopiesReplayChildren() + { + var sdkOp = new Amazon.Lambda.Model.Operation + { + Id = "ctx-1", + Type = "CONTEXT", + Status = "SUCCEEDED", + ContextDetails = new Amazon.Lambda.Model.ContextDetails + { + Result = "{}", + ReplayChildren = true + } + }; + + var mapped = LambdaDurableServiceClient.MapFromSdkOperationForTest(sdkOp); + + Assert.True(mapped.ContextDetails!.ReplayChildren); + } + [Fact] public async Task CheckpointAsync_ReturnsNewToken() { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs index 786feee5f..750750f0d 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs @@ -671,6 +671,232 @@ public async Task ParallelAsync_NestingTypeFlat_PartialFailure_SurfacesInlineErr o.Type == "CONTEXT" && o.SubType == "ParallelBranch")); } + [Fact] + public async Task ParallelAsync_Flat_ResultOverThreshold_StripsInlineResultsAndSetsReplayChildren() + { + var (context, recorder, _, _) = CreateContext(); + + // Each branch returns a ~200 KB string; the summary with both inline + // exceeds the 256 KB checkpoint threshold. + var big = new string('x', 200 * 1024); + var result = await context.ParallelAsync( + new Func>[] + { + async (_, _) => { await Task.Yield(); return big; }, + async (_, _) => { await Task.Yield(); return big; }, + }, + name: "fanout", + config: new ParallelConfig { NestingType = NestingType.Flat }); + + // In-memory result for the current invoke still carries the full values. + Assert.Equal(2, result.SuccessCount); + Assert.All(result.GetResults(), r => Assert.Equal(big, r)); + + await recorder.Batcher.DrainAsync(); + + var parentSucceed = recorder.Flushed.Single(o => + o.Type == "CONTEXT" && o.SubType == "Parallel" && o.Action == "SUCCEED"); + + // Overflow: ReplayChildren flag set, payload stripped under the threshold. + Assert.NotNull(parentSucceed.ContextOptions); + Assert.True(parentSucceed.ContextOptions.ReplayChildren); + Assert.True(System.Text.Encoding.UTF8.GetByteCount(parentSucceed.Payload) + <= Amazon.Lambda.DurableExecution.Internal.DurableConstants.MaxOperationCheckpointBytes); + // Stripped summary keeps statuses but not the big inline results. + Assert.DoesNotContain(big, parentSucceed.Payload); + Assert.Contains("SUCCEEDED", parentSucceed.Payload); + } + + [Fact] + public async Task ParallelAsync_Flat_ReplayChildren_ReExecutesBodiesWithoutRecheckpointing() + { + var parentOpId = IdAt(1); + + // Stripped summary: statuses present, NO inline Result values. + var summaryJson = """ + {"CompletionReason":"ALL_COMPLETED","Units":[ + {"Index":0,"Name":"0","Status":"SUCCEEDED"}, + {"Index":1,"Name":"1","Status":"SUCCEEDED"} + ]} + """; + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = parentOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + SubType = OperationSubTypes.Parallel, + Name = "fanout", + ContextDetails = new ContextDetails + { + Result = summaryJson, + ReplayChildren = true + } + } + } + }); + + var executions = 0; + var result = await context.ParallelAsync( + new Func>[] + { + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); return 100; }, + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); return 200; }, + }, + name: "fanout", + config: new ParallelConfig { NestingType = NestingType.Flat }); + + // Bodies re-executed (values recovered), statuses/reason from frozen summary. + Assert.Equal(2, executions); + Assert.Equal(new[] { 100, 200 }, result.GetResults()); + Assert.Equal(CompletionReason.AllCompleted, result.CompletionReason); + + await recorder.Batcher.DrainAsync(); + + // The parent is already terminal in state — replay must NOT re-emit a + // parent CONTEXT SUCCEED/FAIL. + Assert.DoesNotContain(recorder.Flushed, o => + o.Type == "CONTEXT" && o.SubType == "Parallel"); + } + + [Fact] + public async Task ParallelAsync_Flat_ReplayChildren_SkipsStartedUnits_ReExecutesCompletedOnly() + { + var parentOpId = IdAt(1); + + // Stripped summary: two units short-circuited the run with MinSuccessful=2 + // (SUCCEEDED, SUCCEEDED), the third was never dispatched (STARTED). On + // overflow replay only the two completed units re-execute; the started + // unit's body must NOT run. + var summaryJson = """ + {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Units":[ + {"Index":0,"Name":"0","Status":"SUCCEEDED"}, + {"Index":1,"Name":"1","Status":"SUCCEEDED"}, + {"Index":2,"Name":"2","Status":"STARTED"} + ]} + """; + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = parentOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + SubType = OperationSubTypes.Parallel, + Name = "fanout", + ContextDetails = new ContextDetails + { + Result = summaryJson, + ReplayChildren = true + } + } + } + }); + + var executions = 0; + var startedBodyRan = false; + var result = await context.ParallelAsync( + new Func>[] + { + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); return 100; }, + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); return 200; }, + async (_, _) => { startedBodyRan = true; Interlocked.Increment(ref executions); await Task.Yield(); return 300; }, + }, + name: "fanout", + config: new ParallelConfig + { + NestingType = NestingType.Flat, + CompletionConfig = new CompletionConfig { MinSuccessful = 2 } + }); + + // Only the two SUCCEEDED unit bodies re-execute; the STARTED unit is skipped. + Assert.Equal(2, executions); + Assert.False(startedBodyRan); + + // Per-item statuses come from the frozen summary. + Assert.Equal(BatchItemStatus.Succeeded, result.All[0].Status); + Assert.Equal(BatchItemStatus.Succeeded, result.All[1].Status); + Assert.Equal(BatchItemStatus.Started, result.All[2].Status); + + // Recovered values for the two succeeded units. + Assert.Equal(new[] { 100, 200 }, result.GetResults()); + Assert.Equal(CompletionReason.MinSuccessfulReached, result.CompletionReason); + + await recorder.Batcher.DrainAsync(); + Assert.DoesNotContain(recorder.Flushed, o => + o.Type == "CONTEXT" && o.SubType == "Parallel"); + } + + [Fact] + public async Task ParallelAsync_Flat_ReplayChildren_ReExecutesFailedUnit_RecoversError() + { + var parentOpId = IdAt(1); + + // Stripped summary: one SUCCEEDED, one FAILED. Errors were stripped on + // overflow, so re-execution recovers them. Tolerated-failure config keeps + // the run from throwing. + var summaryJson = """ + {"CompletionReason":"ALL_COMPLETED","Units":[ + {"Index":0,"Name":"0","Status":"SUCCEEDED"}, + {"Index":1,"Name":"1","Status":"FAILED"} + ]} + """; + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = parentOpId, + Type = OperationTypes.Context, + Status = OperationStatuses.Succeeded, + SubType = OperationSubTypes.Parallel, + Name = "fanout", + ContextDetails = new ContextDetails + { + Result = summaryJson, + ReplayChildren = true + } + } + } + }); + + var executions = 0; + var result = await context.ParallelAsync( + new Func>[] + { + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); return 100; }, + async (_, _) => { Interlocked.Increment(ref executions); await Task.Yield(); throw new InvalidOperationException("flat boom"); }, + }, + name: "fanout", + config: new ParallelConfig + { + NestingType = NestingType.Flat, + CompletionConfig = new CompletionConfig { ToleratedFailureCount = 1 } + }); + + // Both bodies re-execute to recover the value and the error. + Assert.Equal(2, executions); + Assert.Equal(BatchItemStatus.Succeeded, result.All[0].Status); + Assert.Equal(BatchItemStatus.Failed, result.All[1].Status); + Assert.Equal(100, result.All[0].Result); + Assert.NotNull(result.All[1].Error); + Assert.Contains("flat boom", result.All[1].Error!.Message); + Assert.Equal(CompletionReason.AllCompleted, result.CompletionReason); + + await recorder.Batcher.DrainAsync(); + Assert.DoesNotContain(recorder.Flushed, o => + o.Type == "CONTEXT" && o.SubType == "Parallel"); + } + [Fact] public async Task ParallelAsync_NestingTypeFlat_ReplaySucceeded_RebuildsFromInlinePayload() {