
fix(optimize): fix stale OCC read_version in distributed compaction to prevent row resurrection#6653

Open
xiaguanglei wants to merge 1 commit into lance-format:main from xiaguanglei:fix/distributed-compact-read-version-resurrection

Conversation


@xiaguanglei xiaguanglei commented Apr 30, 2026

Problem

In distributed compaction (e.g. Spark's OptimizeExec), the commit phase opens a fresh Dataset handle at the latest version V+N, which is newer than version V at which tasks were planned and executed.

commit_compaction was using dataset.manifest.version (= V+N) as the TransactionBuilder read_version. Lance uses Optimistic Concurrency Control (OCC): when committing a transaction, the conflict checker loads all transactions committed after read_version and checks whether any of them touch the same fragments. Using V+N as read_version narrows the OCC scan window to (V+N, ∞), silently skipping any concurrent DELETE/UPDATE that committed between V and V+N. The compacted fragment is written without those deletion markers, causing deleted rows to reappear (data resurrection).
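
The effect of the anchor on the scan window can be reduced to a few lines. This is a minimal sketch, not the Lance implementation; `CommittedTxn` and `conflicting_versions` are hypothetical names for illustration:

```rust
// Sketch (not the Lance implementation): the read_version anchor determines
// which concurrent commits the conflict checker can see at all.
#[derive(Debug, Clone)]
struct CommittedTxn {
    version: u64,
    touched_fragments: Vec<u64>,
}

/// Versions of committed transactions that conflict with a rewrite of
/// `my_fragments`; only commits strictly after `read_version` are scanned.
fn conflicting_versions(log: &[CommittedTxn], read_version: u64, my_fragments: &[u64]) -> Vec<u64> {
    log.iter()
        .filter(|t| t.version > read_version) // OCC window: (read_version, ∞)
        .filter(|t| t.touched_fragments.iter().any(|f| my_fragments.contains(f)))
        .map(|t| t.version)
        .collect()
}

fn main() {
    // A concurrent DELETE touching fragment 0 committed at V+1 = 2.
    let log = vec![CommittedTxn { version: 2, touched_fragments: vec![0] }];
    // Anchored at V+N = 5 (the bug): the DELETE falls outside the window.
    assert!(conflicting_versions(&log, 5, &[0]).is_empty());
    // Anchored at V = 1 (the fix): the DELETE is detected.
    assert_eq!(conflicting_versions(&log, 1, &[0]), vec![2]);
}
```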

Timeline (bug):

  V     App A plan_compaction (read_version = V)
  │     App A executors rewriting fragments ...
  V+1   App B DELETE committed  ← OCC window starts at V+N, this is INVISIBLE
  │     App A executors finish
  V+N   App A opens fresh Dataset → dataset.manifest.version = V+N
        commit_compaction(read_version = V+N)
        OCC scans (V+N, ∞) → finds nothing → no conflict
  V+N+1 App A commits Rewrite → compacted fragment has NO deletion markers
        ❌ deleted rows resurrected

Root Cause

// Before – uses the version of the Dataset handle passed to commit_compaction,
// which in distributed mode is V+N (the latest version), not V (plan version)
let transaction = TransactionBuilder::new(
    dataset.manifest.version,   // V+N  ← wrong
    Operation::Rewrite { ... },
)

Fix

Each RewriteResult already carries read_version = V (the version at which the executor ran the task). Derive the transaction read_version from min(task.read_version) so the OCC window is anchored at V, covering the full range [V, ∞).

// After – anchor OCC window to the version tasks were planned/executed at
let tasks_read_version = completed_tasks
    .iter()
    .map(|t| t.read_version)
    .min()
    .unwrap_or(dataset.manifest.version);

let transaction = TransactionBuilder::new(
    tasks_read_version,   // V  ← correct
    Operation::Rewrite { ... },
)
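
The min-with-fallback rule above can be exercised in isolation; `anchor_read_version` is a hypothetical helper name used only for this sketch:

```rust
// Sketch of the anchoring rule: take the smallest read_version across
// completed tasks, falling back to the Dataset handle's version when the
// task list is empty (mirrors the `min().unwrap_or(...)` in the fix).
fn anchor_read_version(task_read_versions: &[u64], latest_version: u64) -> u64 {
    task_read_versions.iter().copied().min().unwrap_or(latest_version)
}

fn main() {
    // Distributed case: every task was planned at V = 1 → anchor at 1, not V+N = 5.
    assert_eq!(anchor_read_version(&[1, 1, 1], 5), 1);
    // Degenerate case: no completed tasks → fall back to the handle's version.
    assert_eq!(anchor_read_version(&[], 5), 5);
}
```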
Timeline (fixed):

  V     App A plan_compaction (read_version = V)
  │     App A executors rewriting fragments ...
  V+1   App B DELETE committed
  │     App A executors finish
  V+N   App A opens fresh Dataset → dataset.manifest.version = V+N
        commit_compaction(read_version = min(task.read_version) = V)
        OCC scans (V, ∞) → finds DELETE at V+1 → conflict detected
        ✅ OPTIMIZE retries or fails, no data resurrection

Test Plan

1. Rust unit test (added in this PR)

rust/lance/src/dataset/optimize.rs: test_distributed_compact_concurrent_delete_no_resurrection

Simulates the exact distributed scenario in a single-process test:

Step 1  Write 4 fragments × 1 000 rows (version V=1)
Step 2  plan_compaction at V=1 → 1 CompactionTask (read_version=1)
Step 3  Execute task against dataset@V=1 → RewriteResult(read_version=1)
Step 4  Concurrent DELETE "a < 1000" → version V=2
Step 5  Open fresh Dataset → version=2 (simulates OptimizeExec's commit Dataset)
Step 6  commit_compaction(fresh_dataset@V=2, results[read_version=1])
        BEFORE fix → Ok(…), rows resurrected
        AFTER  fix → Err(retryable conflict), data integrity preserved
Step 7  Assert final row count WHERE a < 1000 == 0
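
The before/after behavior the steps check can be sketched as a single-process simulation; `CommitOutcome` and `commit_rewrite` are hypothetical stand-ins, not the real test or Lance types:

```rust
// Simulation of the OCC decision in Steps 4–6, reduced to one predicate:
// the concurrent DELETE conflicts iff it committed strictly after the
// transaction's read_version.
#[derive(Debug, PartialEq)]
enum CommitOutcome {
    Committed,
    RetryableConflict,
}

fn commit_rewrite(read_version: u64, delete_version: u64) -> CommitOutcome {
    if delete_version > read_version {
        CommitOutcome::RetryableConflict
    } else {
        CommitOutcome::Committed
    }
}

fn main() {
    let delete_version = 2; // Step 4: concurrent DELETE commits at V = 2
    // Before the fix: anchored at the fresh handle's version (2) → DELETE invisible.
    assert_eq!(commit_rewrite(2, delete_version), CommitOutcome::Committed);
    // After the fix: anchored at min(task.read_version) = 1 → retryable conflict.
    assert_eq!(commit_rewrite(1, delete_version), CommitOutcome::RetryableConflict);
}
```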

Run locally:

cargo test -p lance --lib \
  dataset::optimize::tests::test_distributed_compact_concurrent_delete_no_resurrection

2. End-to-end reproduction on a real Spark YARN cluster

Two independent spark-submit applications against a 10 M-row, 32-fragment Lance table on YARN:

  App    Config                                       Role
  App A  1 executor, OPTIMIZE TABLE                   Plan at V=34, executor phase ~110 s
  App B  4 executors, DELETE WHERE id % 312500 < 100  Commits in ~25 s (within App A's executor phase)

Result without fix:

App A (OPTIMIZE)  final status: SUCCEEDED  (committed after the DELETE, but skipped it in the OCC scan)
App B (DELETE)    final status: SUCCEEDED  (committed first, at V+1)

SELECT COUNT(*) WHERE id % 312500 < 100
→ 3200  (expected 0)
❌ 3200 deleted rows resurrected

Result with fix:

App A (OPTIMIZE)  → Err: retryable commit conflict (OCC detected DELETE at V+1)
App B (DELETE)    final status: SUCCEEDED

SELECT COUNT(*) WHERE id % 312500 < 100
→ 0
✅ no data resurrection

Notes

  • The bug only manifests in the distributed compaction path where the caller opens a fresh Dataset for the commit phase (e.g. OptimizeExec.scala in lance-spark). Single-process compact_files is unaffected because the same Dataset handle is used throughout.
  • RewriteResult.read_version was already populated correctly by each task; this fix simply threads it through to TransactionBuilder.
  • End-to-end validation was performed on a production YARN cluster with the patched JAR: two independent Spark applications (OPTIMIZE + concurrent DELETE) confirmed that after the fix, OPTIMIZE correctly receives a Retryable commit conflict error instead of silently resurrecting deleted rows.

fix(optimize): fix stale OCC read_version in distributed compaction

In distributed compaction (e.g. Spark's OptimizeExec), the commit phase
opens a fresh Dataset handle at the latest version (V+N), which is newer
than the version V at which tasks were planned and executed.

Using dataset.manifest.version (= V+N) as the TransactionBuilder
read_version causes the OCC conflict checker to scan only transactions
after V+N, silently skipping any concurrent DELETE/UPDATE that committed
between V and V+N.  This allows deleted rows to reappear in the
compacted output (data resurrection).

Fix: derive read_version from min(task.read_version) across all completed
tasks, anchoring the OCC window to V so conflicts in [V, V+N] are caught.

@claude claude Bot left a comment


Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added the bug Something isn't working label Apr 30, 2026
@github-actions (Contributor)

ACTION NEEDED
Lance follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

For details on the error please inspect the "PR Title Check" action.

@xiaguanglei xiaguanglei changed the title fix(optimize): prevent deleted-row resurrection caused by stale OCC read_version in distributed compaction fix(optimize): fix stale OCC read_version in distributed compaction to prevent row resurrection Apr 30, 2026