diff --git a/.autover/changes/parallel-checkpoint-align-succeed.json b/.autover/changes/parallel-checkpoint-align-succeed.json new file mode 100644 index 000000000..6e39e6d02 --- /dev/null +++ b/.autover/changes/parallel-checkpoint-align-succeed.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.DurableExecution", + "Type": "Patch", + "ChangelogMessages": [ + "Align parallel parent checkpoint with other SDKs: always emit SUCCEED with summary in Payload, throw ParallelException SDK-side based on CompletionReason." + ] + } + ] +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs index 2d55dc511..8b4cca630 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs @@ -5,7 +5,6 @@ using System.Text; using Amazon.Lambda; using Amazon.Lambda.Core; -using SdkErrorObject = Amazon.Lambda.Model.ErrorObject; using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; namespace Amazon.Lambda.DurableExecution.Internal; @@ -25,9 +24,9 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// SUCCEED with summary payload (). /// SUCCEEDED: parent payload supplies the snapshot of per- /// branch statuses + completion reason; per-branch results are -/// deserialised from the children's own CONTEXT checkpoints. -/// FAILED: same reconstruction; throws -/// carrying the rebuilt +/// deserialised from the children's own CONTEXT checkpoints. If the +/// completion reason is , +/// throws carrying the rebuilt /// . /// STARTED / PENDING: re-execute (children replay from /// their own checkpoints). @@ -96,13 +95,13 @@ protected override Task> ReplayAsync(Operation existing, Cancell switch (existing.Status) { case OperationStatuses.Succeeded: - return Task.FromResult(ReconstructFromCheckpoints(existing, throwOnFailure: false)); - - case OperationStatuses.Failed: - // Reconstruct so the caller (and ParallelException.Result) sees - // the per-branch outcomes; then throw. - var failed = ReconstructFromCheckpoints(existing, throwOnFailure: false); - throw BuildParallelException(failed); + // The parent always checkpoints as SUCCEED — even when + // CompletionReason is FailureToleranceExceeded. Reconstruct + // the BatchResult and throw if it was a tolerance failure. + var result = ReconstructFromCheckpoints(existing); + if (result.CompletionReason == CompletionReason.FailureToleranceExceeded) + throw BuildParallelException(result); + return Task.FromResult(result); case OperationStatuses.Started: case OperationStatuses.Pending: @@ -467,21 +466,23 @@ private async Task CheckpointParentResultAsync( { var summary = ParallelSummaryCodec.Build(result.All, completionReason); var payload = ParallelSummaryCodec.ToPayload(summary); - var failed = completionReason == CompletionReason.FailureToleranceExceeded; + // Always checkpoint as SUCCEED — even when FailureToleranceExceeded. + // The completion reason lives inside the payload, matching the wire + // format of the Python/JS/Java SDKs. The ParallelException is thrown + // SDK-side after this checkpoint. await EnqueueAsync(new SdkOperationUpdate { Id = OperationId, Type = OperationTypes.Context, - Action = failed ? OperationAction.FAIL : OperationAction.SUCCEED, + Action = OperationAction.SUCCEED, SubType = OperationSubTypes.Parallel, Name = Name, - Payload = failed ? null : payload, - Error = failed ? BuildAggregateError(result) : null + Payload = payload }, cancellationToken); } - private IBatchResult ReconstructFromCheckpoints(Operation parent, bool throwOnFailure) + private IBatchResult ReconstructFromCheckpoints(Operation parent) { var summary = ParallelSummaryCodec.FromPayload(parent.ContextDetails?.Result); @@ -545,14 +546,7 @@ private IBatchResult ReconstructFromCheckpoints(Operation parent, bool throwO ? ParallelSummaryCodec.ReadCompletionReason(summary.CompletionReason) : EvaluateCompletion(items, _branches.Count); - var result = new BatchResult(items, completionReason); - - if (throwOnFailure && completionReason == CompletionReason.FailureToleranceExceeded) - { - throw BuildParallelException(result); - } - - return result; + return new BatchResult(items, completionReason); } private static BatchItemStatus InferStatusFromBranchOp(Operation? branchOp) @@ -576,15 +570,6 @@ private static ParallelException BuildParallelException(IBatchResult result) }; } - private static SdkErrorObject BuildAggregateError(IBatchResult result) - { - return new SdkErrorObject - { - ErrorType = typeof(ParallelException).FullName, - ErrorMessage = $"Parallel operation failed: {result.FailureCount} of {result.TotalCount} branches failed." - }; - } - private T DeserializeBranchResult(string serialized) { var bytes = Encoding.UTF8.GetBytes(serialized); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs index a9621ccbf..1be4b817d 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs @@ -667,7 +667,7 @@ public async Task ParallelAsync_ReplayFailed_ThrowsParallelException() { Id = parentOpId, Type = OperationTypes.Context, - Status = OperationStatuses.Failed, + Status = OperationStatuses.Succeeded, SubType = OperationSubTypes.Parallel, Name = "fanout", ContextDetails = new ContextDetails { Result = summaryJson }