Collect worker metrics after stream drop #503
Merged
gabotechs merged 1 commit on Jun 18, 2026
gabotechs approved these changes on Jun 18, 2026
gabotechs (Collaborator) left a comment:
🤔 This actually makes sense
Summary
Worker task metrics are sent when the last partition stream for a task finishes or is dropped. Before this change, that happened from the generic `on_drop_stream` callback. That callback ran from `PinnedDrop`, before the wrapped inner stream field was actually dropped.

That ordering matters for execution plans whose metrics are finalized by stream `Drop`: the worker could snapshot and send task metrics before those drop-finalized metrics had been flushed. This is easiest to hit when a downstream operator stops early, for example because of a `LIMIT`.

This PR makes the shared `on_drop_stream` helper use the safer ordering: it drops the wrapped stream first, then runs the callback. The worker metrics path can keep using the common helper, but the callback now observes metrics finalized by the inner execution stream.

Impact
This changes the timing of `on_drop_stream` callbacks, not the distributed metrics protocol. Workers still send the same `TaskMetrics` message to the coordinator, and the coordinator still rewrites and displays metrics the same way. The difference is that the worker now takes its metrics snapshot after the wrapped stream has completed its own `Drop` logic.

For cleanup-oriented uses of `on_drop_stream`, the callback still runs when the wrapper is dropped. It just runs after the inner stream is dropped, which is the less surprising ordering for code that wants to observe final stream state.

Rust Pinning Detail
`OnDropStream` stores an arbitrary inner stream, and streams are allowed to be `!Unpin`. In plain terms, that means Rust may require the stream to stay at the same memory address after polling starts. We cannot safely move the inner stream out of `OnDropStream` just to drop it before running the callback.

The nested enum plus `pin_project` is the small bit of machinery that makes this safe: `#[pin_project]` generates projection helpers for pinned fields. The `project_replace` helper lets `PinnedDrop` replace `Live(inner)` with `Dropped` without illegally moving a pinned stream. During that replacement, the old `Live(inner)` value is dropped, so the wrapped stream gets its `Drop` first. Only after that does `on_drop_stream` run the callback.

The new regression test uses a stream that flips a flag in its own `Drop`, then asserts the callback can already observe that flag. That test captures the intended ordering directly.

Validation
cargo fmt
cargo test common::on_drop_stream
cargo test fires_on_drop_after_inner_stream_is_dropped
cargo test --test metrics_collection
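
For readers who want to see the ordering issue from the Summary in isolation, here is a minimal std-only sketch. It is not the code in this PR: there is no `Stream`, no pinning, and no `pin_project`. The names `Inner`, `CallbackFirst`, `InnerFirst`, and `callback_sees_finalized` are hypothetical and exist only for this illustration. It relies on one Rust rule: a type's `Drop::drop` runs before its fields are dropped, so a callback invoked there cannot see state that a field finalizes in its own `Drop`. The `Option` assignment below is a simplified stand-in for what the PR achieves with `project_replace` on a pinned enum.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for an inner stream that finalizes metrics in `Drop`.
struct Inner {
    finalized: Rc<Cell<bool>>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Analogous to flushing drop-finalized metrics.
        self.finalized.set(true);
    }
}

/// Old ordering: the callback runs inside the wrapper's `Drop::drop`,
/// while the `_inner` field is still alive.
struct CallbackFirst<F: FnMut()> {
    _inner: Inner,
    on_drop: F,
}

impl<F: FnMut()> Drop for CallbackFirst<F> {
    fn drop(&mut self) {
        (self.on_drop)();
        // `_inner` is dropped only after this function returns.
    }
}

/// New ordering: drop the inner value first, then run the callback.
struct InnerFirst<F: FnMut()> {
    inner: Option<Inner>,
    on_drop: F,
}

impl<F: FnMut()> Drop for InnerFirst<F> {
    fn drop(&mut self) {
        // Assigning `None` drops the previous `Some(inner)` where it sits.
        self.inner = None;
        (self.on_drop)();
    }
}

/// Returns whether the callback observed the flag set by `Inner::drop`.
fn callback_sees_finalized(inner_first: bool) -> bool {
    let finalized = Rc::new(Cell::new(false));
    let seen = Rc::new(Cell::new(false));
    let inner = Inner { finalized: Rc::clone(&finalized) };

    let (flag, out) = (Rc::clone(&finalized), Rc::clone(&seen));
    // The callback records what it could observe at the time it ran.
    let on_drop = move || out.set(flag.get());

    if inner_first {
        drop(InnerFirst { inner: Some(inner), on_drop });
    } else {
        drop(CallbackFirst { _inner: inner, on_drop });
    }
    seen.get()
}
```

In this sketch, `callback_sees_finalized(false)` returns `false` because the callback runs before `Inner::drop`, and `callback_sees_finalized(true)` returns `true`. This has the same shape as the regression test described above, reduced to plain values instead of streams.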