Skip to content

Collect worker metrics after stream drop#503

Merged
gabotechs merged 1 commit into
datafusion-contrib:mainfrom
geoffreyclaude:fix/worker-metrics-after-stream-drop
Jun 18, 2026
Merged

Collect worker metrics after stream drop#503
gabotechs merged 1 commit into
datafusion-contrib:mainfrom
geoffreyclaude:fix/worker-metrics-after-stream-drop

Conversation

@geoffreyclaude

@geoffreyclaude geoffreyclaude commented Jun 17, 2026

Copy link
Copy Markdown
Collaborator

Summary

Worker task metrics are sent when the last partition stream for a task finishes or is dropped. Before this change, that happened from the generic on_drop_stream callback. That callback ran from PinnedDrop, before the wrapped inner stream field was actually dropped.

That ordering matters for execution plans whose metrics are finalized by stream Drop: the worker could snapshot and send task metrics before those drop-finalized metrics had been flushed. This is easiest to hit when a downstream operator stops early, for example because of a LIMIT.

This PR makes the shared on_drop_stream helper use the safer ordering: it drops the wrapped stream first, then runs the callback. The worker metrics path can keep using the common helper, but the callback now observes metrics finalized by the inner execution stream.

Impact

This changes the timing of on_drop_stream callbacks, not the distributed metrics protocol. Workers still send the same TaskMetrics message to the coordinator, and the coordinator still rewrites/display metrics the same way. The difference is that the worker now takes its metrics snapshot after the wrapped stream has completed its own Drop logic.

For cleanup-oriented uses of on_drop_stream, the callback still runs when the wrapper is dropped. It just runs after the inner stream is dropped, which is the less surprising ordering for code that wants to observe final stream state.

Rust Pinning Detail

OnDropStream stores an arbitrary inner stream, and streams are allowed to be !Unpin. In plain terms, that means Rust may require the stream to stay at the same memory address after polling starts. We cannot safely move the inner stream out of OnDropStream just to drop it before running the callback.

The nested enum plus pin_project is the small bit of machinery that makes this safe:

#[pin_project(project = OnDropInnerProj, project_replace = OnDropInnerProjOwn)]
enum OnDropInner<S> {
    Live(#[pin] S),
    Dropped,
}

#[pin_project] generates projection helpers for pinned fields. The project_replace helper lets PinnedDrop replace Live(inner) with Dropped without illegally moving a pinned stream. During that replacement, the old Live(inner) value is dropped, so the wrapped stream gets its Drop first. Only after that does on_drop_stream run the callback.

The new regression test uses a stream that flips a flag in its own Drop, then asserts the callback can already observe that flag. That test captures the intended ordering directly.

Validation

  • cargo fmt
  • cargo test common::on_drop_stream
  • cargo test fires_on_drop_after_inner_stream_is_dropped
  • cargo test --test metrics_collection

@geoffreyclaude geoffreyclaude force-pushed the fix/worker-metrics-after-stream-drop branch 2 times, most recently from 301ce72 to 009384c Compare June 17, 2026 16:10
@geoffreyclaude geoffreyclaude force-pushed the fix/worker-metrics-after-stream-drop branch from 009384c to ca5e6b1 Compare June 17, 2026 16:11
@geoffreyclaude geoffreyclaude marked this pull request as ready for review June 17, 2026 16:15

@gabotechs gabotechs left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 This actually makes sense

@gabotechs gabotechs merged commit 12d31b6 into datafusion-contrib:main Jun 18, 2026
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants