Skip to content

Fix pipe drop event discard with restart-aware committer keys#17748

Open
Caideyipi wants to merge 5 commits into
masterfrom
Fix-drop
Open

Fix pipe drop event discard with restart-aware committer keys#17748
Caideyipi wants to merge 5 commits into
masterfrom
Fix-drop

Conversation

@Caideyipi
Copy link
Copy Markdown
Collaborator

@Caideyipi Caideyipi commented May 22, 2026

Description

This PR fixes pipe event discard logic when dropping a pipe task by using CommitterKey instead of only (pipeName, creationTime, regionId).

Previously, queued/retry/batched events were matched only by pipe name, creation time, and region id, which could incorrectly
discard events from a restarted pipe task. This change propagates the full committer key through pending queues, sink
subtasks, batch builders, and sink implementations so discard checks can distinguish task restart generations.

Changes

  • Use CommitterKey for dropped pipe task tracking and event matching.
  • Propagate restart-aware discard APIs across pipe sink subtasks and connectors.
  • Update async, sync, airgap, and websocket sinks to discard only matching pipe task events.
  • Keep backward-compatible discard methods with wildcard restart time.

This PR has:

  • been self-reviewed.
    • concurrent read
    • concurrent write
    • concurrent read and write
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods.
  • added or updated version, license, or notice information
  • added comments explaining the "why" and the intent of the code wherever would not be obvious
    for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold
    for code coverage.
  • added integration tests.
  • been tested in a test IoTDB cluster.

Key changed/added classes (or packages if there are too many classes) in this PR

@Caideyipi Caideyipi changed the title Fixed the drop pipe logic to avoid losing data Fix pipe drop event discard with restart-aware committer keys May 22, 2026
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
7.3% Duplication on New Code (required ≤ 5%)

See analysis details on SonarQube Cloud

@codecov
Copy link
Copy Markdown

codecov Bot commented May 26, 2026

Codecov Report

❌ Patch coverage is 56.32184% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 40.57%. Comparing base (0d1b838) to head (a20c1d3).
⚠️ Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
.../payload/evolvable/batch/PipeTabletEventBatch.java 0.00% 7 Missing ⚠️
.../pipe/agent/task/subtask/sink/PipeSinkSubtask.java 25.00% 6 Missing ⚠️
...ink/protocol/airgap/IoTDBDataRegionAirGapSink.java 0.00% 3 Missing ⚠️
.../protocol/thrift/sync/IoTDBDataRegionSyncSink.java 0.00% 3 Missing ⚠️
...db/pipe/sink/protocol/websocket/WebSocketSink.java 0.00% 3 Missing ⚠️
...e/sink/protocol/PipeConnectorWithEventDiscard.java 0.00% 3 Missing ⚠️
...gent/task/subtask/sink/PipeSinkSubtaskManager.java 0.00% 2 Missing ⚠️
...d/evolvable/batch/PipeTransferBatchReqBuilder.java 50.00% 2 Missing ⚠️
...rotocol/thrift/async/IoTDBDataRegionAsyncSink.java 83.33% 2 Missing ⚠️
...n/task/subtask/SubscriptionSinkSubtaskManager.java 0.00% 2 Missing ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17748      +/-   ##
============================================
- Coverage     40.59%   40.57%   -0.03%     
  Complexity     2574     2574              
============================================
  Files          5179     5180       +1     
  Lines        349979   350366     +387     
  Branches      44749    44803      +54     
============================================
+ Hits         142082   142153      +71     
- Misses       207897   208213     +316     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants