vllm.distributed.kv_events

MEDIUM_GPU module-attribute

MEDIUM_GPU = 'GPU'

logger module-attribute

logger = init_logger(__name__)

AllBlocksCleared

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class AllBlocksCleared(KVCacheEvent):
    pass

BlockRemoved

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class BlockRemoved(KVCacheEvent):
    block_hashes: list[ExternalBlockHash]
    medium: str | None

    def __hash__(self) -> int:
        return hash((tuple(self.block_hashes), self.medium))

block_hashes instance-attribute

block_hashes: list[ExternalBlockHash]

medium instance-attribute

medium: str | None

__hash__

__hash__() -> int
Source code in vllm/distributed/kv_events.py
def __hash__(self) -> int:
    return hash((tuple(self.block_hashes), self.medium))

BlockStored

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class BlockStored(KVCacheEvent):
    block_hashes: list[ExternalBlockHash]
    parent_block_hash: ExternalBlockHash | None
    token_ids: list[int]
    block_size: int
    lora_id: int | None
    medium: str | None

    def __hash__(self) -> int:
        return hash(
            (
                tuple(self.block_hashes),
                self.parent_block_hash,
                tuple(self.token_ids),
                self.block_size,
                self.lora_id,
                self.medium,
            )
        )

block_hashes instance-attribute

block_hashes: list[ExternalBlockHash]

block_size instance-attribute

block_size: int

lora_id instance-attribute

lora_id: int | None

medium instance-attribute

medium: str | None

parent_block_hash instance-attribute

parent_block_hash: ExternalBlockHash | None

token_ids instance-attribute

token_ids: list[int]

__hash__

__hash__() -> int
Source code in vllm/distributed/kv_events.py
def __hash__(self) -> int:
    return hash(
        (
            tuple(self.block_hashes),
            self.parent_block_hash,
            tuple(self.token_ids),
            self.block_size,
            self.lora_id,
            self.medium,
        )
    )
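
These events are msgspec Structs (tagged, array-like), so they round-trip through msgpack. A minimal sketch, assuming ExternalBlockHash values can be written as plain ints (the real alias may differ):

import msgspec

from vllm.distributed.kv_events import BlockRemoved, BlockStored

stored = BlockStored(
    block_hashes=[0xABC],  # illustrative hash values
    parent_block_hash=None,
    token_ids=[1, 2, 3],
    block_size=16,
    lora_id=None,
    medium="GPU",
)

payload = msgspec.msgpack.Encoder().encode(stored)
decoded = msgspec.msgpack.Decoder(BlockStored).decode(payload)
assert decoded == stored

# The explicit __hash__ makes events usable as dict/Counter keys.
seen = {stored: 1, BlockRemoved(block_hashes=[0xABC], medium="GPU"): 1}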

EventBatch

Bases: Struct

Source code in vllm/distributed/kv_events.py
class EventBatch(
    msgspec.Struct,
    array_like=True,  # type: ignore[call-arg]
    omit_defaults=True,  # type: ignore[call-arg]
    gc=False,  # type: ignore[call-arg]
):
    ts: float
    events: list[Any]
    data_parallel_rank: int | None = None

data_parallel_rank class-attribute instance-attribute

data_parallel_rank: int | None = None

events instance-attribute

events: list[Any]

ts instance-attribute

ts: float

EventPublisher

Bases: ABC

Lightweight publisher for EventBatch batches with data parallelism support.

In data parallel setups, each DP rank runs its own EventPublisher instance to avoid duplicate events and ensure proper event attribution:

  • Each DP rank creates a separate publisher
  • Publishers automatically annotate events with their data_parallel_rank
  • This allows consumers to distinguish events from different DP ranks

The publisher is responsible for adding DP metadata since the scheduler operates independently of DP topology and shouldn't need DP awareness.

Source code in vllm/distributed/kv_events.py
class EventPublisher(ABC):
    """Lightweight publisher for EventBatch batches with data parallelism
    support.

    In data parallel setups, each DP rank runs its own EventPublisher instance
    to avoid duplicate events and ensure proper event attribution:

    - Each DP rank creates a separate publisher
    - Publishers automatically annotate events with their data_parallel_rank
    - This allows consumers to distinguish events from different DP ranks

    The publisher is responsible for adding DP metadata since the scheduler
    operates independently of DP topology and shouldn't need DP awareness.
    """

    def __init__(self, data_parallel_rank: int = 0) -> None:
        self._data_parallel_rank = data_parallel_rank

    @abstractmethod
    def publish(self, events: EventBatch) -> None:
        """Emit events in order.

        Implementations should guarantee at-least-once delivery and
        monotonic ordering (e.g., via sequence numbers).
        """

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the publisher."""

_data_parallel_rank instance-attribute

_data_parallel_rank = data_parallel_rank

__init__

__init__(data_parallel_rank: int = 0) -> None
Source code in vllm/distributed/kv_events.py
def __init__(self, data_parallel_rank: int = 0) -> None:
    self._data_parallel_rank = data_parallel_rank

publish abstractmethod

publish(events: EventBatch) -> None

Emit events in order.

Implementations should guarantee at-least-once delivery and monotonic ordering (e.g., via sequence numbers).

Source code in vllm/distributed/kv_events.py
@abstractmethod
def publish(self, events: EventBatch) -> None:
    """Emit events in order.

    Implementations should guarantee at-least-once delivery and
    monotonic ordering (e.g., via sequence numbers).
    """

shutdown abstractmethod

shutdown() -> None

Shutdown the publisher.

Source code in vllm/distributed/kv_events.py
@abstractmethod
def shutdown(self) -> None:
    """Shutdown the publisher."""

EventPublisherFactory

Source code in vllm/distributed/kv_events.py
class EventPublisherFactory:
    _registry: dict[str, Callable[..., EventPublisher]] = {
        "null": NullEventPublisher,
        "zmq": ZmqEventPublisher,
    }

    @classmethod
    def register_publisher(cls, name: str, ctor: Callable[..., EventPublisher]) -> None:
        if name in cls._registry:
            raise KeyError(f"publisher '{name}' already registered")
        cls._registry[name] = ctor

    @classmethod
    def create(
        cls, config: KVEventsConfig | None, data_parallel_rank: int = 0
    ) -> EventPublisher:
        """Create publisher from a config mapping."""
        if (
            config is None
            or not config.enable_kv_cache_events
            or config.publisher == "null"
        ):
            return NullEventPublisher()

        config_dict = asdict(config)

        kind = config_dict.pop("publisher")
        config_dict.pop("enable_kv_cache_events")
        try:
            constructor = cls._registry[kind]
        except KeyError as exc:
            raise ValueError(f"Unknown event publisher '{kind}'") from exc
        return constructor(data_parallel_rank=data_parallel_rank, **config_dict)
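
A hedged usage sketch: registering the hypothetical ListEventPublisher from the EventPublisher section and building it through the factory. Note that create() pops publisher and enable_kv_cache_events and forwards every remaining config field as a keyword argument, so custom publishers must accept (or ignore) extras; the KVEventsConfig import path and fields shown here are assumptions:

from vllm.config import KVEventsConfig  # assumed import path
from vllm.distributed.kv_events import (EventPublisherFactory,
                                        NullEventPublisher)

EventPublisherFactory.register_publisher("list", ListEventPublisher)

config = KVEventsConfig(enable_kv_cache_events=True, publisher="list")
publisher = EventPublisherFactory.create(config, data_parallel_rank=1)
assert isinstance(publisher, ListEventPublisher)

# A missing, disabled, or "null" config always yields the no-op publisher.
assert isinstance(EventPublisherFactory.create(None), NullEventPublisher)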

_registry class-attribute instance-attribute

_registry: dict[str, Callable[..., EventPublisher]] = {
    "null": NullEventPublisher,
    "zmq": ZmqEventPublisher,
}

create classmethod

create(
    config: KVEventsConfig | None,
    data_parallel_rank: int = 0,
) -> EventPublisher

Create publisher from a config mapping.

Source code in vllm/distributed/kv_events.py
@classmethod
def create(
    cls, config: KVEventsConfig | None, data_parallel_rank: int = 0
) -> EventPublisher:
    """Create publisher from a config mapping."""
    if (
        config is None
        or not config.enable_kv_cache_events
        or config.publisher == "null"
    ):
        return NullEventPublisher()

    config_dict = asdict(config)

    kind = config_dict.pop("publisher")
    config_dict.pop("enable_kv_cache_events")
    try:
        constructor = cls._registry[kind]
    except KeyError as exc:
        raise ValueError(f"Unknown event publisher '{kind}'") from exc
    return constructor(data_parallel_rank=data_parallel_rank, **config_dict)

register_publisher classmethod

register_publisher(
    name: str, ctor: Callable[..., EventPublisher]
) -> None
Source code in vllm/distributed/kv_events.py
@classmethod
def register_publisher(cls, name: str, ctor: Callable[..., EventPublisher]) -> None:
    if name in cls._registry:
        raise KeyError(f"publisher '{name}' already registered")
    cls._registry[name] = ctor

KVCacheEvent

Bases: Struct

Base class for all KV cache-related events

Source code in vllm/distributed/kv_events.py
class KVCacheEvent(
    msgspec.Struct,
    array_like=True,  # type: ignore[call-arg]
    omit_defaults=True,  # type: ignore[call-arg]
    gc=False,  # type: ignore[call-arg]
    tag=True,
):
    """Base class for all KV cache-related events"""

KVConnectorKVEvents

Bases: ABC

Abstract base class for KV events. Acts as a container for KV events from the connector.

Source code in vllm/distributed/kv_events.py
class KVConnectorKVEvents(ABC):
    """
    Abstract base class for KV events.
    Acts as a container for KV events from the connector.
    """

    @abstractmethod
    def add_events(self, events: list[KVCacheEvent]) -> None:
        raise NotImplementedError

    @abstractmethod
    def aggregate(self) -> "KVConnectorKVEvents":
        raise NotImplementedError

    @abstractmethod
    def increment_workers(self, count: int = 1) -> None:
        raise NotImplementedError

    @abstractmethod
    def get_all_events(self) -> list[KVCacheEvent]:
        raise NotImplementedError

    @abstractmethod
    def get_number_of_workers(self) -> int:
        raise NotImplementedError

    @abstractmethod
    def clear_events(self) -> None:
        raise NotImplementedError
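
A hypothetical concrete container (not part of vLLM) that satisfies this interface by delegating to the KVEventAggregator documented below:

from vllm.distributed.kv_events import (KVCacheEvent, KVConnectorKVEvents,
                                        KVEventAggregator)


class SimpleConnectorKVEvents(KVConnectorKVEvents):
    """Hypothetical container backed by a KVEventAggregator."""

    def __init__(self, num_workers: int = 1) -> None:
        self._agg = KVEventAggregator(num_workers)

    def add_events(self, events: list[KVCacheEvent]) -> None:
        self._agg.add_events(events)

    def aggregate(self) -> "SimpleConnectorKVEvents":
        # Trivial choice for this sketch: counting happens in add_events.
        return self

    def increment_workers(self, count: int = 1) -> None:
        self._agg.increment_workers(count)

    def get_all_events(self) -> list[KVCacheEvent]:
        return self._agg.get_all_events()

    def get_number_of_workers(self) -> int:
        return self._agg.get_number_of_workers()

    def clear_events(self) -> None:
        self._agg.clear_events()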

add_events abstractmethod

add_events(events: list[KVCacheEvent]) -> None
Source code in vllm/distributed/kv_events.py
@abstractmethod
def add_events(self, events: list[KVCacheEvent]) -> None:
    raise NotImplementedError

aggregate abstractmethod

aggregate() -> KVConnectorKVEvents
Source code in vllm/distributed/kv_events.py
@abstractmethod
def aggregate(self) -> "KVConnectorKVEvents":
    raise NotImplementedError

clear_events abstractmethod

clear_events() -> None
Source code in vllm/distributed/kv_events.py
@abstractmethod
def clear_events(self) -> None:
    raise NotImplementedError

get_all_events abstractmethod

get_all_events() -> list[KVCacheEvent]
Source code in vllm/distributed/kv_events.py
@abstractmethod
def get_all_events(self) -> list[KVCacheEvent]:
    raise NotImplementedError

get_number_of_workers abstractmethod

get_number_of_workers() -> int
Source code in vllm/distributed/kv_events.py
@abstractmethod
def get_number_of_workers(self) -> int:
    raise NotImplementedError

increment_workers abstractmethod

increment_workers(count: int = 1) -> None
Source code in vllm/distributed/kv_events.py
@abstractmethod
def increment_workers(self, count: int = 1) -> None:
    raise NotImplementedError

KVEventAggregator

Aggregates KV events across multiple workers. Tracks how many times each event appears and returns only those that were emitted by all workers.

Source code in vllm/distributed/kv_events.py
class KVEventAggregator:
    """
    Aggregates KV events across multiple workers.
    Tracks how many times each event appears and returns only those
    that were emitted by all workers.
    """

    __slots__ = ("_event_counter", "_num_workers")

    def __init__(self, num_workers: int) -> None:
        if num_workers <= 0:
            raise ValueError("num_workers must be greater than zero.")
        self._event_counter: Counter[KVCacheEvent] = Counter()
        self._num_workers: int = num_workers

    def add_events(self, events: list[KVCacheEvent]) -> None:
        """
        Add events from a worker batch.

        :param events: List of KVCacheEvent objects.
        """
        if not isinstance(events, list):
            raise TypeError("events must be a list of KVCacheEvent.")
        self._event_counter.update(events)

    def get_common_events(self) -> list[KVCacheEvent]:
        """
        Return events that appeared in all workers.

        :return: List of events present in all workers.
        """
        return [
            event
            for event, count in self._event_counter.items()
            if count == self._num_workers
        ]

    def get_all_events(self) -> list[KVCacheEvent]:
        """
        Return all events for all workers.

        :return: List of events for all workers.
        """
        return list(self._event_counter.elements())

    def clear_events(self) -> None:
        """
        Clear all tracked events.
        """
        self._event_counter.clear()

    def increment_workers(self, count: int = 1) -> None:
        """
        Increment the number of workers contributing events.

        :param count: Number to increment the workers by.
        """
        if count <= 0:
            raise ValueError("count must be positive.")
        self._num_workers += count

    def reset_workers(self) -> None:
        """
        Reset the number of workers to 1.
        """
        self._num_workers = 1

    def get_number_of_workers(self) -> int:
        """
        Return the number of workers.

        :return: int number of workers.
        """
        return self._num_workers

    def __repr__(self) -> str:
        return (
            f"<KVEventAggregator workers={self._num_workers}, "
            f"events={len(self._event_counter)}>"
        )
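
A short usage sketch of the counting logic, with illustrative hash values:

from vllm.distributed.kv_events import BlockRemoved, KVEventAggregator

agg = KVEventAggregator(num_workers=2)
removed = BlockRemoved(block_hashes=[0xABC], medium="GPU")

agg.add_events([removed])  # reported by worker 0
agg.add_events([removed])  # reported by worker 1
agg.add_events([BlockRemoved(block_hashes=[0xDEF], medium="GPU")])  # worker 0 only

# Only events seen num_workers times are common to all workers.
assert agg.get_common_events() == [removed]
assert len(agg.get_all_events()) == 3  # elements() repeats each event by count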

__slots__ class-attribute instance-attribute

__slots__ = ('_event_counter', '_num_workers')

_event_counter instance-attribute

_event_counter: Counter[KVCacheEvent] = Counter()

_num_workers instance-attribute

_num_workers: int = num_workers

__init__

__init__(num_workers: int) -> None
Source code in vllm/distributed/kv_events.py
def __init__(self, num_workers: int) -> None:
    if num_workers <= 0:
        raise ValueError("num_workers must be greater than zero.")
    self._event_counter: Counter[KVCacheEvent] = Counter()
    self._num_workers: int = num_workers

__repr__

__repr__() -> str
Source code in vllm/distributed/kv_events.py
def __repr__(self) -> str:
    return (
        f"<KVEventAggregator workers={self._num_workers}, "
        f"events={len(self._event_counter)}>"
    )

add_events

add_events(events: list[KVCacheEvent]) -> None

Add events from a worker batch.

:param events: List of KVCacheEvent objects.

Source code in vllm/distributed/kv_events.py
def add_events(self, events: list[KVCacheEvent]) -> None:
    """
    Add events from a worker batch.

    :param events: List of KVCacheEvent objects.
    """
    if not isinstance(events, list):
        raise TypeError("events must be a list of KVCacheEvent.")
    self._event_counter.update(events)

clear_events

clear_events() -> None

Clear all tracked events.

Source code in vllm/distributed/kv_events.py
def clear_events(self) -> None:
    """
    Clear all tracked events.
    """
    self._event_counter.clear()

get_all_events

get_all_events() -> list[KVCacheEvent]

Return all events for all workers.

:return: List of events for all workers.

Source code in vllm/distributed/kv_events.py
def get_all_events(self) -> list[KVCacheEvent]:
    """
    Return all events for all workers.

    :return: List of events for all workers.
    """
    return list(self._event_counter.elements())

get_common_events

get_common_events() -> list[KVCacheEvent]

Return events that appeared in all workers.

:return: List of events present in all workers.

Source code in vllm/distributed/kv_events.py
def get_common_events(self) -> list[KVCacheEvent]:
    """
    Return events that appeared in all workers.

    :return: List of events present in all workers.
    """
    return [
        event
        for event, count in self._event_counter.items()
        if count == self._num_workers
    ]

get_number_of_workers

get_number_of_workers() -> int

Return the number of workers.

:return: int number of workers.

Source code in vllm/distributed/kv_events.py
def get_number_of_workers(self) -> int:
    """
    Return the number of workers.

    :return: int number of workers.
    """
    return self._num_workers

increment_workers

increment_workers(count: int = 1) -> None

Increment the number of workers contributing events.

:param count: Number to increment the workers by.

Source code in vllm/distributed/kv_events.py
def increment_workers(self, count: int = 1) -> None:
    """
    Increment the number of workers contributing events.

    :param count: Number to increment the workers by.
    """
    if count <= 0:
        raise ValueError("count must be positive.")
    self._num_workers += count

reset_workers

reset_workers() -> None

Reset the number of workers to 1.

Source code in vllm/distributed/kv_events.py
def reset_workers(self) -> None:
    """
    Reset the number of workers to 1.
    """
    self._num_workers = 1

KVEventBatch

Bases: EventBatch

Source code in vllm/distributed/kv_events.py
class KVEventBatch(EventBatch):
    events: list[BlockStored | BlockRemoved | AllBlocksCleared]

events instance-attribute

events: list[BlockStored | BlockRemoved | AllBlocksCleared]
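
For illustration, assembling a batch (timestamp and events are made up):

import time

from vllm.distributed.kv_events import (AllBlocksCleared, BlockStored,
                                        KVEventBatch)

batch = KVEventBatch(
    ts=time.time(),
    events=[
        BlockStored(block_hashes=[0xABC], parent_block_hash=None,
                    token_ids=[1, 2, 3], block_size=16,
                    lora_id=None, medium="GPU"),
        AllBlocksCleared(),
    ],
)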

NullEventPublisher

Bases: EventPublisher

No-op implementation (default when disabled).

Source code in vllm/distributed/kv_events.py
class NullEventPublisher(EventPublisher):
    """No-op implementation (default when disabled)."""

    def publish(self, events) -> None:
        return

    def shutdown(self) -> None:
        return

publish

publish(events) -> None
Source code in vllm/distributed/kv_events.py
def publish(self, events) -> None:
    return

shutdown

shutdown() -> None
Source code in vllm/distributed/kv_events.py
def shutdown(self) -> None:
    return

ZmqEventPublisher

Bases: EventPublisher

Reliable PUB/ROUTER publisher with an in-memory replay buffer.

Spawns a separate thread to handle publishing from a queue.

Parameters

  • endpoint: PUB address. Use tcp://*:5557 to bind or tcp://host:5557 to connect.
  • replay_endpoint: Optional ROUTER address for replay requests. When given, subscribers can request missed batches by sending the starting sequence number as an 8-byte big-endian integer.
  • buffer_steps: Number of past batches to keep for replay.
  • hwm: ZeroMQ high-water-mark for the PUB socket.
  • max_queue_size: Maximum number of events to buffer in memory.
  • topic: Topic to publish events to.

Source code in vllm/distributed/kv_events.py
class ZmqEventPublisher(EventPublisher):
    """Reliable PUB/ROUTER publisher with an in-memory replay buffer.

    Spawns a separate thread to handle publishing from a queue.

    Parameters
    ----------
    endpoint:
        PUB address. Use `tcp://*:5557` to bind or `tcp://host:5557` to
        connect.
    replay_endpoint:
        Optional ROUTER address for replay requests. When given, subscribers can
        request missed batches by sending the starting sequence number as an
        8-byte big-endian integer.
    buffer_steps:
        Number of past batches to keep for replay.
    hwm:
        ZeroMQ high-water-mark for PUB socket.
    max_queue_size:
        Maximum number of events to buffer in memory.
    topic:
        Topic to publish events to.
    """

    SHUTDOWN_TIMEOUT: float = 1.0
    END_SEQ = (-1).to_bytes(8, "big", signed=True)

    def __init__(
        self,
        data_parallel_rank: int,
        endpoint: str = "tcp://*:5557",
        replay_endpoint: str | None = None,
        buffer_steps: int = 10_000,
        hwm: int = 100_000,
        max_queue_size: int = 100_000,
        topic: str = "",
    ) -> None:
        # Storage
        super().__init__(data_parallel_rank)
        self._event_queue = Queue[EventBatch | None](maxsize=max_queue_size)
        self._buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

        # ZMQ sockets
        self._ctx = zmq.Context.instance()
        self._pub: zmq.Socket | None = None
        self._replay: zmq.Socket | None = None
        self._dp_rank = data_parallel_rank

        self._endpoint = self.offset_endpoint_port(endpoint, self._dp_rank)
        self._replay_endpoint = self.offset_endpoint_port(
            replay_endpoint, self._dp_rank
        )
        self._hwm = hwm
        self._socket_setup()

        # Payload
        self._seq_gen = count()
        self._topic_bytes = topic.encode("utf-8")

        # Thread
        self._running = True
        logger.info("Starting ZMQ publisher thread")

        self._thread = threading.Thread(
            target=self._publisher_thread, daemon=True, name="zmq-publisher"
        )
        self._thread.start()

    def publish(self, events: EventBatch) -> None:
        if not self._running:
            raise RuntimeError("Publisher is closed")
        if events.data_parallel_rank is None:
            events.data_parallel_rank = self._data_parallel_rank
        self._event_queue.put(events)

    def shutdown(self) -> None:
        """Stop the publisher thread and clean up resources."""
        self._running = False
        self._event_queue.put_nowait(None)

        start = time.time()
        pending_items = True
        while pending_items and (time.time() - start < self.SHUTDOWN_TIMEOUT):
            pending_items = not self._event_queue.empty()
            if pending_items:
                time.sleep(0.1)

        if pending_items:
            logger.warning(
                "Warning: Queue still has %s items after %s seconds timeout",
                self._event_queue.qsize(),
                self.SHUTDOWN_TIMEOUT,
            )

        if self._thread.is_alive():
            self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)

        # Clean up ZMQ resources
        try:
            if self._pub is not None:
                self._pub.close(linger=0)
            if self._replay is not None:
                self._replay.close(linger=0)
        finally:
            pass  # Do not terminate context; other sockets may use it

    def _socket_setup(self) -> None:
        """Initialize sockets
        https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
        """
        if self._pub is None:
            self._pub = self._ctx.socket(zmq.PUB)
            self._pub.set_hwm(self._hwm)
            # Heuristic: bind if wildcard / * present, else connect.
            # bind stable, connect volatile convention
            if self._endpoint is not None and (
                "*" in self._endpoint
                or "::" in self._endpoint
                or self._endpoint.startswith("ipc://")
                or self._endpoint.startswith("inproc://")
            ):
                self._pub.bind(self._endpoint)
            elif self._endpoint is not None:
                self._pub.connect(self._endpoint)

        # Set up replay socket: use ROUTER
        # 1) handles multiple REQ clients (identities)
        # 2) lets us send back one request → many replies (streamed events)
        # 3) works in our non‑blocking poll loop alongside PUB
        if self._replay_endpoint is not None:
            self._replay = self._ctx.socket(zmq.ROUTER)
            self._replay.bind(self._replay_endpoint)

    def _publisher_thread(self) -> None:
        """Background thread that processes the event queue."""
        self._pack = msgspec.msgpack.Encoder()

        assert self._pub is not None  # narrows type for mypy

        while self._running or self._event_queue.qsize() > 0:
            # --- replay (non-critical) ---------------------------------
            if self._replay is not None and self._replay.poll(0):
                try:
                    self._service_replay()
                except Exception as e:
                    logger.exception("Error in replay: %s", e)

            # --- main queue (critical) ---------------------------------
            try:
                event = self._event_queue.get(timeout=0.1)
                if event is None:
                    break  # Sentinel received, exit thread
            except queue.Empty:
                continue

            try:
                seq = next(self._seq_gen)

                payload = self._pack.encode(event)
                seq_bytes = seq.to_bytes(8, "big")
                self._pub.send_multipart((self._topic_bytes, seq_bytes, payload))

                self._buffer.append((seq, payload))
                self._event_queue.task_done()

            except Exception as e:
                # Publishing failed; back off a bit to avoid a tight error loop
                logger.exception("Error in publisher thread: %s", e)
                time.sleep(0.1)

    def _service_replay(self) -> None:
        """If a replay request is waiting, send buffered batches."""
        assert self._replay is not None  # narrows type for mypy

        frame = self._replay.recv_multipart()
        if len(frame) != 3:
            logger.warning("Invalid replay request: %s", frame)
            return
        client_id, _, start_seq_bytes = frame
        start_seq = int.from_bytes(start_seq_bytes, "big")

        for seq, buf in self._buffer:
            if seq >= start_seq:
                # [identity, empty_delim, seq_bytes, payload]
                # (identity, empty_delim) are stripped off by the router
                # receiving payload is (seq_bytes, payload)
                self._replay.send_multipart(
                    (client_id, b"", seq.to_bytes(8, "big"), buf)
                )
        # Send end of sequence marker
        # receiving payload is (-1, b"")
        self._replay.send_multipart((client_id, b"", self.END_SEQ, b""))

    @staticmethod
    def offset_endpoint_port(
        endpoint: str | None, data_parallel_rank: int
    ) -> str | None:
        """Helper function to offset the port in an endpoint by
            the data parallel rank.

        Args:
            endpoint: The endpoint string
                (e.g., "tcp://*:5557" or "inproc://cache")
            data_parallel_rank: The data parallel rank to offset by

        Returns:
            The endpoint with the port offset by data_parallel_rank
                or suffix appended
        """
        # Do nothing if input is None or data_parallel_rank is 0
        if not endpoint or data_parallel_rank == 0:
            return endpoint

        if "inproc" in endpoint:
            return f"{endpoint}_dp{data_parallel_rank}"
        if "tcp" in endpoint:
            if endpoint and ":" in endpoint:
                # Get everything after the last colon (the port)
                last_colon_idx = endpoint.rfind(":")
                base_addr = endpoint[:last_colon_idx]
                base_port = int(endpoint[last_colon_idx + 1 :])
                new_port = base_port + data_parallel_rank
                return f"{base_addr}:{new_port}"
            return endpoint
        raise ValueError("Invalid endpoint: must contain 'inproc' or 'tcp'")
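
For context, a hedged sketch of the consuming side of this protocol (an illustrative client, not a vLLM API). Live frames are (topic, 8-byte big-endian seq, msgpack payload); replay uses a DEALER socket because the ROUTER streams many replies per request, terminated by the END_SEQ marker. Both endpoints below are assumptions:

import msgspec
import zmq

ctx = zmq.Context.instance()

# Live stream from the PUB socket.
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5557")
sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix matches every topic

decoder = msgspec.msgpack.Decoder()  # untyped decode yields plain lists

topic, seq_bytes, payload = sub.recv_multipart()
seq = int.from_bytes(seq_bytes, "big")  # track seq to detect gaps
batch = decoder.decode(payload)

# Replay of missed batches. DEALER rather than REQ, since one request fans
# out into many replies; the empty first frame mimics REQ's delimiter.
replay = ctx.socket(zmq.DEALER)
replay.connect("tcp://localhost:5558")  # assumed replay_endpoint
replay.send_multipart((b"", (0).to_bytes(8, "big")))  # resume from seq 0
while True:
    _, seq_bytes, payload = replay.recv_multipart()
    if int.from_bytes(seq_bytes, "big", signed=True) == -1:
        break  # END_SEQ marker: replay complete
    batch = decoder.decode(payload)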

END_SEQ class-attribute instance-attribute

END_SEQ = (-1).to_bytes(8, 'big', signed=True)

SHUTDOWN_TIMEOUT class-attribute instance-attribute

SHUTDOWN_TIMEOUT: float = 1.0

_buffer instance-attribute

_buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

_ctx instance-attribute

_ctx = Context.instance()

_dp_rank instance-attribute

_dp_rank = data_parallel_rank

_endpoint instance-attribute

_endpoint = offset_endpoint_port(endpoint, _dp_rank)

_event_queue instance-attribute

_event_queue = Queue[EventBatch | None](
    maxsize=max_queue_size
)

_hwm instance-attribute

_hwm = hwm

_pub instance-attribute

_pub: Socket | None = None

_replay instance-attribute

_replay: Socket | None = None

_replay_endpoint instance-attribute

_replay_endpoint = offset_endpoint_port(
    replay_endpoint, _dp_rank
)

_running instance-attribute

_running = True

_seq_gen instance-attribute

_seq_gen = count()

_thread instance-attribute

_thread = Thread(
    target=_publisher_thread,
    daemon=True,
    name="zmq-publisher",
)

_topic_bytes instance-attribute

_topic_bytes = topic.encode('utf-8')

__init__

__init__(
    data_parallel_rank: int,
    endpoint: str = "tcp://*:5557",
    replay_endpoint: str | None = None,
    buffer_steps: int = 10000,
    hwm: int = 100000,
    max_queue_size: int = 100000,
    topic: str = "",
) -> None
Source code in vllm/distributed/kv_events.py
def __init__(
    self,
    data_parallel_rank: int,
    endpoint: str = "tcp://*:5557",
    replay_endpoint: str | None = None,
    buffer_steps: int = 10_000,
    hwm: int = 100_000,
    max_queue_size: int = 100_000,
    topic: str = "",
) -> None:
    # Storage
    super().__init__(data_parallel_rank)
    self._event_queue = Queue[EventBatch | None](maxsize=max_queue_size)
    self._buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

    # ZMQ sockets
    self._ctx = zmq.Context.instance()
    self._pub: zmq.Socket | None = None
    self._replay: zmq.Socket | None = None
    self._dp_rank = data_parallel_rank

    self._endpoint = self.offset_endpoint_port(endpoint, self._dp_rank)
    self._replay_endpoint = self.offset_endpoint_port(
        replay_endpoint, self._dp_rank
    )
    self._hwm = hwm
    self._socket_setup()

    # Payload
    self._seq_gen = count()
    self._topic_bytes = topic.encode("utf-8")

    # Thread
    self._running = True
    logger.info("Starting ZMQ publisher thread")

    self._thread = threading.Thread(
        target=self._publisher_thread, daemon=True, name="zmq-publisher"
    )
    self._thread.start()

_publisher_thread

_publisher_thread() -> None

Background thread that processes the event queue.

Source code in vllm/distributed/kv_events.py
def _publisher_thread(self) -> None:
    """Background thread that processes the event queue."""
    self._pack = msgspec.msgpack.Encoder()

    assert self._pub is not None  # narrows type for mypy

    while self._running or self._event_queue.qsize() > 0:
        # --- replay (non-critical) ---------------------------------
        if self._replay is not None and self._replay.poll(0):
            try:
                self._service_replay()
            except Exception as e:
                logger.exception("Error in replay: %s", e)

        # --- main queue (critical) ---------------------------------
        try:
            event = self._event_queue.get(timeout=0.1)
            if event is None:
                break  # Sentinel received, exit thread
        except queue.Empty:
            continue

        try:
            seq = next(self._seq_gen)

            payload = self._pack.encode(event)
            seq_bytes = seq.to_bytes(8, "big")
            self._pub.send_multipart((self._topic_bytes, seq_bytes, payload))

            self._buffer.append((seq, payload))
            self._event_queue.task_done()

        except Exception as e:
            # Publishing failed; back off a bit to avoid a tight error loop
            logger.exception("Error in publisher thread: %s", e)
            time.sleep(0.1)

_service_replay

_service_replay() -> None

If a replay request is waiting, send buffered batches.

Source code in vllm/distributed/kv_events.py
def _service_replay(self) -> None:
    """If a replay request is waiting, send buffered batches."""
    assert self._replay is not None  # narrows type for mypy

    frame = self._replay.recv_multipart()
    if len(frame) != 3:
        logger.warning("Invalid replay request: %s", frame)
        return
    client_id, _, start_seq_bytes = frame
    start_seq = int.from_bytes(start_seq_bytes, "big")

    for seq, buf in self._buffer:
        if seq >= start_seq:
            # [identity, empty_delim, seq_bytes, payload]
            # (identity, empty_delim) are stripped off by the router
            # receiving payload is (seq_bytes, payload)
            self._replay.send_multipart(
                (client_id, b"", seq.to_bytes(8, "big"), buf)
            )
    # Send end of sequence marker
    # receiving payload is (-1, b"")
    self._replay.send_multipart((client_id, b"", self.END_SEQ, b""))

_socket_setup

_socket_setup() -> None

Initialize sockets https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety

Source code in vllm/distributed/kv_events.py
def _socket_setup(self) -> None:
    """Initialize sockets
    https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
    """
    if self._pub is None:
        self._pub = self._ctx.socket(zmq.PUB)
        self._pub.set_hwm(self._hwm)
        # Heuristic: bind if wildcard / * present, else connect.
        # bind stable, connect volatile convention
        if self._endpoint is not None and (
            "*" in self._endpoint
            or "::" in self._endpoint
            or self._endpoint.startswith("ipc://")
            or self._endpoint.startswith("inproc://")
        ):
            self._pub.bind(self._endpoint)
        elif self._endpoint is not None:
            self._pub.connect(self._endpoint)

    # Set up replay socket: use ROUTER
    # 1) handles multiple REQ clients (identities)
    # 2) lets us send back one request → many replies (streamed events)
    # 3) works in our non‑blocking poll loop alongside PUB
    if self._replay_endpoint is not None:
        self._replay = self._ctx.socket(zmq.ROUTER)
        self._replay.bind(self._replay_endpoint)

offset_endpoint_port staticmethod

offset_endpoint_port(
    endpoint: str | None, data_parallel_rank: int
) -> str | None

Helper function to offset the port in an endpoint by the data parallel rank.

Parameters:

  • endpoint (str | None, required): The endpoint string (e.g., "tcp://*:5557" or "inproc://cache")
  • data_parallel_rank (int, required): The data parallel rank to offset by

Returns:

  • str | None: The endpoint with the port offset by data_parallel_rank or suffix appended

Source code in vllm/distributed/kv_events.py
@staticmethod
def offset_endpoint_port(
    endpoint: str | None, data_parallel_rank: int
) -> str | None:
    """Helper function to offset the port in an endpoint by
        the data parallel rank.

    Args:
        endpoint: The endpoint string
            (e.g., "tcp://*:5557" or "inproc://cache")
        data_parallel_rank: The data parallel rank to offset by

    Returns:
        The endpoint with the port offset by data_parallel_rank
            or suffix appended
    """
    # Do nothing if input is None or data_parallel_rank is 0
    if not endpoint or data_parallel_rank == 0:
        return endpoint

    if "inproc" in endpoint:
        return f"{endpoint}_dp{data_parallel_rank}"
    if "tcp" in endpoint:
        if endpoint and ":" in endpoint:
            # Get everything after the last colon (the port)
            last_colon_idx = endpoint.rfind(":")
            base_addr = endpoint[:last_colon_idx]
            base_port = int(endpoint[last_colon_idx + 1 :])
            new_port = base_port + data_parallel_rank
            return f"{base_addr}:{new_port}"
        return endpoint
    raise ValueError("Invalid endpoint: must contain 'inproc' or 'tcp'")
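
For example:

ZmqEventPublisher.offset_endpoint_port("tcp://*:5557", 2)    # "tcp://*:5559"
ZmqEventPublisher.offset_endpoint_port("inproc://cache", 1)  # "inproc://cache_dp1"
ZmqEventPublisher.offset_endpoint_port("tcp://*:5557", 0)    # unchanged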

publish

publish(events: EventBatch) -> None
Source code in vllm/distributed/kv_events.py
def publish(self, events: EventBatch) -> None:
    if not self._running:
        raise RuntimeError("Publisher is closed")
    if events.data_parallel_rank is None:
        events.data_parallel_rank = self._data_parallel_rank
    self._event_queue.put(events)

shutdown

shutdown() -> None

Stop the publisher thread and clean up resources.

Source code in vllm/distributed/kv_events.py
def shutdown(self) -> None:
    """Stop the publisher thread and clean up resources."""
    self._running = False
    self._event_queue.put_nowait(None)

    start = time.time()
    pending_items = True
    while pending_items and (time.time() - start < self.SHUTDOWN_TIMEOUT):
        pending_items = not self._event_queue.empty()
        if pending_items:
            time.sleep(0.1)

    if pending_items:
        logger.warning(
            "Warning: Queue still has %s items after %s seconds timeout",
            self._event_queue.qsize(),
            self.SHUTDOWN_TIMEOUT,
        )

    if self._thread.is_alive():
        self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)

    # Clean up ZMQ resources
    try:
        if self._pub is not None:
            self._pub.close(linger=0)
        if self._replay is not None:
            self._replay.close(linger=0)
    finally:
        pass  # Do not terminate context; other sockets may use it