Skip to content

Add ResumableJobMixin with SparkSubmitOperator as a case study for surviving worker failures (standalone)#67118

Open
amoghrajesh wants to merge 20 commits into
apache:mainfrom
astronomer:aip-103-9-resumable-mixin-plus-spark
Open

Add ResumableJobMixin with SparkSubmitOperator as a case study for surviving worker failures (standalone)#67118
amoghrajesh wants to merge 20 commits into
apache:mainfrom
astronomer:aip-103-9-resumable-mixin-plus-spark

Conversation

@amoghrajesh
Copy link
Copy Markdown
Contributor

@amoghrajesh amoghrajesh commented May 18, 2026

Background

Long-running Spark jobs (and similar remote execution patterns) have historically had two options in Airflow:

  1. Deferrable operators — the task defers, hands control to the Triggerer, and the worker slot is freed. In theory this handles the "worker dies" case because the Triggerer owns continuity. In practice, deferrable adoption has been low: the async programming model is a different mental model from standard data engineering, requires a separate Triggerer component to operate and scale, and the defer() boundary means a killed task during deferral (e.g. user clearing while deferred) still loses the job handle. Zero custom deferrable operators have been written by teams that do write custom regular operators, for these reasons.

  2. Custom operators with manual application ID tracking — some teams build their own operators that capture the YARN application ID or Spark driver ID immediately after submission, persist it to some storage, and on retry look it up before deciding whether to resubmit. It works but requires significant per-team investment and the storage layer is bespoke — no standard interface, no UI visibility, no composability with Airflow's retry policies.

This PR takes a third path: synchronous execution with checkpointing. The operator runs in a worker slot (same mental model as today, no Triggerer needed), persists the job handle to the AIP-103 task_state before polling begins, and on retry reads it back. From the data engineers perspective the operator looks and behaves identically to before — it just survives worker disruptions without losing its place.

What problem are we solving?

This is the side benefit of AIP 103 on a test subject.

This PR handles cluster deploy mode where the driver runs on the cluster independently of the submitter. In client mode the driver runs inside the spark-submit process, so killing the worker also kills the driver. The PR targets cluster mode only.

When an Airflow worker running a SparkSubmitOperator task dies mid-execution, the spark driver keeps running on the spark cluster independently. Airflow has no recollection of the previous submission, ie: on retry it creates a fresh operator instance and submits a new Spark job irrespective of what happened with the earlier submitted spark job. The original driver's work is wasted, and both jobs may run concurrently consuming duplicate compute.

This was reproduced empirically: triggered a SparkPi job in spark cluster mode, killed the Airflow worker while the driver was running, observed and waited for the driver complete on the spark cluster, then watched the retry submit a second driver with a different ID.

Current behaviour

SparkSubmitOperator.execute() calls self._hook.submit(), a single blocking call that submits the job, extracts the driver ID, and polls — all in one function. The driver ID is stored on self._hook._driver_id, in memory on the worker side. When the worker dies, that memory is gone. Retry submits a new job with no knowledge of the previous one.

Proposed change

ResumableJobMixin: a new mixin for operators that submit one long-running job to an external system and poll for completion. It provides a single method, execute_resumable(context), that operators call from their own execute():

  • First run: calls submit_job(), persists the returned external ID to task_state before polling starts, then calls poll_until_complete()
  • Retry with active job: reads the ID from task_state, checks its status, skips resubmission and reconnects directly to the running job
  • Retry with already-finished job: returns the result immediately — no resubmission, no polling
  • Retry with failed job: falls through and resubmits fresh

The mixin is generic — it knows nothing about spark and that is intention. Six abstract methods (submit_job, get_job_status, is_job_active, is_job_succeeded, poll_until_complete, get_job_result) are implemented by each operator for its external system.

SparkSubmitOperator is wired to the mixin for Spark Standalone cluster mode, and this is how its going to operate:

  • execute() routes to execute_resumable() when hook._should_track_driver_status is True (standalone cluster mode only — the existing flag that YARN and K8s set to False). Adding YARN/K8s resumable support in a follow-up only requires updating that flag; execute() routing is stable.
  • get_job_status() queries the Spark REST API, replacing the existing spark-submit --status subprocess approach which is a blocking sync call that ran a binary
  • external_id_key = "spark_job_id" is intentionally generic — works for standalone (driver-xxx), YARN (application-xxx), and K8s (pod name)
  • hook.submit() now returns the driver ID instead of None, with the internal _start_driver_status_tracking() call removed — polling is now owned by poll_until_complete() in the operator.
    For flow, see this:

Before:

image

After:

image

Opt-out

Teams who prefer the previous behaviour (always resubmit on retry) can set reconnect_on_retry=False on the operator. The default is True — reconnect when a prior job is found. Reasoning: almost no one wants a Spark job to resubmit when the worker dies; Spark jobs are expensive and a duplicate submission wastes compute. The opt-out exists for teams who have specific reasons to always resubmit.

User implications / backcompat

  • Non-cluster modes (YARN, K8s, local/client): behaviour is unchanged.
  • Standalone cluster mode: on first run, the driver ID is now persisted to task_state before polling. On retry, Airflow reconnects instead of resubmitting. No DAG changes required.
  • Airflow 2: the operator falls back to a no-op stub via try/except — always submits fresh, identical to todays behaviour.
  • hook.submit() return type changed from None to str | None — not a breaking change; no existing caller used the return value.
  • Adding reconnect_on_retry=False allows opting out of reconnecting to a running spark job and submitting fresh always.

Notes worth knowing

  • There is a ~very small crash window between submit_job() returning and task_state.set() completing. If the worker dies in this window, the retry submits a duplicate.
  • The mixin is designed as a trial: after Spark (this PR) and EMR (follow-up) both fit the 6-method interface cleanly, the mixin ships permanently. If either fights back (multi-job fan-out, Flink savepoint semantics), the fallback is a doc-only execute() template.

Testing

Setup

Created a spark standalone setup

docker compose -f dev/spark-cluster/docker-compose.yml up -d
services:
  spark-master:
    image: spark:python3
    container_name: spark-master
    user: root
    command:
      - bash
      - -c
      - |
        mkdir -p /usr/lib/jvm && ln -sf /opt/java/openjdk /usr/lib/jvm/java-17-openjdk-arm64
        exec /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -h spark-master
    ports:
      - "7077:7077"
      - "8080:8080"
      - "6066:6066"
    volumes:
      - ./:/opt/spark/apps
    networks:
      - spark-net

  spark-worker:
    image: spark:python3
    container_name: spark-worker
    user: root
    command:
      - bash
      - -c
      - |
        mkdir -p /usr/lib/jvm && ln -sf /opt/java/openjdk /usr/lib/jvm/java-17-openjdk-arm64
        exec /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    environment:
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=2
    volumes:
      - ./:/opt/spark/apps
    depends_on:
      - spark-master
    networks:
      - spark-net

networks:
  spark-net:
    driver: bridge

Start with breeze, configure breeze to run spark jobs

Start Breeze, then run the one-time setup to install java:

#!/usr/bin/env bash
# Run once after starting Breeze to prepare the environment for SparkSubmitOperator.
set -e

if ! command -v java &>/dev/null; then
    echo "Installing Java..."
    apt-get update -qq && apt-get install -y -q openjdk-17-jdk-headless
fi

export JAVA_HOME=$(readlink -f /usr/bin/java | sed 's|/bin/java||')
echo "JAVA_HOME=$JAVA_HOME"
java -version

airflow connections delete spark_default 2>/dev/null || true
airflow connections add spark_default \
  --conn-type spark \
  --conn-host host.docker.internal \
  --conn-port 7077 \
  --conn-extra '{"deploy-mode": "cluster", "spark-binary": "spark3-submit"}'

Reproducing the need for checkpointing

Spark Standalone Mode

Use this DAG:

import pendulum
from airflow.sdk import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_cluster_mode_repro",
    schedule=None,
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    max_active_runs=1,
    tags=["aip-103", "spark-repro"],
) as dag:

    submit_spark_job = SparkSubmitOperator(
        task_id="submit_long_running_job",
        # SparkPi with 100M iterations gives ~2 min of runtime to kill the worker mid-job.
        application="/opt/spark/examples/jars/spark-examples_2.13-4.1.1.jar",
        java_class="org.apache.spark.examples.SparkPi",
        application_args=["10000"],
        conn_id="spark_default",
        deploy_mode="cluster",
        retries=2,
        retry_delay=pendulum.duration(seconds=10),
    )

Before applying fix (from main)

  1. Trigger the DAG: airflow dags trigger spark_cluster_mode_repro
  2. Stream logs and wait for: identified spark driver id: driver-XXXXX log
  3. Kill the Airflow worker: kill -9 $(pgrep -f "airflow task run")
  4. Check the retry logs — you see a new Spark-Submit cmd which has run spark-submit ... with a different driver ID, And spark UI at http://localhost:8080 shows 2 drivers running.

This basically means, when Airflow worker came back up, it wasn't aware that spark driver ran through and it submitted another one, doing duplicate work.

First Run:

image

Killed worker mid run:

image

Wait until this stage:

image

Airflow worker should be brought back up:

Resubmit happens from airflow:

image

Spark ends up doing more work / duplicate work:

image

After applying the fix

Same steps

First Run:

image

Kill worker mid run

image

Wait until:

image

Airflow worker should be brought back up:

image

No rework submitted on spark:

image

What is next

  • YARN cluster mode: implement get_job_status via YARN RM REST API and implement poll_until_complete polling loop.
  • K8s cluster mode: same pattern with K8s pod phase API.
  • Other potential providers: ResumableJobMixin is ready for Flink, EMR, Databricks, KPO
  • Progress tracking: task_state can store spark_progress (percentage complete) by querying the Spark master stage API.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

if self._hook is None:
self._hook = self._get_hook()
if self._hook._should_track_driver_status:
return self.execute_resumable(context)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should really be self.defer() at this point/soon after it. It's precisely the use case Triggers are ideal for (that where a job is submitted to a remote service and you have an ID and want to efficiently poll for it to be completed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a textbook deferrable case. This PR is specifically addressing the synchronous operator case, which is a different problem from the deferrable case.

For deferrable operators, the triggerer already handles continuity. The failure mode I am attempting fixing here (worker dies mid-poll, driver ID lost, retry resubmits a duplicate) does not apply to deferrable operators once they have deferred.

The reason i am not using defer() here is that the deferrable path has its own adoption barriers — it requires a separate Triggerer component, introduces an async programming model that is a different mental model from standard sync model, and has seen very low adoption in practice despite being available for several years. Teams that write custom operators are known to write synchronous ones. These are reasons to solve the sync case on its own terms rather than requiring users to migrate to deferrable.

The two approaches are complementary, not competing. The spark_job_id that we persist to task_state is intentionally compatible with a future deferrable implementation — a SparkSubmitTrigger could read the same key from task_state to reconnect before deferring. But thats a diff problem as I see it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on the approach. We've used custom deferrable sensors extensively, but run GlueJobOperator synchronously in production. We explored deferrable for compute operators but didn't fully adopt it (triggerer bottlenecks, didn't dive deep enough into reliability concerns). Either way, the worker crash recovery problem is orthogonal to deferrable, as you point out.

Glue seems like a great case to test this as well, since it recently added resume_glue_job_on_retry which aims to solve the same problem via XCom + scanning all job runs as fallback. Haven't tried it yet but plan to shortly. Would the intent be for operators like Glue to eventually adopt ResumableJobMixin and replace that ad-hoc mechanism ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Alexhans thanks for chiming in. Yes, exactly thats the intent. GlueJobOperator``resume_glue_job_on_retry is solving the same problem but with two rough edges that task_state addresses cleanly:

  • XCom is shared across all TIs in a DAG run — using it for retry continuity is a workaround. task_state is scoped to a single TI across its retries, which is exactly the right scope for "what job did I submit last time."
    "Scan all job runs as fallback" exists because there's no reliable place to store the ID before the worker dies. With task_state, the ID is persisted before polling starts, ie: no scan needed, no ambiguity about which run belongs to this retry.
  • Migrating Glue to ResumableJobMixin can be a probably nice follow-up — drop the XCom write + scan logic, implement the six abstract methods for resumable, and the retry behaviour becomes standard.

@ashb
Copy link
Copy Markdown
Member

ashb commented May 18, 2026

When an Airflow worker running a SparkSubmitOperator task dies mid-execution, the spark driver keeps running on the spark cluster independently:

I'm not sure that is always true -- it depends on what mode you submit the job as. (My spark knowledge is rusty though)

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

When an Airflow worker running a SparkSubmitOperator task dies mid-execution, the spark driver keeps running on the spark cluster independently:

I'm not sure that is always true -- it depends on what mode you submit the job as. (My spark knowledge is rusty though)

@ashb This is true specifically for cluster deploy mode where the driver runs on the cluster independently of the submitter. In client mode the driver runs inside the spark-submit process, so killing the worker also kills the driver.

The PR targets cluster mode only. Will tighten the PR description.

@amoghrajesh amoghrajesh marked this pull request as ready for review May 19, 2026 11:32
Comment thread providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py Outdated
Comment thread task-sdk/src/airflow/sdk/bases/resumablemixin.py Outdated
Comment thread task-sdk/src/airflow/sdk/__init__.py Outdated
Comment thread task-sdk/src/airflow/sdk/bases/resumablemixin.py
@amoghrajesh amoghrajesh requested review from ashb and kaxil May 21, 2026 11:11
Copy link
Copy Markdown
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delayed review.

As expressed in devcall I am not fully convinced that SparkSubmit is taken as an example. If there are real issues with Deferred execution then I think we should also ensure this is running reliable.
I understand concerns and for bis spark jobs running a dedicated worker might be neglegible but in my view we should not "advertise" this as the new superior solution for long running tasks. Still should be Triggerer in my view and using this MixIn for cases where Triggerer is not reliable (as a workaround until improved) or in cases where knowledge or complexity is not matching implementing a Deferred.

Some non-blocking comments on the interface. Not blocking but just some thoughts.

Note that KPO is not a candidate for this interface because it already imeplements a resume option on both deferrable=true/false. Before a new Pod is started a query is made to K8s if a Pod can be found via labels as labels get task-id, dag-id, run-id and map-index attached. So state is in K8s already. Operators inheriting from KPO inherit this feature as well.
Especially in KPO a lot of fixes were made the last ~9months (e.g. by @AutomationDev85 and @wolfdn) so before assuming a resume option is added we should recommend using Triggerer. This includes log tailing as well.

Otherwise: Looks great! Hope to get this in 3.3.0!

def submit_job(self, context) -> str:
return self.hook.submit(...)

def get_job_status(self, external_id: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about exposing this as property and not as a get_*() method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_job_result(self, external_id, context) takes a parameter so it cannot be a property. It takes args and submits a job

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I was wondering about from interface perspective as well. If it is common then the external ID could be in side the MixIn.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already is, external_id_key is already defined in the mixin (with a default of "remote_job_id"). Subclasses override it with their specific key name, e.g. external_id_key = "spark_job_id" in SparkSubmitOperator. The storage and retrieval in execute_resumable() reads self.external_id_key directly, ie: the operator only declares the name, the mixin owns the read/write logic.

Comment thread task-sdk/src/airflow/sdk/bases/resumablemixin.py
return self.hook.get_status(external_id)

def is_job_active(self, status: str) -> bool:
return status in ("RUNNING", "PENDING")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense instead of adding new strings using the TaskInstance.state enum for this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively if you think this should not be leveraged adding a new enum for the state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status strings ("RUNNING", "PENDING", etc.) are external system states not Airflow's TaskInstanceState. TaskInstanceState has values like running, success, failed which are Airflow task states, not remote job states.

An enum in the mixin would force every provider to translate their native states into it, adding a layer with no real benefit.

The current contract — two boolean methods is_job_active() and is_job_succeeded() — is deliberately looser. The mixin does not need to know what states are possible, it just asks "is this still running?" and the subclass answers with whatever logic fits that system. Stays composable across providers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, then I understand it after reading your comments. Maybe this should be expressed in interface contract then. The status this is any kind of string whatever the respective backend has, could be also a "ContainerCreating" or "Approval pending" or "Pizza was ordered, delivery will arrive in 20min"... very much based on specific backend.

Can you express this in the docs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added it: 355f485, thanks, good call.

@jscheffl
Copy link
Copy Markdown
Contributor

Post-Approve comment: Would it be possible to extend the MixIn as well with partner functions in async that the MixIn can also be used to transfer the state to Triggerer? Would be a cool convenience.

@ashb
Copy link
Copy Markdown
Member

ashb commented May 22, 2026

@jscheffl On the KPO front: i think even there, task state could make it more efficient. Rather than having to search by label, we can store the actual launched pod name/id in state. I know there's been some search performance issues at the K8s API in the past

@amoghrajesh amoghrajesh requested a review from shahar1 May 22, 2026 09:35
@amoghrajesh
Copy link
Copy Markdown
Contributor Author

@jscheffl thanks for your review. I agree that deferrable should remain the recommended path for long-running tasks — that is not what this is trying to change. The goal here is much narrower: making the existing sync path "survivable" for teams already on it.

The framing I would probably use here is: ResumableJobMixin is not "a better alternative to Triggerer" — it is more of a safety net for the sync path that already exists. If you are running sync operators today (and many teams are, for log observability, complexity, or org constraints), you get crash recovery. If you want to move to deferrable, nothing here blocks that, the two paths are independent.

And tbh, it's not a question of sync vs async, but making sync better for teams who can't or won't do async right now. I will make this explicit in the mixin's docstring so it's clear to anyone reading the code.

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

And regarding KPO, it is worth noting that task_state could still improve KPO's resume mechanism — today it searches by labels (a K8s list+filter operation). With task_state, the pod name is stored on launch and retrieved directly by name on retry, very quick lookup vs a label scan everytime, and no ambiguity from stale pods. That is a separate follow-up, not this PR. The broader point being: task_state is useful beyond crash recovery — it's a general "remember what you submitted" store that benefits any operator tracking external jobs

@amoghrajesh amoghrajesh force-pushed the aip-103-9-resumable-mixin-plus-spark branch from 791de5d to d1fa89d Compare May 22, 2026 12:58
@amoghrajesh
Copy link
Copy Markdown
Contributor Author

@ashb / @kaxil i think this PR is pretty ready now, would love some reviews.

Comment on lines +197 to +200
This is the **synchronous path** — the worker holds a slot for the duration of polling. This is
intentional for teams that prefer sync operators for log observability, org constraints, or
because a Triggerer is not available. It is not a replacement for deferrable operators; the two
approaches are complementary.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This is the **synchronous path** — the worker holds a slot for the duration of polling. This is
intentional for teams that prefer sync operators for log observability, org constraints, or
because a Triggerer is not available. It is not a replacement for deferrable operators; the two
approaches are complementary.
This is the **synchronous path** — the worker holds a slot for the duration of polling. This is
a crash-safety net for teams running sync operators for log observability, org constraints, or
because a Triggerer is not available. If a Triggerer is available, deferrable
operators are the better choice for long-running tasks.

The original "complementary" implies parity that doesn't exist - deferrable wins on resource cost whenever it's an option. Worth updating the docs to say so explicitly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shahar1 good callout, this is how I handled it: e7701de, some small edits to what you mention:

This is the synchronous path — the worker holds a slot for the duration of polling. This is
a crash-safety net for teams running sync operators for log observability, org constraints, or
because a Triggerer is not available. Teams with a Triggerer available may also consider
deferrable operators, which free the worker slot but may come with added complexity.

WDYT?

Comment thread providers/apache/spark/docs/operators.rst Outdated
Copy link
Copy Markdown
Member

@XD-DENG XD-DENG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question I would have is: earlier when I was talking to @kaxil , seems to me the direction to "build a triggerer for this purpose and make this operator deferrable" is preferred?

Do we need to build two modes? Would like to understand @kaxil 's thoughts on this.

No strong opinion on this though.

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

Alright I am back on this one now.

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

amoghrajesh commented May 25, 2026

Another question I would have is: earlier when I was talking to @kaxil , seems to me the direction to "build a triggerer for this purpose and make this operator deferrable" is preferred?

Do we need to build two modes? Would like to understand @kaxil 's thoughts on this.

No strong opinion on this though.

@XD-DENG The mixin should not be thought of a second mode, rather it makes the existing sync path crash-safe. SparkSubmitOperator has always had the option to run synchronously and this addition just means if the worker dies mid-poll, the next retry reconnects to the running driver instead of blindly resubmitting or not having an option to recover safely. Sync becomes survivable without any architecture change on the user's side. I will also be building a triggerer solution to this in a follow on: #67168

If a team is/can already using deferrable operators, nothing here changes for them.

@kaxil can add more if I missed anything.

self._hook = self._get_hook()
self._hook.submit(self.application)
hook = self._hook
if hook._should_track_driver_status and self.reconnect_on_retry:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reconnect_on_retry=False currently skips execute_resumable() entirely, but polling was removed from hook.submit() for cluster mode. Result: standalone cluster + reconnect_on_retry=False submits and returns immediately — task succeeds while the driver is still running.

I think here we need poll_until_complete call ?

Copy link
Copy Markdown
Contributor Author

@amoghrajesh amoghrajesh May 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — you're right. The opt-out path fell through to bare hook.submit() which no longer polls in cluster mode (polling was extracted into poll_until_complete). Fixed: the reconnect_on_retry=False + cluster-mode path now calls submit_job + poll_until_complete directly, giving it the same submit-and-wait semantics as before without the crash recovery.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled in db93333

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:dev-tools area:providers area:task-sdk backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch full tests needed We need to run full set of tests for this PR to merge provider:apache-spark provider:common-compat

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants