Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions faststream_outbox/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ async def delete_with_lease(
dlq_payload: "typing.Mapping[str, typing.Any] | None" = None,
) -> bool:
for i, row in enumerate(self._rows):
if row.id == message_id and row.acquired_token == acquired_token:
# ``acquired_token is not None`` mirrors SQL's ``WHERE acquired_token = :token``:
# ``NULL = NULL`` is NULL (no match), so a None token must never match — even a
# row whose own token is None. Without this, the fake's ``None == None`` would
# delete where the real client no-ops.
if row.id == message_id and acquired_token is not None and row.acquired_token == acquired_token:
if dlq_payload is not None:
# Mirror the real CTE side-effect: DLQ row materializes in the
# same call as the DELETE, before the row is removed.
Expand Down Expand Up @@ -197,7 +201,8 @@ async def mark_pending_with_lease(
last_attempt_at: _dt.datetime,
) -> bool:
for row in self._rows:
if row.id == message_id and row.acquired_token == acquired_token:
# Mirror SQL ``NULL = NULL`` semantics — see ``delete_with_lease``.
if row.id == message_id and acquired_token is not None and row.acquired_token == acquired_token:
row.next_attempt_at = _utcnow() + _dt.timedelta(seconds=max(0.0, delay_seconds))
row.attempts_count = attempts_count
row.first_attempt_at = first_attempt_at
Expand Down
12 changes: 7 additions & 5 deletions planning/archived/2026-06-12-code-audit-findings.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ status: shipped
date: 2026-06-12
slug: 2026-06-12-code-audit-findings
scope: faststream_outbox/ (package) + tests/ (test quality)
prs: [61, 66, 67, 68, 69, 70]
prs: [61, 66, 67, 68, 69, 70, 74]
findings_doc_prs: [62, 65]
releases: ["0.9.0", "0.9.1"]
outcome: >
All findings remediated, nothing deferred. Bugs B1–B16 (#61) + test-holes
T1–T8 (#66) + improvements P1–P35 (#67) shipped in 0.9.0; suspected S1–S5
(#68; S3 was already resolved by P17) + warning-attribution P27 (#69) +
test-broker dedup/NOTIFY P29/P30 (#70) shipped in 0.9.1.
All findings remediated. Bugs B1–B16 (#61) + test-holes T1–T8 (#66) +
improvements P1–P35 (#67) shipped in 0.9.0; suspected S1–S5 (#68; S3 was
already resolved by P17) + warning-attribution P27 (#69) + test-broker
dedup/NOTIFY P29/P30 (#70) shipped in 0.9.1. The "Additional test findings"
prose tail below the T1–T8 table (never tracked as its own IDs) was closed
in #74.
---

# Code audit findings — 2026-06-12
Expand Down
66 changes: 60 additions & 6 deletions tests/test_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,12 @@ class LeaseLostClient(FakeOutboxClient):
async def delete_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002
return False

broker = _make_broker()
events: list[tuple[str, dict[str, typing.Any]]] = []

def recorder(event: str, tags: Mapping[str, typing.Any]) -> None:
events.append((event, dict(tags)))

broker = OutboxBroker(outbox_table=make_outbox_table(MetaData()), metrics_recorder=recorder)
received: list[str] = []

@broker.subscriber("orders")
Expand All @@ -512,6 +517,10 @@ async def handle(body: str) -> None:
await broker.publish("lease-lost", queue="orders") # ty: ignore[missing-argument]

assert received == ["lease-lost"]
# P17 + lease-lost: the failed delete emits lease_lost(phase=terminal); the paired
# ``acked`` is suppressed (emit-after-flush), so a lease-lost row isn't double-counted.
assert [tags["phase"] for ev, tags in events if ev == "lease_lost"] == ["terminal"]
assert "acked" not in [ev for ev, _ in events]


async def test_fake_broker_publish_invokes_flush_retry_when_lease_lost() -> None:
Expand All @@ -521,7 +530,12 @@ class LeaseLostRetryClient(FakeOutboxClient):
async def mark_pending_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002
return False

broker = _make_broker()
events: list[tuple[str, dict[str, typing.Any]]] = []

def recorder(event: str, tags: Mapping[str, typing.Any]) -> None:
events.append((event, dict(tags)))

broker = OutboxBroker(outbox_table=make_outbox_table(MetaData()), metrics_recorder=recorder)
attempts: list[str] = []

@broker.subscriber("orders", retry_strategy=ConstantRetry(delay_seconds=0.05, max_attempts=10))
Expand All @@ -536,6 +550,10 @@ async def handle(body: str) -> None:
await broker.publish("never-cleared", queue="orders") # ty: ignore[missing-argument]

assert attempts == ["never-cleared"]
# The failed retry-update emits lease_lost(phase=retry); the paired ``nacked_retried``
# is suppressed (emit-after-flush), so a lease-lost row isn't double-counted.
assert [tags["phase"] for ev, tags in events if ev == "lease_lost"] == ["retry"]
assert "nacked_retried" not in [ev for ev, _ in events]


async def test_fake_broker_publish_swallows_post_consume_failure() -> None:
Expand Down Expand Up @@ -854,6 +872,10 @@ async def handle(body: str) -> None:
test_broker.feed("orders", p, headers=h)
await _wait_until(lambda: received, timeout=5.0)

# The worker's row had its lease token stripped at fetch -> _flush_terminal early-returns
# without deleting. Pin that the no-op preserved the row (was: only the handler ran).
assert len(test_broker.fake_client.rows) == 1


async def test_loop_mode_flush_retry_with_no_lease_token_is_noop() -> None:
class TokenStrippingClient(FakeOutboxClient):
Expand Down Expand Up @@ -884,6 +906,10 @@ async def handle(body: str) -> None:
test_broker.feed("orders", p, headers=h)
await _wait_until(lambda: attempts, timeout=5.0)

# _flush_retry early-returns on the stripped token -> the row is neither rescheduled nor
# lost; pin that it survives (was: only the handler ran).
assert len(test_broker.fake_client.rows) == 1


async def test_loop_mode_retry_strategy_can_branch_on_exception_type() -> None:
"""Subclass pattern from retry.py docstring: retry transient, terminate on permanent."""
Expand Down Expand Up @@ -1024,7 +1050,8 @@ async def test_broker_stop_cancels_wedged_handler_within_graceful_timeout_in_fak
"""T8: a wedged handler is cancelled within graceful_timeout (no 2x wait), row preserved — off-Postgres."""
metadata = MetaData()
t = make_outbox_table(metadata)
broker = OutboxBroker(outbox_table=t, graceful_timeout=0.3)
graceful_timeout = 1.0 # larger TTL -> load jitter is small relative to the 1x/2x gap
broker = OutboxBroker(outbox_table=t, graceful_timeout=graceful_timeout)
started = asyncio.Event()

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
Expand All @@ -1043,9 +1070,10 @@ async def handle(body: dict) -> None:
await sub.stop()
elapsed = asyncio.get_event_loop().time() - start

# ~graceful_timeout (0.3) + cancellation slack. The 2x-regression (re-waiting in
# super().stop()) would exceed 0.6s; 0.7s is the safe upper guard.
assert elapsed < 0.7, f"sub.stop() took {elapsed:.3f}s — strict-bound regression"
# Drain waits ~graceful_timeout for the wedged handler, then cancels (~1x). The 2x
# regression (super().stop() re-waiting another full budget) would take ~2x. A 1.6x
# bound separates them with generous slack so CI load can't flake it.
assert elapsed < graceful_timeout * 1.6, f"sub.stop() took {elapsed:.3f}s — 2x-drain regression"
assert len(test_broker.fake_client.rows) == 1 # handler cancelled pre-ack → row preserved (lease set)


Expand Down Expand Up @@ -1171,6 +1199,32 @@ async def test_fake_client_cancel_timer_skips_leased_row() -> None:
assert len(fake.rows) == 1


async def test_fake_client_terminal_writes_reject_none_token() -> None:
"""SQL parity: a None lease token never matches (NULL = NULL is NULL), even on an unleased row."""
fake = FakeOutboxClient()
row_id = fake.feed(queue="q", payload=b"x") # unleased -> acquired_token is None
assert row_id is not None
assert fake.rows[0].acquired_token is None
now = _dt.datetime.now(tz=_dt.UTC)
# Without the ``acquired_token is not None`` guard the fake's ``None == None`` would match
# this row, deleting / rescheduling where the real client's SQL no-ops.
assert await fake.delete_with_lease(None, row_id, None) is False # ty: ignore[invalid-argument-type]
assert len(fake.rows) == 1 # not deleted
assert (
await fake.mark_pending_with_lease(
None,
row_id,
None, # ty: ignore[invalid-argument-type]
delay_seconds=1.0,
attempts_count=1,
first_attempt_at=now,
last_attempt_at=now,
)
is False
)
assert fake.rows[0].acquired_token is None # unchanged


# --- AckPolicy plumbing (A) ----------------------------------------------------------


Expand Down
48 changes: 47 additions & 1 deletion tests/test_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox import NoRetry, OutboxMessage, make_outbox_table
from faststream_outbox import NoRetry, OutboxMessage, OutboxResponse, make_outbox_table
from faststream_outbox.client import AbstractOutboxClient
from faststream_outbox.fastapi import (
OutboxBroker as AnnotatedOutboxBroker,
Expand Down Expand Up @@ -199,3 +199,49 @@ async def test_outbox_router_publisher_delegates_to_broker() -> None:
row_id = await publisher.publish({"v": 1}, session=AsyncMock(spec=AsyncSession))

assert row_id is not None


async def test_fastapi_handler_chains_via_outbox_response_with_per_delivery_session() -> None:
"""
Exercise the transactional contract end-to-end through the FastAPI wrapper.

The Depends-resolved session flows into the chained OutboxResponse, and each delivery
resolves its own fresh session (session-per-delivery).
"""
t = _make_outbox_table()
router = OutboxRouter(outbox_table=t)
sessions_seen: list[AsyncSession] = []
downstream: list[dict] = []

async def get_session() -> AsyncIterator[AsyncSession]:
s = AsyncMock(spec=AsyncSession)
sessions_seen.append(s)
yield s

session_dep = Depends(get_session)

@router.subscriber("orders")
async def handle_order(body: dict, session: AsyncSession = session_dep) -> OutboxResponse:
return OutboxResponse(body={"chained_from": body["id"]}, queue="downstream", session=session)

@router.subscriber("downstream")
async def handle_downstream(body: dict) -> None:
downstream.append(body)

app = _make_app_with_router(router)
with TestClient(app):
await router.broker.publish({"id": 1}, queue="orders") # ty: ignore[missing-argument]
await router.broker.publish({"id": 2}, queue="orders") # ty: ignore[missing-argument]

# OutboxResponse chaining works through the FastAPI wrapper: the bridged Depends session is
# the one the follow-on row is published with.
assert downstream == [{"chained_from": 1}, {"chained_from": 2}]
# Session-per-delivery: each "orders" delivery resolved its own fresh session via Depends.
assert len(sessions_seen) == 2
assert sessions_seen[0] is not sessions_seen[1]


def test_outbox_router_forwards_broker_kwargs_to_inner_broker() -> None:
"""End-to-end forwarding: an outbox-broker kwarg passed to OutboxRouter reaches the broker."""
router = OutboxRouter(outbox_table=_make_outbox_table(), graceful_timeout=3.5)
assert router.broker.config.graceful_timeout == 3.5
32 changes: 19 additions & 13 deletions tests/test_metrics_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,29 @@ def test_otel_dlq_written_emits_counter_with_reason_attr() -> None:


def test_otel_meter_argument_takes_precedence_over_meter_provider() -> None:
reader = InMemoryMetricReader()
provider = MeterProvider(metric_readers=[reader])
explicit_meter = provider.get_meter("custom-meter")
rec = OpenTelemetryRecorder(meter=explicit_meter)
"""When both are passed, ``meter`` wins: data lands in the meter's reader, not the provider's."""
meter_reader = InMemoryMetricReader()
explicit_meter = MeterProvider(metric_readers=[meter_reader]).get_meter("explicit")
ignored_reader = InMemoryMetricReader()
ignored_provider = MeterProvider(metric_readers=[ignored_reader])

rec = OpenTelemetryRecorder(meter=explicit_meter, meter_provider=ignored_provider)
rec("fetched", {"queue": "q", "subscriber": "h", "count": 1})
# Smoke-test: explicit-meter path didn't raise; data lands in the same reader.
assert "messaging.outbox.fetch.batches" in _collect_metrics(reader)

assert "messaging.outbox.fetch.batches" in _collect_metrics(meter_reader) # used the meter
assert "messaging.outbox.fetch.batches" not in _collect_metrics(ignored_reader) # ignored the provider


def test_otel_default_meter_provider_path() -> None:
# Using neither meter nor meter_provider falls back to the global provider —
# which here is the no-op MeterProvider. Just verify the recorder constructs
# and accepts events without raising.
rec = OpenTelemetryRecorder()
rec("fetched", {"queue": "q", "subscriber": "h", "count": 0})
# Sanity: confirm the recorder pulled from the global provider (or its default).
assert ot_metrics.get_meter_provider() is not None
"""Neither meter nor meter_provider → the recorder pulls a meter from ``ot_metrics.get_meter``."""
reader = InMemoryMetricReader()
meter = MeterProvider(metric_readers=[reader]).get_meter("global-fallback")
with patch.object(ot_metrics, "get_meter", return_value=meter):
rec = OpenTelemetryRecorder()
rec("fetched", {"queue": "q", "subscriber": "h", "count": 1})
# The event landed in the meter the (patched) global provider handed out — proving the
# no-args path actually resolves a meter rather than silently dropping the instrument.
assert "messaging.outbox.fetch.batches" in _collect_metrics(reader)


def test_otel_nacked_retried_stamps_error_type_attribute() -> None:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_middleware_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession

Expand Down Expand Up @@ -239,3 +241,35 @@ def test_outbox_telemetry_middleware_raises_friendly_error_when_extra_missing()
pytest.raises(ImportError, match=r"pip install 'faststream-outbox\[opentelemetry\]'"),
):
OutboxTelemetryMiddleware()


async def test_outbox_telemetry_middleware_emits_consume_span_with_outbox_attributes() -> None:
"""The middleware owns the consume-scope span (the recorder seam can't): assert it fires."""
reader = InMemoryMetricReader()
span_exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))

metadata = MetaData()
table = make_outbox_table(metadata)
meter_provider = MeterProvider(metric_readers=[reader])
broker = OutboxBroker(
outbox_table=table,
middlewares=[ # ty: ignore[invalid-argument-type]
OutboxTelemetryMiddleware(meter_provider=meter_provider, tracer_provider=tracer_provider)
],
)

@broker.subscriber("orders")
async def handle(body: dict) -> None:
pass

async with TestOutboxBroker(broker):
await broker.publish({"x": 1}, queue="orders", session=_session_mock())

spans = span_exporter.get_finished_spans()
outbox_spans = [sp for sp in spans if dict(sp.attributes or {}).get("messaging.system") == "outbox"]
assert outbox_spans, f"no outbox consume span emitted; spans={[sp.name for sp in spans]}"
attrs = dict(outbox_spans[0].attributes or {})
assert attrs["messaging.system"] == "outbox"
assert attrs["messaging.destination_publish.name"] == "orders"
30 changes: 26 additions & 4 deletions tests/test_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ async def test_naked_decorator_chain_relays_plain_return_to_kafka() -> None:
async def relay(body: dict[str, Any]) -> dict[str, Any]:
return body

# TestKafkaBroker first, TestOutboxBroker second — see "Context-manager
# ordering note" at the end of this plan. The foreign broker's mock producer
# must be wired before our subscriber starts (Task 6 introduces a startup
# warning that probes foreign producers).
# TestKafkaBroker first, TestOutboxBroker second: the foreign broker's mock producer
# must be wired before our subscriber starts, because the outbox's start() probes
# foreign producers and warns about any whose broker isn't started yet.
async with TestKafkaBroker(broker_kafka), TestOutboxBroker(broker_outbox, run_loops=False) as outbox:
await outbox.publish({"hello": "world"}, queue="relay_queue", session=None) # ty: ignore[invalid-argument-type]
publisher_kafka.mock.assert_called_once_with({"hello": "world"})
Expand Down Expand Up @@ -288,6 +287,29 @@ async def relay(body: dict[str, Any]) -> dict[str, Any]:
)


async def test_started_foreign_broker_does_not_warn_on_start(caplog: pytest.LogCaptureFixture) -> None:
"""Negative of the unstarted-broker warning: a STARTED foreign broker emits no warning at start()."""
metadata = MetaData()
outbox_table = make_outbox_table(metadata)
broker_outbox = OutboxBroker(outbox_table=outbox_table)
broker_kafka = KafkaBroker("kafka://test:9092")
publisher_kafka = broker_kafka.publisher("relay_topic")

@publisher_kafka
@broker_outbox.subscriber("relay_queue")
async def relay(body: dict[str, Any]) -> dict[str, Any]:
return body # pragma: no cover — never invoked; the test only exercises start()

with caplog.at_level(logging.WARNING, logger="faststream_outbox"):
# TestKafkaBroker wires the foreign producer before the outbox starts, so the
# start()-time probe sees it as started and stays silent.
async with TestKafkaBroker(broker_kafka), TestOutboxBroker(broker_outbox, run_loops=False):
pass

foreign = [r.getMessage() for r in caplog.records if "has not been started" in r.getMessage()]
assert foreign == []


async def test_foreign_warning_names_only_decorated_subscriber_queues(caplog: pytest.LogCaptureFixture) -> None:
"""P21: the unstarted-foreign-broker warning names the decorated subscriber's queue, not every queue."""
metadata = MetaData()
Expand Down
Loading