Add support for rosbag version 6 metadata

This commit is contained in:
Marko Durkovic 2022-07-27 16:14:08 +02:00
parent ff24d7e424
commit 5257497a6a
5 changed files with 46 additions and 5 deletions

View File

@ -57,3 +57,4 @@ class Metadata(TypedDict):
compression_mode: str compression_mode: str
topics_with_message_count: list[TopicWithMessageCount] topics_with_message_count: list[TopicWithMessageCount]
files: list[FileInformation] files: list[FileInformation]
custom_data: dict[str, str]

View File

@ -65,6 +65,7 @@ class Reader:
- Version 3: Added compression. - Version 3: Added compression.
- Version 4: Added QoS metadata to topics, changed relative file paths - Version 4: Added QoS metadata to topics, changed relative file paths
- Version 5: Added per file metadata - Version 5: Added per file metadata
- Version 6: Added custom_data dict to metadata
""" """
@ -92,7 +93,7 @@ class Reader:
try: try:
self.metadata: Metadata = dct['rosbag2_bagfile_information'] self.metadata: Metadata = dct['rosbag2_bagfile_information']
if (ver := self.metadata['version']) > 5: if (ver := self.metadata['version']) > 6:
raise ReaderError(f'Rosbag2 version {ver} not supported; please report issue.') raise ReaderError(f'Rosbag2 version {ver} not supported; please report issue.')
if storageid := self.metadata['storage_identifier'] != 'sqlite3': if storageid := self.metadata['storage_identifier'] != 'sqlite3':
raise ReaderError( raise ReaderError(
@ -129,6 +130,7 @@ class Reader:
raise ReaderError(f'Compression format {cfmt!r} is not supported.') raise ReaderError(f'Compression format {cfmt!r} is not supported.')
self.files: list[FileInformation] = self.metadata.get('files', [])[:] self.files: list[FileInformation] = self.metadata.get('files', [])[:]
self.custom_data: dict[str, str] = self.metadata.get('custom_data', {})
except KeyError as exc: except KeyError as exc:
raise ReaderError(f'A metadata key is missing {exc!r}.') from None raise ReaderError(f'A metadata key is missing {exc!r}.') from None

View File

@ -85,6 +85,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
self.counts: dict[int, int] = {} self.counts: dict[int, int] = {}
self.conn: Optional[sqlite3.Connection] = None self.conn: Optional[sqlite3.Connection] = None
self.cursor: Optional[sqlite3.Cursor] = None self.cursor: Optional[sqlite3.Cursor] = None
self.custom_data: dict[str, str] = {}
def set_compression(self, mode: CompressionMode, fmt: CompressionFormat) -> None: def set_compression(self, mode: CompressionMode, fmt: CompressionFormat) -> None:
"""Enable compression on bag. """Enable compression on bag.
@ -92,8 +93,8 @@ class Writer: # pylint: disable=too-many-instance-attributes
This function has to be called before opening. This function has to be called before opening.
Args: Args:
mode: Compression mode to use, either 'file' or 'message' mode: Compression mode to use, either 'file' or 'message'.
fmt: Compressor to use, currently only 'zstd' fmt: Compressor to use, currently only 'zstd'.
Raises: Raises:
WriterError: Bag already open. WriterError: Bag already open.
@ -107,6 +108,21 @@ class Writer: # pylint: disable=too-many-instance-attributes
self.compression_format = fmt.name.lower() self.compression_format = fmt.name.lower()
self.compressor = zstandard.ZstdCompressor() self.compressor = zstandard.ZstdCompressor()
def set_custom_data(self, key: str, value: str) -> None:
"""Set key value pair in custom_data.
Args:
key: Key to set.
value: Value to set.
Raises:
WriterError: If value has incorrect type.
"""
if not isinstance(value, str):
raise WriterError(f'Cannot set non-string value {value!r} in custom_data.')
self.custom_data[key] = value
def open(self) -> None: def open(self) -> None:
"""Open rosbag2 for writing. """Open rosbag2 for writing.
@ -233,7 +249,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
metadata: dict[str, Metadata] = { metadata: dict[str, Metadata] = {
'rosbag2_bagfile_information': { 'rosbag2_bagfile_information': {
'version': 5, 'version': 6,
'storage_identifier': 'sqlite3', 'storage_identifier': 'sqlite3',
'relative_file_paths': [self.dbpath.name], 'relative_file_paths': [self.dbpath.name],
'duration': { 'duration': {
@ -268,6 +284,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
'message_count': count, 'message_count': count,
}, },
], ],
'custom_data': self.custom_data,
}, },
} }
with self.metapath.open('w') as metafile: with self.metapath.open('w') as metafile:

View File

@ -57,7 +57,7 @@ rosbag2_bagfile_information:
METADATA_EMPTY = """ METADATA_EMPTY = """
rosbag2_bagfile_information: rosbag2_bagfile_information:
version: 4 version: 6
storage_identifier: sqlite3 storage_identifier: sqlite3
relative_file_paths: relative_file_paths:
- db.db3 - db.db3
@ -69,6 +69,16 @@ rosbag2_bagfile_information:
topics_with_message_count: [] topics_with_message_count: []
compression_format: "" compression_format: ""
compression_mode: "" compression_mode: ""
files:
- duration:
nanoseconds: 0
message_count: 0
path: db.db3
starting_time:
nanoseconds_since_epoch: 0
custom_data:
key1: value1
key2: value2
""" """
@ -146,6 +156,8 @@ def test_empty_bag(tmp_path: Path) -> None:
assert reader.end_time == 0 assert reader.end_time == 0
assert reader.duration == 0 assert reader.duration == 0
assert not list(reader.messages()) assert not list(reader.messages())
assert reader.custom_data['key1'] == 'value1'
assert reader.custom_data['key2'] == 'value2'
def test_reader(bag: Path) -> None: def test_reader(bag: Path) -> None:

View File

@ -59,6 +59,15 @@ def test_writer(tmp_path: Path) -> None:
assert (path / 'compress_message.db3').exists() assert (path / 'compress_message.db3').exists()
assert size > (path / 'compress_message.db3').stat().st_size assert size > (path / 'compress_message.db3').stat().st_size
path = (tmp_path / 'with_custom_data')
bag = Writer(path)
bag.open()
bag.set_custom_data('key1', 'value1')
with pytest.raises(WriterError, match='non-string value'):
bag.set_custom_data('key1', 42) # type: ignore
bag.close()
assert b'key1: value1' in (path / 'metadata.yaml').read_bytes()
def test_failure_cases(tmp_path: Path) -> None: def test_failure_cases(tmp_path: Path) -> None:
"""Test writer failure cases.""" """Test writer failure cases."""