Speed up index construction

This commit is contained in:
Marko Durkovic 2022-04-23 11:21:47 +02:00
parent d196e8b74e
commit 926813bf17

View File

@@ -4,7 +4,6 @@
from __future__ import annotations from __future__ import annotations
import bisect
import heapq import heapq
import os import os
import re import re
@@ -348,7 +347,7 @@ class Reader:
self.chunks: dict[int, Chunk] = {} self.chunks: dict[int, Chunk] = {}
self.current_chunk: tuple[int, BinaryIO] = (-1, BytesIO()) self.current_chunk: tuple[int, BinaryIO] = (-1, BytesIO())
def open(self) -> None: # pylint: disable=too-many-locals def open(self) -> None:
"""Open rosbag and read metadata.""" """Open rosbag and read metadata."""
try: try:
self.bio = self.path.open('rb') self.bio = self.path.open('rb')
@@ -390,18 +389,15 @@ class Reader:
raise ReaderError(f'Bag index looks damaged: {err.args}') from None raise ReaderError(f'Bag index looks damaged: {err.args}') from None
self.chunks = {} self.chunks = {}
indexes: dict[int, list[list[IndexData]]] = defaultdict(list) indexes: dict[int, list[IndexData]] = defaultdict(list)
for chunk_info in self.chunk_infos: for chunk_info in self.chunk_infos:
self.bio.seek(chunk_info.pos) self.bio.seek(chunk_info.pos)
self.chunks[chunk_info.pos] = self.read_chunk() self.chunks[chunk_info.pos] = self.read_chunk()
for _ in range(len(chunk_info.connection_counts)): for _ in range(len(chunk_info.connection_counts)):
cid, index = self.read_index_data(chunk_info.pos) self.read_index_data(chunk_info.pos, indexes)
indexes[cid].append(index)
self.indexes = { self.indexes = {cid: sorted(x) for cid, x in indexes.items()}
cid: list(heapq.merge(*x, key=lambda x: x.time)) for cid, x in indexes.items()
}
assert all(self.indexes[x.id] for x in self.connections) assert all(self.indexes[x.id] for x in self.connections)
self.connections = [ self.connections = [
@@ -538,14 +534,12 @@ class Reader:
decompressor, decompressor,
) )
def read_index_data(self, pos: int) -> tuple[int, list[IndexData]]: def read_index_data(self, pos: int, indexes: dict[int, list[IndexData]]) -> None:
"""Read index data from position. """Read index data from position.
Args: Args:
pos: Seek position. pos: Seek position.
indexes: Accumulated index data.
Returns:
Connection id and list of index data.
Raises: Raises:
ReaderError: Record unreadable. ReaderError: Record unreadable.
@ -562,12 +556,11 @@ class Reader:
self.bio.seek(4, os.SEEK_CUR) self.bio.seek(4, os.SEEK_CUR)
index: list[IndexData] = [] index = indexes[conn]
for _ in range(count): for _ in range(count):
time = deserialize_time(self.bio.read(8)) time = deserialize_time(self.bio.read(8))
offset = read_uint32(self.bio) offset = read_uint32(self.bio)
bisect.insort(index, IndexData(time, pos, offset)) index.append(IndexData(time, pos, offset))
return conn, index
def messages( def messages(
self, self,