From 926813bf173c8b26da5e2a12e60eb4099f6be092 Mon Sep 17 00:00:00 2001 From: Marko Durkovic Date: Sat, 23 Apr 2022 11:21:47 +0200 Subject: [PATCH] Speed up index construction --- src/rosbags/rosbag1/reader.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/rosbags/rosbag1/reader.py b/src/rosbags/rosbag1/reader.py index 483e731f..657fa18e 100644 --- a/src/rosbags/rosbag1/reader.py +++ b/src/rosbags/rosbag1/reader.py @@ -4,7 +4,6 @@ from __future__ import annotations -import bisect import heapq import os import re @@ -348,7 +347,7 @@ class Reader: self.chunks: dict[int, Chunk] = {} self.current_chunk: tuple[int, BinaryIO] = (-1, BytesIO()) - def open(self) -> None: # pylint: disable=too-many-locals + def open(self) -> None: """Open rosbag and read metadata.""" try: self.bio = self.path.open('rb') @@ -390,18 +389,15 @@ class Reader: raise ReaderError(f'Bag index looks damaged: {err.args}') from None self.chunks = {} - indexes: dict[int, list[list[IndexData]]] = defaultdict(list) + indexes: dict[int, list[IndexData]] = defaultdict(list) for chunk_info in self.chunk_infos: self.bio.seek(chunk_info.pos) self.chunks[chunk_info.pos] = self.read_chunk() for _ in range(len(chunk_info.connection_counts)): - cid, index = self.read_index_data(chunk_info.pos) - indexes[cid].append(index) + self.read_index_data(chunk_info.pos, indexes) - self.indexes = { - cid: list(heapq.merge(*x, key=lambda x: x.time)) for cid, x in indexes.items() - } + self.indexes = {cid: sorted(x) for cid, x in indexes.items()} assert all(self.indexes[x.id] for x in self.connections) self.connections = [ @@ -538,14 +534,12 @@ class Reader: decompressor, ) - def read_index_data(self, pos: int) -> tuple[int, list[IndexData]]: + def read_index_data(self, pos: int, indexes: dict[int, list[IndexData]]) -> None: """Read index data from position. Args: pos: Seek position. - - Returns: - Connection id and list of index data. + indexes: Accumulated index data. Raises: ReaderError: Record unreadable. @@ -562,12 +556,11 @@ class Reader: self.bio.seek(4, os.SEEK_CUR) - index: list[IndexData] = [] + index = indexes[conn] for _ in range(count): time = deserialize_time(self.bio.read(8)) offset = read_uint32(self.bio) - bisect.insort(index, IndexData(time, pos, offset)) - return conn, index + index.append(IndexData(time, pos, offset)) def messages( self,