Skip to content

Fix scheduler dag-version refresh deadlock with PK-ordered locking#66762

Draft
capytan wants to merge 1 commit into
apache:mainfrom
capytan:fix/verify-integrity-dag-version-deadlock
Draft

Fix scheduler dag-version refresh deadlock with PK-ordered locking#66762
capytan wants to merge 1 commit into
apache:mainfrom
capytan:fix/verify-integrity-dag-version-deadlock

Conversation

@capytan
Copy link
Copy Markdown

@capytan capytan commented May 12, 2026

related: #66734

After upgrading to 3.2.1 we hit periodic Postgres deadlocks on task_instance between two writers that lock the same rows in different orders:

  • scheduler's _verify_integrity_if_dag_changed issues UPDATE task_instance ... WHERE dag_id=? AND run_id=? AND state IN (...), which the planner satisfies via the (dag_id, run_id) index
  • api-server's ti_update_state does SELECT FOR UPDATE on the same rows by primary key

When the row sets overlap, PG aborts one of them with deadlock detected. Traceback and pg_stat_activity output are in #66734.

This PR replaces the bulk UPDATE with a SELECT id ORDER BY id LIMIT N FOR UPDATE SKIP LOCKED + UPDATE WHERE id IN (...) batch loop, so the scheduler also locks in PK order. Same approach #65920 takes for check_trigger_timeouts; #65818 tracks this family of bugs.

One thing that bit me on the way: the inner UPDATE only changes dag_version_id, not state, so a SELECT filtered solely on state IN (State.unfinished) keeps returning the same N ids on every iteration and the loop never terminates once the batch is full. The candidate SELECT also filters on dag_version_id IS NULL OR != latest_dag_version.id so each batch advances. The previous single bulk UPDATE never surfaced this; the batched form needs it.


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Opus 4.7 (1M context)

Generated-by: Claude Opus 4.7 (1M context) following the guidelines

@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label May 12, 2026
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 12, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@capytan capytan force-pushed the fix/verify-integrity-dag-version-deadlock branch 2 times, most recently from 99888c3 to e478a3f Compare May 14, 2026 18:43
Replace the bulk UPDATE in _verify_integrity_if_dag_changed (which
locked task_instance rows via the (dag_id, run_id) index) with a
SELECT id ORDER BY id FOR UPDATE SKIP LOCKED + UPDATE WHERE id IN
(...) batch loop. The previous bulk UPDATE could deadlock with
PK-ordered writers - notably the api-server ti_update_state path,
which locks rows by primary key under concurrent task state
updates; locking in PK order eliminates the cross-index race.

Filter the SELECT on dag_version_id so each batch advances past
already-updated rows. Without this guard the batch loop never
terminates: state IN (State.unfinished) still matches the rows
that the previous batch just tagged with the latest dag_version_id,
so SELECT keeps returning the same first N task_instance ids.
With BATCH_SIZE=1000 the bug only surfaces for dag_runs that have
more than one batch of unfinished task instances, which is why it
slipped through the default tests.

Add test_verify_integrity_processes_more_than_one_batch which uses
monkeypatch to shrink _DAG_VERSION_REFRESH_BATCH_SIZE to 2 and
asserts every initial task instance ends up tagged with the latest
dag_version after the refresh.
@capytan capytan force-pushed the fix/verify-integrity-dag-version-deadlock branch from e478a3f to 211c4f3 Compare May 25, 2026 09:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant