Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .autover/changes/durable-payload-overflow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.DurableExecution",
"Type": "Patch",
"ChangelogMessages": [
"Handle large results (over the 256 KB per-operation checkpoint limit) for `ParallelAsync`, `MapAsync`, and `RunInChildContextAsync`. When a Flat concurrent operation's summary or a child context's result exceeds the limit, the inline result is stripped from the checkpoint and the operation is flagged with `ReplayChildren`; on a later replay the unit/child bodies are re-executed to recover the stripped values without re-checkpointing the already-terminal parent. Plain `StepAsync` and `InvokeAsync` results are unaffected, matching the Python/Java/JS SDKs.",
"Enforce the `CheckpointBatcher` request byte limit: the batcher pre-flushes before an operation update would push a batch over the byte (or count) cap, and sends an oversized update on its own."
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ internal sealed class CheckpointBatcher : IAsyncDisposable
private Exception? _terminalError;
private int _disposed;

// Per-update wire-footprint estimate constants. Deliberate over-estimates:
// flushing slightly early is safe, flushing late risks a request-too-large.
private const int PerOpEnvelopeOverheadBytes = 512;
private const int StackFrameOverheadBytes = 8;

/// <summary>
/// Approximates, in UTF-8 bytes, the wire footprint of a single update: a fixed
/// per-operation envelope plus the variable-length string fields. No JSON is
/// produced (AOT-safe). The payload is weighted 2x because it is
/// already-serialized JSON that is re-escaped as a string value, which roughly
/// doubles for escape-heavy content.
/// </summary>
/// <param name="u">The operation update to measure.</param>
/// <returns>The estimated size in bytes; a deliberate over-estimate.</returns>
private static int EstimateUpdateBytes(SdkOperationUpdate u)
{
    // int arithmetic is safe: payloads are bounded by the 6MB Lambda
    // invocation-payload cap, so the 2x multiply can never overflow a 32-bit int.
    // ByteCount returns 0 for null, so a missing payload contributes nothing.
    var total = PerOpEnvelopeOverheadBytes
        + (ByteCount(u.Payload) * 2)
        + ByteCount(u.Id)
        + ByteCount(u.ParentId)
        + ByteCount(u.Name);

    var error = u.Error;
    if (error == null)
    {
        return total;
    }

    total += ByteCount(error.ErrorType)
        + ByteCount(error.ErrorMessage)
        + ByteCount(error.ErrorData);

    if (error.StackTrace != null)
    {
        // Each frame carries a small fixed surcharge on top of its text.
        foreach (var frame in error.StackTrace)
        {
            total += ByteCount(frame) + StackFrameOverheadBytes;
        }
    }

    return total;
}

/// <summary>
/// Returns the UTF-8 encoded length of <paramref name="s"/> in bytes; a null
/// string counts as zero.
/// </summary>
private static int ByteCount(string? s)
{
    if (s == null)
    {
        return 0;
    }

    return System.Text.Encoding.UTF8.GetByteCount(s);
}

public CheckpointBatcher(
string? initialCheckpointToken,
Func<string?, IReadOnlyList<SdkOperationUpdate>, CancellationToken, Task<string?>> flushAsync,
Expand Down Expand Up @@ -113,25 +143,43 @@ public async ValueTask DisposeAsync()

private async Task RunWorkerAsync(CancellationToken shutdownToken)
{
// TODO: also enforce _config.MaxBatchBytes here. Today we only cap by
// operation count; an item whose serialized size pushes the batch over
// ~750 KB will be sent and rejected service-side. See CheckpointBatcherConfig.
var batch = new List<BatchItem>(_config.MaxBatchOperations);
// Both caps are enforced: before adding an item that would push the batch
// over MaxBatchOperations OR MaxBatchBytes, the current batch is flushed.
// A lone item already over the byte cap is sent by itself (never loops).
// The byte accumulator is seeded with a fixed reserve covering the request
// prefix (checkpoint token + ARN + array framing) that the per-update
// estimate does not include.
const int RequestEnvelopeReserveBytes = 4 * 1024;
var batch = new PendingBatch(_config.MaxBatchOperations);

// Adds one update to the pending batch while enforcing both batch caps.
// - If adding the item would exceed MaxBatchOperations or MaxBatchBytes, the
//   current batch is flushed first and the item starts a fresh batch.
// - If the item on its own already exceeds the byte cap, it is sent by itself.
// Exceptions from FlushBatchAsync propagate to the caller.
async Task AddItemAsync(BatchItem item)
{
    // The && / || short-circuit keeps the (string-scanning) size estimate off
    // the empty-batch and count-cap paths, where its value is irrelevant.
    var wouldExceedCap = batch.Count > 0 &&
        (batch.Count + 1 > _config.MaxBatchOperations ||
         RequestEnvelopeReserveBytes + batch.Bytes + EstimateUpdateBytes(item.Update) > _config.MaxBatchBytes);

    if (wouldExceedCap)
    {
        try
        {
            await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false);
        }
        catch
        {
            // The item has already been taken off the channel but is not in the
            // batch yet. Park it there before rethrowing so whoever handles the
            // failure (the worker's leftover cleanup, or a later flush) still
            // sees it; otherwise its Completion would never be settled and the
            // enqueuing caller would wait forever.
            batch.Add(item);
            throw;
        }

        batch.Clear();
    }

    batch.Add(item);

    // Lone item already over the cap: send it alone, do not loop.
    if (batch.Count == 1 &&
        RequestEnvelopeReserveBytes + batch.Bytes > _config.MaxBatchBytes)
    {
        await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false);
        batch.Clear();
    }
}

try
{
while (await _channel.Reader.WaitToReadAsync(shutdownToken).ConfigureAwait(false))
{
// Drain everything currently queued.
while (_channel.Reader.TryRead(out var item))
{
batch.Add(item);
if (batch.Count >= _config.MaxBatchOperations)
{
await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false);
batch.Clear();
}
}
await AddItemAsync(item).ConfigureAwait(false);

// Optionally wait for late arrivals to coalesce into one batch.
if (_config.FlushInterval > TimeSpan.Zero && batch.Count > 0)
Expand All @@ -143,14 +191,7 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken)
while (await _channel.Reader.WaitToReadAsync(windowCts.Token).ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out var item))
{
batch.Add(item);
if (batch.Count >= _config.MaxBatchOperations)
{
await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false);
batch.Clear();
}
}
await AddItemAsync(item).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (!shutdownToken.IsCancellationRequested)
Expand All @@ -161,7 +202,7 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken)

if (batch.Count > 0)
{
await FlushBatchAsync(batch, shutdownToken).ConfigureAwait(false);
await FlushBatchAsync(batch.Items, shutdownToken).ConfigureAwait(false);
batch.Clear();
}
}
Expand All @@ -179,9 +220,9 @@ private async Task RunWorkerAsync(CancellationToken shutdownToken)
}
finally
{
// Anything left in the channel after the worker exits — fail it.
// Anything left in the batch/channel after the worker exits — fail it.
var failure = Volatile.Read(ref _terminalError) ?? new ObjectDisposedException(nameof(CheckpointBatcher));
foreach (var leftover in batch)
foreach (var leftover in batch.Items)
leftover.Completion.TrySetException(failure);
while (_channel.Reader.TryRead(out var item))
item.Completion.TrySetException(failure);
Expand Down Expand Up @@ -214,5 +255,17 @@ private async Task FlushBatchAsync(IReadOnlyList<BatchItem> batch, CancellationT
}
}

/// <summary>
/// Accumulates a batch plus its estimated byte footprint so the two never drift
/// across the worker's add/flush/clear sites.
/// </summary>
/// <remarks>
/// <see cref="Bytes"/> can only be changed through <see cref="Add"/> and
/// <see cref="Clear"/>, so the running total always matches the items added
/// through this type. <see cref="Items"/> is exposed for reading (flushing and
/// enumeration); mutate the batch only via <see cref="Add"/> / <see cref="Clear"/>.
/// </remarks>
private sealed class PendingBatch
{
    /// <summary>Creates an empty batch whose backing list is pre-sized to <paramref name="capacity"/>.</summary>
    public PendingBatch(int capacity)
    {
        Items = new List<BatchItem>(capacity);
    }

    /// <summary>The items currently in the batch, in insertion order.</summary>
    public List<BatchItem> Items { get; }

    /// <summary>Sum of <c>EstimateUpdateBytes</c> over the items added since the last <see cref="Clear"/>.</summary>
    public long Bytes { get; private set; }

    /// <summary>Number of items currently in the batch.</summary>
    public int Count => Items.Count;

    /// <summary>Appends <paramref name="item"/> and adds its estimated size to <see cref="Bytes"/>.</summary>
    public void Add(BatchItem item)
    {
        Items.Add(item);
        Bytes += EstimateUpdateBytes(item.Update);
    }

    /// <summary>Removes every item and resets <see cref="Bytes"/> to zero.</summary>
    public void Clear()
    {
        Items.Clear();
        Bytes = 0;
    }
}

/// <summary>
/// One queued operation update paired with the <see cref="TaskCompletionSource{TResult}"/>
/// that reports its outcome; the worker's cleanup path faults it via
/// <c>TrySetException</c> for items that are still pending when the worker exits.
/// </summary>
private readonly record struct BatchItem(SdkOperationUpdate Update, TaskCompletionSource<bool> Completion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ internal sealed class CheckpointBatcherConfig
public int MaxBatchOperations { get; init; } = 200;

/// <summary>
/// Maximum batch size in bytes. Service-side limit is ~750 KB.
/// Maximum batch size in bytes. Service-side request limit is ~750 KB.
/// </summary>
/// <remarks>
/// TODO: not enforced today. The worker only checks <see cref="MaxBatchOperations"/>;
/// a single oversized item (or a batch whose serialized size exceeds 750 KB)
/// will be sent to the service and rejected there. Wire this in alongside
/// the async-flush operations (Map / Parallel / child-context) since those
/// are the scenarios that can actually fill a batch — today every batch is
/// 1 item with <see cref="FlushInterval"/> = Zero, so the gap is latent.
/// Enforced by the worker: it flushes the current batch before adding an item
/// that would push the estimated request size over this cap, and sends a lone
/// item that already exceeds the cap by itself. The per-update estimate plus a
/// fixed request-prefix reserve approximate the real wire size conservatively.
/// </remarks>
internal int MaxBatchBytes { get; init; } = 750 * 1024;
public int MaxBatchBytes { get; init; } = 750 * 1024;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using Amazon.Lambda;
using Amazon.Lambda.Core;
using SdkContextOptions = Amazon.Lambda.Model.ContextOptions;
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

Expand All @@ -24,6 +25,10 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// and throw <see cref="ChildContextException"/>.</item>
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
/// NOT re-executed.</item>
/// <item><b>SUCCEEDED (overflow)</b>: <c>ReplayChildren=true</c> + empty
/// payload (the result was too large to checkpoint inline) → re-run the
/// user func to recover the large result value; terminal checkpoints
/// (SUCCEED/FAIL) are suppressed since the op is already terminal.</item>
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
/// set, the mapped exception is thrown instead.</item>
Expand All @@ -45,6 +50,8 @@ internal sealed class ChildContextOperation<T> : DurableOperation<T>
private readonly WorkflowCancellation _workflowCancellation;
private readonly CancellationToken _cooperativeBailToken;
private readonly bool _isVirtual;
// Set once on overflow-replay re-execution; never reset.
private bool _suppressTerminalCheckpoint;

public ChildContextOperation(
string operationId,
Expand Down Expand Up @@ -105,6 +112,14 @@ protected override Task<T> ReplayAsync(Operation existing, CancellationToken can
switch (existing.Status)
{
case OperationStatuses.Succeeded:
// Overflow: the result was too large to checkpoint inline
// (ReplayChildren=true, empty payload). Re-run the body to recover
// the value; the body's inner ops replay from their own
// checkpoints. Do NOT re-emit the (already terminal) SUCCEED.
if (existing.ContextDetails?.ReplayChildren == true)
{
return ExecuteFuncNoCheckpoint(cancellationToken);
}
// Side-effecting code runs at most once: replay returns the
// cached result without invoking the user func.
return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));
Expand All @@ -127,6 +142,12 @@ protected override Task<T> ReplayAsync(Operation existing, CancellationToken can
}
}

/// <summary>
/// Runs <see cref="ExecuteFunc"/> with terminal checkpointing switched off.
/// Used on overflow replay (<c>ReplayChildren == true</c>), where the operation
/// is already recorded as SUCCEEDED and the body is re-run only to recover the
/// result value.
/// </summary>
/// <param name="cancellationToken">Forwarded unchanged to <see cref="ExecuteFunc"/>.</param>
/// <returns>The task produced by <see cref="ExecuteFunc"/>.</returns>
private Task<T> ExecuteFuncNoCheckpoint(CancellationToken cancellationToken)
{
    // Must be set before ExecuteFunc starts: ExecuteFunc consults this flag
    // before enqueueing its SUCCEED / FAIL update. The flag is never reset.
    _suppressTerminalCheckpoint = true;

    var rerun = ExecuteFunc(cancellationToken);
    return rerun;
}

private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -180,8 +201,11 @@ private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
{
// Virtual branches suppress the FAIL checkpoint but still propagate
// the exception — the orchestrator records the failure inline on the
// parent payload.
if (!_isVirtual)
// parent payload. Overflow-replay re-execution also suppresses it: the
// op is already terminal (SUCCEEDED) in the store, so re-emitting a
// FAIL would corrupt that record (mirrors ReplayChildrenAsync, which
// never re-checkpoints). The exception still propagates below.
if (!_isVirtual && !_suppressTerminalCheckpoint)
{
await EnqueueAsync(new SdkOperationUpdate
{
Expand All @@ -205,8 +229,17 @@ await EnqueueAsync(new SdkOperationUpdate

// Virtual branches suppress the SUCCEED checkpoint; the orchestrator
// serializes the result inline on the parent payload instead.
if (!_isVirtual)
// _suppressTerminalCheckpoint is set on overflow replay re-execution: the
// child is already terminal in the store, so we re-run only to recover the
// in-memory value and must NOT re-emit a SUCCEED.
if (!_isVirtual && !_suppressTerminalCheckpoint)
{
var serialized = SerializeResult(result);
// Overflow: result too large to checkpoint inline. Emit an empty
// payload + ReplayChildren so replay re-executes this body to recover
// the value (mirrors the concurrent-operation overflow strategy).
var overflow = Encoding.UTF8.GetByteCount(serialized) > DurableConstants.MaxOperationCheckpointBytes;

await EnqueueAsync(new SdkOperationUpdate
{
Id = OperationId,
Expand All @@ -215,7 +248,10 @@ await EnqueueAsync(new SdkOperationUpdate
Action = OperationAction.SUCCEED,
SubType = _config?.SubType,
Name = Name,
Payload = SerializeResult(result)
Payload = overflow ? string.Empty : serialized,
ContextOptions = overflow
? new SdkContextOptions { ReplayChildren = true }
: null
}, cancellationToken);
}

Expand Down
Loading
Loading