Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/parallel-checkpoint-align-succeed.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.DurableExecution",
"Type": "Patch",
"ChangelogMessages": [
"Align parallel parent checkpoint with other SDKs: always emit SUCCEED with summary in Payload, throw ParallelException SDK-side based on CompletionReason."
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Text;
using Amazon.Lambda;
using Amazon.Lambda.Core;
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

namespace Amazon.Lambda.DurableExecution.Internal;
Expand All @@ -25,9 +24,9 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// SUCCEED with summary payload (<see cref="ParallelSummary"/>).</item>
/// <item><b>SUCCEEDED</b>: parent payload supplies the snapshot of per-
/// branch statuses + completion reason; per-branch results are
/// deserialised from the children's own CONTEXT checkpoints.</item>
/// <item><b>FAILED</b>: same reconstruction; throws
/// <see cref="ParallelException"/> carrying the rebuilt
/// deserialised from the children's own CONTEXT checkpoints. If the
/// completion reason is <see cref="CompletionReason.FailureToleranceExceeded"/>,
/// throws <see cref="ParallelException"/> carrying the rebuilt
/// <see cref="IBatchResult{T}"/>.</item>
/// <item><b>STARTED</b> / <b>PENDING</b>: re-execute (children replay from
/// their own checkpoints).</item>
Expand Down Expand Up @@ -96,13 +95,13 @@ protected override Task<IBatchResult<T>> ReplayAsync(Operation existing, Cancell
switch (existing.Status)
{
case OperationStatuses.Succeeded:
return Task.FromResult(ReconstructFromCheckpoints(existing, throwOnFailure: false));

case OperationStatuses.Failed:
// Reconstruct so the caller (and ParallelException.Result) sees
// the per-branch outcomes; then throw.
var failed = ReconstructFromCheckpoints(existing, throwOnFailure: false);
throw BuildParallelException(failed);
// The parent always checkpoints as SUCCEED — even when
// CompletionReason is FailureToleranceExceeded. Reconstruct
// the BatchResult and throw if it was a tolerance failure.
var result = ReconstructFromCheckpoints(existing);
if (result.CompletionReason == CompletionReason.FailureToleranceExceeded)
throw BuildParallelException(result);
return Task.FromResult(result);

case OperationStatuses.Started:
case OperationStatuses.Pending:
Expand Down Expand Up @@ -467,21 +466,23 @@ private async Task CheckpointParentResultAsync(
{
var summary = ParallelSummaryCodec.Build(result.All, completionReason);
var payload = ParallelSummaryCodec.ToPayload(summary);
var failed = completionReason == CompletionReason.FailureToleranceExceeded;

// Always checkpoint as SUCCEED — even when FailureToleranceExceeded.
// The completion reason lives inside the payload, matching the wire
// format of the Python/JS/Java SDKs. The ParallelException is thrown
// SDK-side after this checkpoint.
await EnqueueAsync(new SdkOperationUpdate
{
Id = OperationId,
Type = OperationTypes.Context,
Action = failed ? OperationAction.FAIL : OperationAction.SUCCEED,
Action = OperationAction.SUCCEED,
SubType = OperationSubTypes.Parallel,
Name = Name,
Payload = failed ? null : payload,
Error = failed ? BuildAggregateError(result) : null
Payload = payload
}, cancellationToken);
}

private IBatchResult<T> ReconstructFromCheckpoints(Operation parent, bool throwOnFailure)
private IBatchResult<T> ReconstructFromCheckpoints(Operation parent)
{
var summary = ParallelSummaryCodec.FromPayload(parent.ContextDetails?.Result);

Expand Down Expand Up @@ -545,14 +546,7 @@ private IBatchResult<T> ReconstructFromCheckpoints(Operation parent, bool throwO
? ParallelSummaryCodec.ReadCompletionReason(summary.CompletionReason)
: EvaluateCompletion(items, _branches.Count);

var result = new BatchResult<T>(items, completionReason);

if (throwOnFailure && completionReason == CompletionReason.FailureToleranceExceeded)
{
throw BuildParallelException(result);
}

return result;
return new BatchResult<T>(items, completionReason);
}

private static BatchItemStatus InferStatusFromBranchOp(Operation? branchOp)
Expand All @@ -576,15 +570,6 @@ private static ParallelException BuildParallelException(IBatchResult<T> result)
};
}

private static SdkErrorObject BuildAggregateError(IBatchResult<T> result)
{
return new SdkErrorObject
{
ErrorType = typeof(ParallelException).FullName,
ErrorMessage = $"Parallel operation failed: {result.FailureCount} of {result.TotalCount} branches failed."
};
}

private T DeserializeBranchResult(string serialized)
{
var bytes = Encoding.UTF8.GetBytes(serialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public async Task ParallelAsync_ReplayFailed_ThrowsParallelException()
{
Id = parentOpId,
Type = OperationTypes.Context,
Status = OperationStatuses.Failed,
Status = OperationStatuses.Succeeded,
SubType = OperationSubTypes.Parallel,
Name = "fanout",
ContextDetails = new ContextDetails { Result = summaryJson }
Expand Down