Speed up reading of index data records

This commit is contained in:
Marko Durkovic 2022-04-24 10:11:31 +02:00
parent 926813bf17
commit c2e676a01f

View File

@ -24,7 +24,20 @@ from rosbags.typesys.msg import normalize_msgtype
if TYPE_CHECKING:
from types import TracebackType
from typing import BinaryIO, Callable, Generator, Iterable, Literal, Optional, Type, Union
from typing import (
BinaryIO,
Callable,
Generator,
Iterable,
Literal,
Optional,
Tuple,
Type,
Union,
)
Unpack = Callable[[bytes], Tuple[int]]
UnpackFrom = Callable[[bytes, int], Tuple[int]]
class ReaderError(Exception):
@ -103,9 +116,9 @@ class IndexData(NamedTuple):
return NotImplemented # pragma: no cover
deserialize_uint8: Callable[[bytes], tuple[int]] = struct.Struct('<B').unpack # type: ignore
deserialize_uint32: Callable[[bytes], tuple[int]] = struct.Struct('<L').unpack # type: ignore
deserialize_uint64: Callable[[bytes], tuple[int]] = struct.Struct('<Q').unpack # type: ignore
deserialize_uint8: Unpack = struct.Struct('<B').unpack # type: ignore
deserialize_uint32: UnpackFrom = struct.Struct('<L').unpack_from # type: ignore
deserialize_uint64: Unpack = struct.Struct('<Q').unpack # type: ignore
def deserialize_time(val: bytes) -> int:
@ -158,7 +171,7 @@ class Header(Dict[str, Any]):
"""
try:
return deserialize_uint32(self[name])[0]
return deserialize_uint32(self[name], 0)[0]
except (KeyError, struct.error) as err:
raise ReaderError(f'Could not read uint32 field {name!r}.') from err
@ -243,7 +256,7 @@ class Header(Dict[str, Any]):
length = len(binary)
while pos < length:
try:
size = deserialize_uint32(binary[pos:pos + 4])[0]
size = deserialize_uint32(binary, pos)[0]
except struct.error as err:
raise ReaderError('Header field size could not be read.') from err
pos += 4
@ -280,7 +293,7 @@ def read_uint32(src: BinaryIO) -> int:
"""
try:
return deserialize_uint32(src.read(4))[0]
return deserialize_uint32(src.read(4), 0)[0]
except struct.error as err:
raise ReaderError('Could not read uint32.') from err
@ -326,6 +339,8 @@ class Reader:
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, path: Union[str, Path]):
"""Initialize.
@ -343,6 +358,7 @@ class Reader:
self.bio: Optional[BinaryIO] = None
self.connections: list[Connection] = []
self.indexes: dict[int, list[IndexData]]
self.index_data_header_offsets: Optional[tuple[int, int]] = None
self.chunk_infos: list[ChunkInfo] = []
self.chunks: dict[int, Chunk] = {}
self.current_chunk: tuple[int, BinaryIO] = (-1, BytesIO())
@ -537,6 +553,9 @@ class Reader:
def read_index_data(self, pos: int, indexes: dict[int, list[IndexData]]) -> None:
"""Read index data from position.
The implementation purposely avoids the generic Header class and
its costly string processing.
Args:
pos: Seek position.
indexes: Accumulated index data.
@ -546,20 +565,45 @@ class Reader:
"""
assert self.bio
header = Header.read(self.bio, RecordType.IDXDATA)
ver = header.get_uint32('ver')
if ver != 1:
raise ReaderError(f'IDXDATA version {ver} is not supported.')
conn = header.get_uint32('conn')
count = header.get_uint32('count')
buf = self.bio.read(55)
if not self.index_data_header_offsets:
size, = deserialize_uint32(buf, 0)
assert size == 47
idx = 4
connpos = -1
countpos = -1
while idx < size:
char = buf[idx + 6]
if char == 61: # ord(b'=')
assert buf[idx + 7] == 4
idx += 8
elif char == 114: # ord(b'r')
if (ver := buf[idx + 8]) != 1:
raise ReaderError(f'IDXDATA version {ver} is not supported.')
idx += 12
elif char == 110: # ord(b'n')
connpos = idx + 9
idx += 13
else:
assert char == 117 # ord(b'u')
countpos = idx + 10
idx += 14
self.index_data_header_offsets = (connpos, countpos)
connpos, countpos = self.index_data_header_offsets
self.bio.seek(4, os.SEEK_CUR)
conn, = deserialize_uint32(buf, connpos)
count, = deserialize_uint32(buf, countpos)
size, = deserialize_uint32(buf, 51)
assert size == count * 12
index = indexes[conn]
for _ in range(count):
time = deserialize_time(self.bio.read(8))
offset = read_uint32(self.bio)
buf = self.bio.read(size)
idx = 0
while idx < size:
time = deserialize_uint32(buf, idx)[0] * 10**9 + deserialize_uint32(buf, idx + 4)[0]
offset, = deserialize_uint32(buf, idx + 8)
idx += 12
index.append(IndexData(time, pos, offset))
def messages(