
[PD][Nixl] Remote consumer READ timeout for clearing request blocks #20139


Open
wants to merge 4 commits into base: main

Conversation

@NickLucche (Contributor) commented Jun 26, 2025

With #19223, we're addressing most of the cases where P request blocks may be left starving.
However, there are still cases where the remote producer can be left holding blocks that are never cleared: for instance, if the router fails to communicate a request abortion (e.g. an in-flight request is lost, or the router goes down) before the request reaches D, or if D fails to communicate the abortion to P.

This PR addresses these final edge cases by attaching a simple TTL to every request whose blocks still need to be read by the local consumer (D) from the remote producer (P).
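
At a high level, the mechanism records a per-request deadline on the producer once prefill completes, and the worker's regular polling sweeps expired entries so their blocks are freed even if D never reads them. Below is a minimal, self-contained sketch of that bookkeeping; the field name _reqs_to_send mirrors the one touched in this PR, while the class and method names are purely illustrative and not the actual NixlConnectorWorker implementation.

import time

ABORT_REQUEST_TIMEOUT_S = 120.0  # stand-in for VLLM_NIXL_ABORT_REQUEST_TIMEOUT


class ProducerBlockTracker:
    """Illustrative tracker for P-side requests awaiting a remote (D) read."""

    def __init__(self) -> None:
        # req_id -> monotonic timestamp at which prefill finished on P.
        self._reqs_to_send: dict[str, float] = {}

    def mark_prefill_done(self, req_id: str) -> None:
        # Start the clock once the request's blocks are ready to be pulled by D.
        self._reqs_to_send[req_id] = time.monotonic()

    def mark_read_by_consumer(self, req_id: str) -> None:
        # Normal path: D finished reading, so no timeout is needed.
        self._reqs_to_send.pop(req_id, None)

    def sweep_expired(self) -> list[str]:
        # Called from the worker's polling loop; returns requests whose blocks
        # should be freed because D never read them within the timeout.
        now = time.monotonic()
        expired = [
            req_id for req_id, started in self._reqs_to_send.items()
            if now - started >= ABORT_REQUEST_TIMEOUT_S
        ]
        for req_id in expired:
            del self._reqs_to_send[req_id]
        return expired

In the actual connector the sweep result is folded into the existing done-sending bookkeeping (see the review snippets below), but the lifecycle is the same: start the clock when prefill completes, stop it on a consumer read, and reclaim the blocks on expiry.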

cc @njhill


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, only fastcheck CI runs, which covers a small and essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build in the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run full CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@gemini-code-assist bot left a comment

Summary of Changes

Hello @NickLucche, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

I've addressed a critical edge case in the disaggregated decode-prefill setup where KV cache blocks on the remote producer (P) could become permanently stranded if the consumer (D) failed to communicate request abortion or completion. This pull request introduces a robust timeout mechanism that automatically frees these blocks after a configurable period, significantly improving resource utilization and system stability in failure scenarios.

Highlights

  • KV Cache Block Timeout: I've implemented a timeout mechanism to automatically clear KV cache blocks on the remote producer (P) if the consumer (D) fails to read them within a specified duration. This prevents resource starvation in disaggregated decode-prefill setups.
  • Configurable Timeout: A new environment variable, VLLM_NIXL_ABORT_REQUEST_TIMEOUT, has been introduced, allowing users to configure the timeout duration for remote consumer reads (defaulting to 120 seconds); see the sketch after this list.
  • NixlConnector Enhancements: I've modified the NixlConnector and NixlConnectorWorker to track requests that have completed prefill on the producer and are awaiting consumption by the decoder, enabling the new timeout logic to be applied.
  • Unit Test Coverage: A dedicated unit test (test_abort_timeout_on_prefiller) has been added to validate the end-to-end functionality of the remote consumer read timeout, simulating a scenario where communication fails and blocks are eventually cleared.
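
For the configurable timeout highlighted above, here is a small standalone sketch of how such a knob can be read and applied. The environment variable name and the 120-second default come from this PR; the plain os.environ lookup and the is_expired helper are illustrative stand-ins, since vLLM actually resolves the setting through its envs module (referenced as envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT in the review snippets below).

import os
import time

# Stand-in lookup; vLLM resolves this setting through its envs module.
ABORT_TIMEOUT_S = float(os.environ.get("VLLM_NIXL_ABORT_REQUEST_TIMEOUT", "120"))


def is_expired(finish_time: float) -> bool:
    # True if a request that finished prefill at finish_time (a monotonic
    # timestamp) has waited longer than the configured timeout for D to read it.
    return time.monotonic() - finish_time >= ABORT_TIMEOUT_S
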
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in issue comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist bot left a comment

Code Review

This PR introduces a timeout mechanism to clear request blocks in the remote producer when the router fails to communicate request abortion, addressing an edge case in the NixlConnector. The changes include adding a TTL to requests, updating metadata, and implementing timeout handling in the worker. The code also includes a new unit test to verify the timeout functionality.

Comment on lines +861 to 863
if self._done_sending_count[req_id] >= self.world_size:
    del self._done_sending_count[req_id]
    all_done_sending.add(req_id)

high

The removal of self._reqs_to_send[req_id] here might lead to a race condition if _get_new_notifs is called concurrently from multiple threads. Consider using a lock to protect access to self._reqs_to_send or using a thread-safe data structure.

Suggested change
-if self._done_sending_count[req_id] >= self.world_size:
-    del self._done_sending_count[req_id]
-    all_done_sending.add(req_id)
+notified_req_ids.add(req_id)
+del self.consumer_notification_counts_by_req[req_id]
+with self._handshake_lock:
+    del self._reqs_to_send[req_id]

Comment on lines +861 to 863
if self._done_sending_count[req_id] >= self.world_size:
    del self._done_sending_count[req_id]
    all_done_sending.add(req_id)

high

It's crucial to ensure that self._reqs_to_send is thread-safe, especially when dealing with asynchronous operations and potential concurrent access. Using a standard dictionary without any synchronization mechanisms can lead to race conditions and data corruption. Consider using threading.Lock or asyncio.Lock to protect access to this dictionary.

Suggested change
-if self._done_sending_count[req_id] >= self.world_size:
-    del self._done_sending_count[req_id]
-    all_done_sending.add(req_id)
+self._reqs_to_send_lock = threading.Lock()  # Or asyncio.Lock for async context
+
+def _get_new_notifs(self) -> set[str]:
+    """
+    Get req_ids which got a remote xfer message. When multiple consumers
+    are reading from the same producer (heterogeneous TP scenario), wait
+    for all consumers to be done pulling.
+    """
+    notified_req_ids: set[str] = set()
+    for notifs in self.nixl_wrapper.get_new_notifs().values():
+        for notif in notifs:
+            req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1)
+            self.consumer_notification_counts_by_req[req_id] += 1
+            # Wait all consumers (D) to be done reading before freeing.
+            if self.consumer_notification_counts_by_req[req_id] == int(tp_ratio):
+                notified_req_ids.add(req_id)
+                del self.consumer_notification_counts_by_req[req_id]
+                with self._reqs_to_send_lock:
+                    del self._reqs_to_send[req_id]

Comment on lines +861 to 863
if self._done_sending_count[req_id] >= self.world_size:
    del self._done_sending_count[req_id]
    all_done_sending.add(req_id)

medium

Consider using pop with a default instead of del to avoid a KeyError if the key is not present (these are dicts, so there is no discard method).

                    notified_req_ids.add(req_id)
                    self.consumer_notification_counts_by_req.pop(req_id, None)
                    self._reqs_to_send.pop(req_id, None)

Comment on lines +815 to +828
for req_id, finish_time in self._reqs_to_send.items():
    if finish_time < 0:
        # Request just finished, start timeout.
        self._reqs_to_send[req_id] = now
    elif now - finish_time >= envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT:
        # Timeout exceed, clear the request blocks.
        timed_out_requests.append(req_id)

for req_id in timed_out_requests:
    # Skip communication with other ranks, but
    if self.tp_rank == 0:
        self._done_sending_count[req_id] += self.world_size
        done_sending.add(req_id)
    del self._reqs_to_send[req_id]

medium

The logic for handling timeouts and clearing requests could be simplified. Instead of checking finish_time < 0, you can directly set the start time when the request finishes and then check whether the timeout has been exceeded. This makes the code more readable and easier to understand.

        # Handle timeout to avoid stranding blocks on remote.
        now = time.perf_counter()
        timed_out_requests: list[str] = []
        for req_id, start_time in self._reqs_to_send.items():
            if now - start_time >= envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT:
                # Timeout exceed, clear the request blocks.
                timed_out_requests.append(req_id)

        for req_id in timed_out_requests:
            # Skip communication with other ranks, but
            if self.tp_rank == 0:
                self._done_sending_count[req_id] += self.world_size
                done_sending.add(req_id)
            del self._reqs_to_send[req_id]

Comment on lines +861 to 863
if self._done_sending_count[req_id] >= self.world_size:
    del self._done_sending_count[req_id]
    all_done_sending.add(req_id)

medium

The code deletes keys from dictionaries while iterating through them, which can lead to unexpected behavior or errors. It's safer to collect the keys to be deleted and then delete them after the loop.

        notified_req_ids: set[str] = set()
        reqs_to_remove = []
        for notifs in self.nixl_wrapper.get_new_notifs().values():
            for notif in notifs:
                req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1)
                self.consumer_notification_counts_by_req[req_id] += 1
                # Wait all consumers (D) to be done reading before freeing.
                if self.consumer_notification_counts_by_req[req_id] == int(
                        tp_ratio):
                    notified_req_ids.add(req_id)
                    self.consumer_notification_counts_by_req.pop(req_id, None)
                    reqs_to_remove.append(req_id)
        for req_id in reqs_to_remove:
            self._reqs_to_send.pop(req_id, None)

Comment on lines +815 to +828
for req_id, finish_time in self._reqs_to_send.items():
    if finish_time < 0:
        # Request just finished, start timeout.
        self._reqs_to_send[req_id] = now
    elif now - finish_time >= envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT:
        # Timeout exceed, clear the request blocks.
        timed_out_requests.append(req_id)

for req_id in timed_out_requests:
    # Skip communication with other ranks, but
    if self.tp_rank == 0:
        self._done_sending_count[req_id] += self.world_size
        done_sending.add(req_id)
    del self._reqs_to_send[req_id]

medium

The timeout mechanism implemented here relies on time.perf_counter(). For timeout bookkeeping, time.monotonic() is the more conventional choice: both clocks are monotonic and unaffected by system clock adjustments, but perf_counter() is intended for fine-grained benchmarking rather than wall-clock-scale timeouts. Consider switching to time.monotonic() for clarity.

import time

# Use time.monotonic() instead of time.perf_counter()
now = time.monotonic()
timed_out_requests: list[str] = []
for req_id, finish_time in self._reqs_to_send.items():
    if finish_time < 0:
        # Request just finished, start timeout.
        self._reqs_to_send[req_id] = now
    elif now - finish_time >= envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT:
        # Timeout exceed, clear the request blocks.
        timed_out_requests.append(req_id)
