feat: support exact percentile aggregate natively #4542
Draft
andygrove wants to merge 2 commits into
Wire Spark's exact `Percentile` aggregate (and the `percentile_cont` ANSI form, which Spark rewrites to `Percentile`) to DataFusion's `percentile_cont` aggregate. DataFusion uses the same `index = p * (n - 1)` linear interpolation as Spark, so results match for the common single-percentage form.

- proto: add `Percentile` AggExpr message (child, percentage, datatype).
- native planner: map it to `percentile_cont_udaf()` with [child, percentile].
- CometPercentile serde: Compatible for a single literal double percentage, default frequency, and numeric input; the child is cast to double so the native result is DoubleType. Array-of-percentages, a non-default frequency argument, and interval inputs fall back to Spark.
- operators.adjustOutputForNativeState: map Percentile's TypedImperativeAggregate Binary partial buffer to the native List<Float64> state (ArrayType(DoubleType)), mirroring CollectSet, so the partial/shuffle/final exchange schema is correct.

Codegen dispatch is not applicable: aggregates (TypedImperativeAggregate) cannot run in the per-row scalar kernel, so native is the only path.

Tests: SQL file test covering global, grouped, integer-input, all-null, exact and interpolated percentiles, plus fallback assertions for the array and frequency forms. No new regressions in the SQL suite.
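To picture why the partial buffer has to be exposed as a list of doubles, here is a minimal Python sketch of a partial/merge/final percentile aggregation. It is a conceptual model only, not the Comet or DataFusion code: the function names `partial_state`, `merge_states`, and `evaluate` are hypothetical, and the assumption is simply that each partial state carries every non-null input value as a double.

```python
def partial_state(partition):
    """Partial phase: keep each non-null input as a float.

    Models a list-of-doubles state; the function name is hypothetical.
    """
    return [float(v) for v in partition if v is not None]


def merge_states(states):
    """Merge phase: concatenate the per-partition lists after the shuffle."""
    merged = []
    for state in states:
        merged.extend(state)
    return merged


def evaluate(state, p):
    """Final phase: sort the merged values and interpolate at p * (n - 1)."""
    if not state:
        return None  # no non-null input, so the result is null
    data = sorted(state)
    index = p * (len(data) - 1)
    lower = int(index)                    # rank at or below the index
    upper = min(lower + 1, len(data) - 1)  # next rank, clamped to the end
    return data[lower] + (data[upper] - data[lower]) * (index - lower)
```

Because the final phase needs all values sorted together, the state that crosses the exchange is the value list itself, which is why the exchange schema must describe an array of doubles rather than an opaque binary buffer.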
Which issue does this PR close?
Part of #3190.
Rationale for this change
Comet had no native percentile aggregate, so `percentile(...)` (and the ANSI `percentile_cont(...) WITHIN GROUP`, which Spark rewrites to `Percentile`) always fell back to Spark. Codegen dispatch is not an option here: `Percentile` is a `TypedImperativeAggregate`, and the codegen dispatcher is a per-row scalar kernel that explicitly cannot run aggregates. So the only options are native execution or fallback, and this PR wires it natively.

DataFusion's `percentile_cont` computes the percentile with `index = p * (n - 1)` and linear interpolation between the two closest ranks, which is the same algorithm as Spark's exact `Percentile`. So the common single-percentage form matches Spark.

What changes are included in this PR?
- Proto: add a `Percentile` `AggExpr` message (`child`, `percentage`, `datatype`).
- Native planner (`planner.rs`): map `AggExprStruct::Percentile` to `percentile_cont_udaf()` with args `[child, percentile]`.
- `CometPercentile` serde: `Compatible` for a single literal double percentage, default frequency, and numeric input. The child is cast to double so the native result is `DoubleType`, matching Spark.
- `operators.adjustOutputForNativeState`: map Percentile's `TypedImperativeAggregate` `Binary` partial buffer to the native `List<Float64>` state (`ArrayType(DoubleType)`), mirroring the existing `CollectSet` handling, so the partial/shuffle/final exchange schema is correct.

Out of scope (fall back to Spark): an array of percentages, a non-default frequency argument, and interval inputs. `approx_percentile` is deliberately not included (t-digest vs Spark's GK algorithm; tracked separately under #3189).

Known minor caveat: DataFusion quantizes the interpolation fraction to 6 decimal places, so a deeply interpolated value could in principle differ from Spark in the last few ULPs. The tested percentiles match exactly; if needed, this can be revisited with a custom accumulator.
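For reference, the `index = p * (n - 1)` rule with linear interpolation between the two closest ranks can be sketched in a few lines of Python. This is an illustrative model of the formula as described here, not the Spark or DataFusion implementation; the function name `exact_percentile` is hypothetical, and it assumes nulls are skipped and that empty input yields null.

```python
import math


def exact_percentile(values, p):
    """Exact percentile: linear interpolation at index = p * (n - 1).

    Illustrative sketch; assumes nulls are ignored and empty input is null.
    """
    if not 0.0 <= p <= 1.0:
        raise ValueError("percentage must be in [0, 1]")
    # Cast to double, mirroring the cast applied to the child expression.
    data = sorted(float(v) for v in values if v is not None)
    if not data:
        return None
    index = p * (len(data) - 1)
    lower = math.floor(index)  # closest rank at or below the index
    upper = math.ceil(index)   # closest rank at or above the index
    fraction = index - lower   # 0.0 when the index lands on a rank
    return data[lower] + (data[upper] - data[lower]) * fraction
```

With the values 1, 2, 3, 4 and `p = 0.5`, the index is 1.5, so the result is interpolated halfway between 2 and 3, giving 2.5. With an odd number of values, `p = 0.5` lands exactly on the middle rank and no interpolation occurs.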
How are these changes tested?
A SQL file test (`expressions/aggregate/percentile.sql`) run by `CometSqlFileTestSuite` covers global, grouped, integer-input, all-null-group, and exact and interpolated percentiles, asserting answer parity and native execution via `checkSparkAnswerAndOperator`. It also asserts that the array-of-percentages and frequency-argument forms fall back to Spark. The full SQL suite shows no new regressions.