Skip to content

Commit

Permalink
fix(python): use deque for insert-order message queue (#1316)
Browse files Browse the repository at this point in the history
### Changelog
<!-- Write a one-sentence summary of the user-impacting change (API,
UI/UX, performance, etc) that could appear in a changelog. Write "None"
if there is no user-facing change -->

Significantly increases the performance of iterating through an MCAP
file in insertion order.

### Docs

<!-- Link to a Docs PR, tracking ticket in Linear, OR write "None" if no
documentation changes are needed. -->

None. (internal only)

### Description

<!-- Describe the problem, what has changed, and motivation behind those
changes. Pretend you are advocating for this change and the reader is
skeptical. -->

I used `iter_messages(log_time_order=False)` since my application
doesn't care about message order, expecting it to perform better than
`log_time_order=True` (the default). However I noticed that my
application took _17 minutes_ to iterate through 3.5 million MCAP
messages. It took 32 seconds with `log_time_order=True`.

The previous message queue implementation used a `list` as a FIFO queue,
which has poor performance as `pop(0)` will copy the list every time an
item is evicted. After changing the implementation to use a `deque`,
`log_time_order=False` took 15 seconds.

<!-- In addition to unit tests, describe any manual testing you did to
validate this change. -->

Here's code for a minimal performance test. Note that the mcap file has
~3.5 million messages.

```python
import time
from collections import deque
from mcap.reader import make_reader

with open("example.mcap", "rb") as f:
    reader = make_reader(f)
    start = time.time()
    deque(reader.iter_messages(log_time_order=False), maxlen=0)
    end = time.time()
    print(end - start)
```

Note that `deque(iterable, maxlen=0)` will consume the iterable in C, so
basically as fast as possible (see the `itertools`
[recipe](https://docs.python.org/3/library/itertools.html#itertools-recipes)
for `consume()`).

I ran this three times after the fix, but just once before because I
didn't have the patience 😄

<table><tr><th>Before</th><th>After</th></tr><tr><td>
1125.6354639530182
</td><td>
11.093337059020996</br>
10.840037822723389</br>
10.490556001663208
</td></tr></table>

Over 100x faster! If you're curious, `log_time_order=True` was ~17
seconds (both before and after the changeset).

<!-- If necessary, link relevant Linear or Github issues. Use `Fixes:
foxglove/repo#1234` to auto-close the Github issue or Fixes: FG-### for
Linear isses. -->

I also updated the code to have two separate impls rather than a
bifurcated implementation.

I added a unit test to assert that `log_time_order=False` is faster than
`True` (purposely adding messages in reverse log time order so that the
heap should perform worse). As with most perf unit tests, it is
potentially flakey with smaller inputs so I put 10k messages in it, but
feel free to discuss if there's a different way you want to run through
that.
  • Loading branch information
alkasm authored Jan 21, 2025
1 parent b1380d3 commit babc294
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 28 deletions.
78 changes: 57 additions & 21 deletions python/mcap/mcap/_message_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import heapq
from typing import List, Optional, Tuple, Union
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, List, Optional, Tuple, Union

from .records import Channel, ChunkIndex, Message, Schema

Expand Down Expand Up @@ -65,34 +67,68 @@ def position(self) -> Tuple[int, Optional[int]]:
return (self.item[1], self.item[2])


class MessageQueue:
"""A queue of MCAP messages and chunk indices.
def _make_orderable(item: QueueItem, reverse: bool) -> _Orderable:
if isinstance(item, ChunkIndex):
return _ChunkIndexWrapper(item, reverse)
return _MessageTupleWrapper(item, reverse)


class _MessageQueue(ABC):
@abstractmethod
def push(self, item: QueueItem):
raise NotImplementedError()

@abstractmethod
def pop(self) -> QueueItem:
raise NotImplementedError()

@abstractmethod
def __len__(self) -> int:
raise NotImplementedError()

:param log_time_order: if True, this queue acts as a priority queue, ordered by log time.
if False, ``pop()`` returns elements in insert order.
:param reverse: if True, order elements in descending log time order rather than ascending.
"""

def __init__(self, log_time_order: bool, reverse: bool = False):
class LogTimeOrderQueue(_MessageQueue):
def __init__(self, reverse: bool = False):
self._q: List[_Orderable] = []
self._log_time_order = log_time_order
self._reverse = reverse

def push(self, item: QueueItem):
if isinstance(item, ChunkIndex):
orderable = _ChunkIndexWrapper(item, self._reverse)
else:
orderable = _MessageTupleWrapper(item, self._reverse)
if self._log_time_order:
heapq.heappush(self._q, orderable)
else:
self._q.append(orderable)
orderable = _make_orderable(item, self._reverse)
heapq.heappush(self._q, orderable)

def pop(self) -> QueueItem:
if self._log_time_order:
return heapq.heappop(self._q).item
else:
return self._q.pop(0).item
return heapq.heappop(self._q).item

def __len__(self) -> int:
return len(self._q)


class InsertOrderQueue(_MessageQueue):
def __init__(self):
self._q: Deque[QueueItem] = deque()

def push(self, item: QueueItem):
self._q.append(item)

def pop(self) -> QueueItem:
return self._q.popleft() # cspell:disable-line

def __len__(self) -> int:
return len(self._q)


def make_message_queue(
log_time_order: bool = True, reverse: bool = False
) -> _MessageQueue:
"""Create a queue of MCAP messages and chunk indices.
:param log_time_order: if True, this queue acts as a priority queue, ordered by log time.
if False, ``pop()`` returns elements in insert order.
:param reverse: if True, order elements in descending log time order rather than ascending.
only valid if ``log_time_order`` is True, otherwise throws a ValueError.
"""
if log_time_order:
return LogTimeOrderQueue(reverse)
if reverse:
raise ValueError("reverse is only valid with log_time_order=True")
return InsertOrderQueue()
6 changes: 4 additions & 2 deletions python/mcap/mcap/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Tuple,
)

from ._message_queue import MessageQueue
from ._message_queue import make_message_queue
from .data_stream import ReadDataStream, RecordBuilder
from .decoder import DecoderFactory
from .exceptions import DecoderNotFoundError, McapError
Expand Down Expand Up @@ -292,7 +292,9 @@ def iter_messages(
)
return

message_queue = MessageQueue(log_time_order=log_time_order, reverse=reverse)
message_queue = make_message_queue(
log_time_order=log_time_order, reverse=reverse
)
for chunk_index in _chunks_matching_topics(
summary, topics, start_time, end_time
):
Expand Down
34 changes: 29 additions & 5 deletions python/mcap/tests/test_message_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from typing import List

from mcap._message_queue import MessageQueue, QueueItem
from mcap._message_queue import QueueItem, _MessageQueue, make_message_queue
from mcap.records import Channel, ChunkIndex, Message, Schema


Expand Down Expand Up @@ -49,7 +50,7 @@ def dummy_message_tuple(
)


def push_elements(mq: MessageQueue):
def push_elements(mq: _MessageQueue):
mq.push(dummy_chunk_index(3, 6, 100))
mq.push(dummy_chunk_index(1, 2, 400))
mq.push(dummy_chunk_index(4, 5, 500))
Expand All @@ -58,8 +59,13 @@ def push_elements(mq: MessageQueue):
mq.push(dummy_message_tuple(5, 200, 30))


def push_messages_reverse_order(mq: _MessageQueue, n: int = 10_000):
for i in range(n):
mq.push(dummy_message_tuple(n - i, 0, i))


def test_chunk_message_ordering():
mq = MessageQueue(log_time_order=True)
mq = make_message_queue(log_time_order=True)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -81,7 +87,7 @@ def test_chunk_message_ordering():


def test_reverse_ordering():
mq = MessageQueue(log_time_order=True, reverse=True)
mq = make_message_queue(log_time_order=True, reverse=True)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -103,7 +109,7 @@ def test_reverse_ordering():


def test_insert_ordering():
mq = MessageQueue(log_time_order=False)
mq = make_message_queue(log_time_order=False)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -122,3 +128,21 @@ def test_insert_ordering():
assert results[4][2] == 20
assert isinstance(results[5], tuple)
assert results[5][2] == 30


def test_insert_order_is_faster():
log_time_order_mq = make_message_queue(log_time_order=True)
push_messages_reverse_order(log_time_order_mq)
log_time_start = time.time()
while log_time_order_mq:
log_time_order_mq.pop()
log_time_end = time.time()

insert_order_mq = make_message_queue(log_time_order=False)
push_messages_reverse_order(insert_order_mq)
insert_start = time.time()
while insert_order_mq:
insert_order_mq.pop()
insert_end = time.time()

assert insert_end - insert_start < log_time_end - log_time_start

0 comments on commit babc294

Please sign in to comment.