
[BUG] Race condition inside RedisStreamBroker #123

@AivazianArtur


Bug: race condition in RedisStreamBroker.listen causes duplicate task execution via xautoclaim

Environment

  • taskiq-redis: 1.0.9
  • Python: 3.11
  • Redis: 7.2.5

Description

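The autoclaim section in listen() checks the lock and then acquires it in two separate steps. This check-then-acquire sequence is not atomic: several workers can pass the locked() check at the same moment. With a default redis-py Lock (as in the reproduction below), a worker that passed the check does not skip. It waits in `async with lock` until the lock is free and then runs its own XAUTOCLAIM. As a result, multiple workers can claim the same pending message, which should not happen.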

# The following is not atomic
if await lock.locked():
    continue
async with lock:
    pending = await redis_conn.xautoclaim(
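To see why this pattern fails even though each step is safe on its own, here is a Redis-free model of it. FakeLock is a hypothetical in-memory stand-in, not a redis-py class. Its acquire() polls until the lock is free, mirroring the default blocking behavior of redis-py's Lock used via `async with`. Both workers pass the locked() check, and the second one simply waits and then performs its own claim:

```python
import asyncio


class FakeLock:
    """Hypothetical in-memory stand-in for a Redis lock, for illustration only."""

    def __init__(self) -> None:
        self._held = False

    async def locked(self) -> bool:
        held = self._held       # the "server" evaluates the check...
        await asyncio.sleep(0)  # ...then the reply travels back (network round trip)
        return held

    async def acquire(self) -> None:
        # Blocking acquire: poll until the lock is free, then take it.
        while self._held:
            await asyncio.sleep(0.01)
        self._held = True

    async def release(self) -> None:
        self._held = False


async def worker(name: str, lock: FakeLock, claims: list[str]) -> None:
    if await lock.locked():  # step 1: check
        return               # intended: skip while another worker is claiming
    await lock.acquire()     # step 2: acquire, which waits instead of skipping
    try:
        claims.append(name)  # stands in for XAUTOCLAIM
        await asyncio.sleep(0.05)
    finally:
        await lock.release()


async def main() -> list[str]:
    lock = FakeLock()
    claims: list[str] = []
    await asyncio.gather(
        worker("worker-0", lock, claims),
        worker("worker-1", lock, claims),
    )
    return claims


if __name__ == "__main__":
    print(asyncio.run(main()))  # both workers end up claiming
```

The lock itself never lets two workers inside at once; the problem is that the check gives a stale answer, so nobody actually skips.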

Summary

Expected behavior:
Concurrent XAUTOCLAIM calls should not result in the same message being yielded to multiple workers simultaneously.

Actual behavior:
Multiple workers can get past the lock check at the same moment, and each of them runs XAUTOCLAIM, leading to duplicate task execution.

This increases the probability of duplicate task execution beyond the natural at-least-once guarantees of Redis Streams.

Debugging

Here is how it was caught: both workers started the task at literally the same moment. I masked the request_id, but it is identical in both lines. In our system this happens more than once a minute, so it is not a rare case.

[2026-04-08 17:16:00,342][root][INFO   ][worker-1][d2161072-00d2-4923-b7f9-5ed2e7be32d5] Starting taskiq task with request_id = 'X'
[2026-04-08 17:16:00,342][root][INFO   ][worker-0][0a4bc39a-6594-45e4-b0d0-f7ef7b4f199c] Starting taskiq task with request_id = 'X'

Reproduction

Here is a minimal script that reproduces the race outside of taskiq:

import asyncio
import redis.asyncio as aioredis

STREAM, GROUP = "taskiq_bug", "taskiq"


async def setup():
    r = await aioredis.from_url("redis://localhost:6379")

    await r.delete(STREAM)
    await r.delete(f"autoclaim:{GROUP}:{STREAM}")

    await r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    msg_id = await r.xadd(STREAM, {"data": b"test"})

    await r.xreadgroup(GROUP, "worker-seed", {STREAM: ">"}, count=1)

    await r.aclose()
    return msg_id


async def worker(name: str):
    conn = await aioredis.from_url("redis://localhost:6379")

    lock = conn.lock(f"autoclaim:{GROUP}:{STREAM}")

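    # HERE: the race window opens between this check and the acquire below.
    if await lock.locked():
        await conn.aclose()
        return

    # HERE: both workers can pass the check; the second one blocks until the
    # lock is released (redis-py locks block by default), then runs its own XAUTOCLAIM.
    async with lock: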
        result = await conn.xautoclaim(
            STREAM,
            GROUP,
            name,
            min_idle_time=100,
            count=10,
        )
        if result[1]:
            print(f"!!! {name} claimed: {result[1]}")

    await conn.aclose()


async def run_once(i: int):
    print(f"\n--- RUN {i} ---")

    await setup()
    await asyncio.sleep(0.2)  # let the message idle past min_idle_time (100 ms)

    await asyncio.gather(
        worker("worker-0"),
        worker("worker-1"),
    )


async def main():
    for i in range(25):
        await run_once(i)
        await asyncio.sleep(0.1) 


if __name__ == "__main__":
    asyncio.run(main())

In the output you will see different workers claiming the same message.

Proposed fix

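Replace the two-step check+acquire with a single atomic SET NX PX, and release the lock with a compare-and-delete Lua script so that a worker can only delete its own lock: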

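# Requires `import uuid` at module level.
lock_key   = f"autoclaim:{self.consumer_group_name}:{stream}"
lock_value = f"{self.consumer_name}-{uuid.uuid4()}"  # unique token for this holder

# SET NX PX checks and acquires in one atomic command; the 30 s TTL frees
# the lock if the worker dies while holding it.
acquired = await redis_conn.set(lock_key, lock_value, nx=True, px=30_000)
if not acquired:
    continue  # another worker is autoclaiming right now: skip this round

try:
    pending = await redis_conn.xautoclaim(...)
    for msg_id, msg in pending[1]:
        yield AckableMessage(...)
finally:
    # Compare-and-delete: release the lock only if we still own it.
    await redis_conn.eval(
        """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """,
        1, lock_key, lock_value,
    )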
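A minimal sketch of why this closes the race, modeled without a Redis server. FakeRedis, set_nx_px() and compare_and_delete() are hypothetical in-memory stand-ins for `SET key value NX PX` and the Lua script above, not redis-py APIs. Because the check and the write happen in one step, the second worker gets False back and skips the round instead of waiting:

```python
import asyncio
import time
import uuid
from typing import Optional


class FakeRedis:
    """Hypothetical in-memory model of SET NX PX and the compare-and-delete script."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}  # key -> (value, expiry time)

    def _get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None or entry[1] <= time.monotonic():
            self._data.pop(key, None)  # expired keys behave as missing
            return None
        return entry[0]

    async def set_nx_px(self, key: str, value: str, px: int) -> bool:
        # No await between the check and the write, so on a single-threaded
        # event loop this is atomic, like SET NX PX on the Redis server.
        if self._get(key) is not None:
            return False
        self._data[key] = (value, time.monotonic() + px / 1000)
        return True

    async def compare_and_delete(self, key: str, value: str) -> int:
        # Same logic as the Lua script: delete only if we still own the lock.
        if self._get(key) == value:
            del self._data[key]
            return 1
        return 0


async def worker(name: str, r: FakeRedis, claims: list[str]) -> None:
    lock_key = "autoclaim:taskiq:taskiq_bug"
    lock_value = f"{name}-{uuid.uuid4()}"  # unique token per attempt
    await asyncio.sleep(0)  # let both workers start before either acquires
    if not await r.set_nx_px(lock_key, lock_value, px=30_000):
        return  # lock taken: skip this round instead of waiting
    try:
        claims.append(name)  # stands in for XAUTOCLAIM and yielding messages
        await asyncio.sleep(0.05)
    finally:
        await r.compare_and_delete(lock_key, lock_value)


async def main() -> list[str]:
    r = FakeRedis()
    claims: list[str] = []
    await asyncio.gather(
        worker("worker-0", r, claims),
        worker("worker-1", r, claims),
    )
    return claims


if __name__ == "__main__":
    print(asyncio.run(main()))  # only one worker claims
```

The per-holder token is what makes the release safe: if the PX timeout expires while a worker is still yielding messages and another worker takes the lock, the first worker's finally block finds a different value and leaves the new owner's lock alone.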

I'd like to discuss this first and then prepare a PR.
