Add rosbag2 support

This commit is contained in:
Marko Durkovic
2021-05-02 14:49:33 +02:00
parent abd0c1fa73
commit ffacb7602c
11 changed files with 997 additions and 0 deletions
+18
View File
@@ -0,0 +1,18 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbags support for rosbag2 files.
Readers and writers provide access to metadata and raw message content saved
in the rosbag2 format.
"""
from .reader import Reader, ReaderError
from .writer import Writer, WriterError
# Public API of this package: the names re-exported from the sibling
# ``reader`` and ``writer`` modules imported above.
__all__ = [
    'Reader',
    'ReaderError',
    'Writer',
    'WriterError',
]
+231
View File
@@ -0,0 +1,231 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbag2 reader."""
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
import zstandard
from ruamel.yaml import YAML, YAMLError
if TYPE_CHECKING:
from types import TracebackType
from typing import Any, Generator, Iterable, List, Literal, Optional, Tuple, Type, Union
class ReaderError(Exception):
    """Error raised by the rosbag2 reader.

    Used for unreadable or unsupported bag metadata, missing or malformed
    database files, and attempts to read messages from a bag that is not open.
    """
@contextmanager
def decompress(path: Path, do_decompress: bool):
    """Provide a readable database path, inflating the file first if requested.

    When ``do_decompress`` is false the given path is handed back unchanged.
    Otherwise the zstd stream stored in ``path`` is expanded into a file that
    lives inside a temporary directory; the directory is removed again when
    the context exits.

    Args:
        path: Potentially compressed file.
        do_decompress: Flag indicating if decompression shall occur.

    Yields:
        Path of transparently decompressed file.

    """
    if not do_decompress:
        # Nothing to do, hand the caller its own path back.
        yield path
        return

    decompressor = zstandard.ZstdDecompressor()
    with TemporaryDirectory() as workdir:
        # Dropping the last suffix (e.g. ".zstd") restores the database name.
        target = Path(workdir) / path.stem
        with path.open('rb') as src, target.open('wb') as dst:
            decompressor.copy_stream(src, dst)
        yield target
class Reader:
    """Reader for rosbag2 files.

    It implements all necessary features to access metadata and message
    streams.

    Version history:

        - Version 1: Initial format.
        - Version 2: Changed field sizes in C++ implementation.
        - Version 3: Added compression.
        - Version 4: Added QoS metadata to topics, changed relative file paths

    """

    def __init__(self, path: Union[Path, str]):
        """Open rosbag and check metadata.

        Args:
            path: Filesystem path to bag.

        Raises:
            ReaderError: Metadata file is unreadable, is not valid YAML, lacks
                a required key, or describes an unsupported bag (version,
                storage plugin, serialization or compression format), or a
                referenced database file is missing.

        """
        path = Path(path)
        yamlpath = path / 'metadata.yaml'
        # Keep the bag location itself (not the Path class) for later reference.
        self.path = path
        self.bio = False
        try:
            yaml = YAML(typ='safe')
            dct = yaml.load(yamlpath.read_text())
        except OSError as err:
            raise ReaderError(f'Could not read metadata at {yamlpath}: {err}.') from None
        except YAMLError as exc:
            raise ReaderError(f'Could not load YAML from {yamlpath}: {exc}') from None

        try:
            self.metadata = dct['rosbag2_bagfile_information']
            if (ver := self.metadata['version']) > 4:
                raise ReaderError(f'Rosbag2 version {ver} not supported; please report issue.')
            # Parenthesised so the name binds the identifier, not the comparison result.
            if (storageid := self.metadata['storage_identifier']) != 'sqlite3':
                raise ReaderError(
                    f'Storage plugin {storageid!r} not supported; please report issue.',
                )

            # Only the file name of each entry is used; files are looked up
            # directly inside the bag directory.
            self.paths = [path / Path(x).name for x in self.metadata['relative_file_paths']]
            missing = [x for x in self.paths if not x.exists()]
            if missing:
                raise ReaderError(f'Some database files are missing: {[str(x) for x in missing]!r}')

            topics = [x['topic_metadata'] for x in self.metadata['topics_with_message_count']]
            noncdr = {y for x in topics if (y := x['serialization_format']) != 'cdr'}
            if noncdr:
                raise ReaderError(f'Serialization format {noncdr!r} is not supported.')
            self.topics = {x['name']: x['type'] for x in topics}

            if self.compression_mode and (cfmt := self.compression_format) != 'zstd':
                raise ReaderError(f'Compression format {cfmt!r} is not supported.')
        except KeyError as exc:
            raise ReaderError(f'A metadata key is missing {exc!r}.') from None

    def open(self):
        """Open rosbag2."""
        # Future storage formats will require file handles.
        self.bio = True

    def close(self):
        """Close rosbag2."""
        # Future storage formats will require file handles.
        assert self.bio
        self.bio = False

    @property
    def duration(self) -> int:
        """Duration in nanoseconds between earliest and latest messages."""
        return self.metadata['duration']['nanoseconds']

    @property
    def start_time(self) -> int:
        """Timestamp in nanoseconds of the earliest message."""
        return self.metadata['starting_time']['nanoseconds_since_epoch']

    @property
    def end_time(self) -> int:
        """Timestamp in nanoseconds of the latest message."""
        return self.start_time + self.duration

    @property
    def message_count(self) -> int:
        """Total message count."""
        return self.metadata['message_count']

    @property
    def compression_format(self) -> Optional[str]:
        """Compression format, or None if the metadata names none."""
        return self.metadata.get('compression_format', None) or None

    @property
    def compression_mode(self) -> Optional[str]:
        """Lower-cased compression mode, or None if the bag is uncompressed.

        A missing, empty or null entry and the literal value ``none`` (in any
        letter case) all map to None.
        """
        mode = (self.metadata.get('compression_mode') or '').lower()
        return mode if mode and mode != 'none' else None

    def messages(  # pylint: disable=too-many-locals
        self,
        topics: Iterable[str] = (),
        start: Optional[int] = None,
        stop: Optional[int] = None,
    ) -> Generator[Tuple[str, str, int, bytes], None, None]:
        """Read messages from bag.

        Database files are processed in the order listed in the metadata, and
        messages within each file are yielded ordered by timestamp.

        Args:
            topics: Iterable with topic names to filter for. An empty iterable
                yields all messages.
            start: Yield only messages at or after this timestamp (ns).
            stop: Yield only messages before this timestamp (ns).

        Yields:
            Tuples of topic name, type, timestamp (ns), and rawdata.

        Raises:
            ReaderError: Bag not open, or a database file lacks the expected tables.

        """
        if not self.bio:
            raise ReaderError('Rosbag is not open.')

        # The query does not depend on the database file, so build it once.
        topics = tuple(topics)
        conditions: List[str] = []
        args: List[Any] = []
        if topics:
            conditions.append(f'topics.name IN ({",".join("?" for _ in topics)})')
            args += topics
        if start is not None:
            conditions.append('messages.timestamp >= ?')
            args.append(start)
        if stop is not None:
            conditions.append('messages.timestamp < ?')
            args.append(stop)

        query = [
            'SELECT topics.name,topics.type,messages.timestamp,messages.data',
            'FROM messages JOIN topics ON messages.topic_id=topics.id',
        ]
        if conditions:
            query.append(f'WHERE {" AND ".join(conditions)}')
        query.append('ORDER BY timestamp')
        statement = ' '.join(query)

        mode = self.compression_mode
        for filepath in self.paths:
            with decompress(filepath, mode == 'file') as path:
                conn = sqlite3.connect(f'file:{path}?immutable=1', uri=True)
                try:
                    # Hand rows back as the plain tuples sqlite produces.
                    conn.row_factory = lambda _, x: x
                    cur = conn.cursor()
                    # Single-quoted literals: double quotes denote identifiers
                    # in SQL and are accepted as strings only as a legacy quirk.
                    cur.execute(
                        'SELECT count(*) FROM sqlite_master '
                        "WHERE type='table' AND name IN ('messages', 'topics')",
                    )
                    if cur.fetchone()[0] != 2:
                        raise ReaderError(
                            f'Cannot open database {path} or database missing tables.',
                        )

                    cur.execute(statement, args)

                    if mode == 'message':
                        decomp = zstandard.ZstdDecompressor().decompress
                        for topic, msgtype, timestamp, data in cur:
                            yield topic, msgtype, timestamp, decomp(data)
                    else:
                        yield from cur
                finally:
                    # Release the handle before a temporary decompressed copy
                    # is removed, including when the generator is abandoned.
                    conn.close()

    def __enter__(self) -> Reader:
        """Open rosbag2 when entering contextmanager."""
        self.open()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """Close rosbag2 when exiting contextmanager."""
        self.close()
        return False
+247
View File
@@ -0,0 +1,247 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbag2 reader."""
from __future__ import annotations
import sqlite3
from enum import IntEnum, auto
from pathlib import Path
from typing import TYPE_CHECKING
import zstandard
from ruamel.yaml import YAML
if TYPE_CHECKING:
from types import TracebackType
from typing import Any, Dict, Literal, Optional, Type, Union
class WriterError(Exception):
    """Error raised by the rosbag2 writer.

    Used when the target path already exists, when compression is changed on
    an open bag, when a bag is used before being opened, and for duplicate or
    unknown topics.
    """
class Writer:  # pylint: disable=too-many-instance-attributes
    """Rosbag2 writer.

    This class implements writing of rosbag2 files in version 4. It should be
    used as a contextmanager.

    """

    SQLITE_SCHEMA = """
    CREATE TABLE topics(
      id INTEGER PRIMARY KEY,
      name TEXT NOT NULL,
      type TEXT NOT NULL,
      serialization_format TEXT NOT NULL,
      offered_qos_profiles TEXT NOT NULL
    );
    CREATE TABLE messages(
      id INTEGER PRIMARY KEY,
      topic_id INTEGER NOT NULL,
      timestamp INTEGER NOT NULL,
      data BLOB NOT NULL
    );
    CREATE INDEX timestamp_idx ON messages (timestamp ASC);
    """

    class CompressionMode(IntEnum):
        """Compression modes."""

        NONE = auto()
        FILE = auto()
        MESSAGE = auto()

    class CompressionFormat(IntEnum):
        """Compression formats."""

        ZSTD = auto()

    def __init__(self, path: Union[Path, str]):
        """Initialize writer.

        Args:
            path: Filesystem path to bag.

        Raises:
            WriterError: Target path exists already, Writer can only create new rosbags.

        """
        path = Path(path)
        self.path = path
        if path.exists():
            raise WriterError(f'{path} exists already, not overwriting.')
        self.metapath = path / 'metadata.yaml'
        self.dbpath = path / f'{path.name}.db3'
        # Empty strings mean "no compression" and are written to the metadata as is.
        self.compression_mode = ''
        self.compression_format = ''
        self.compressor: Optional[zstandard.ZstdCompressor] = None
        # Maps topic name to [id, name, type, serialization_format,
        # offered_qos_profiles, message_count].
        self.topics: Dict[str, Any] = {}
        self.conn: Optional[sqlite3.Connection] = None
        self.cursor: Optional[sqlite3.Cursor] = None

    def set_compression(self, mode: CompressionMode, fmt: CompressionFormat):
        """Enable compression on bag.

        This function has to be called before opening.

        Args:
            mode: Compression mode to use, a ``CompressionMode`` member.
                ``NONE`` leaves the current settings untouched.
            fmt: Compressor to use, a ``CompressionFormat`` member.

        Raises:
            WriterError: Bag already open.

        """
        if self.conn:
            raise WriterError(f'Cannot set compression, bag {self.path} already open.')
        if mode == self.CompressionMode.NONE:
            return
        # The lower-cased member names are the values stored in metadata.yaml.
        self.compression_mode = mode.name.lower()
        self.compression_format = fmt.name.lower()
        self.compressor = zstandard.ZstdCompressor()

    def open(self):
        """Open rosbag2 for writing.

        Create base directory and open database connection.

        Raises:
            WriterError: Target directory exists already.

        """
        try:
            self.path.mkdir(mode=0o755, parents=True)
        except FileExistsError:
            raise WriterError(f'{self.path} exists already, not overwriting.') from None

        self.conn = sqlite3.connect(f'file:{self.dbpath}', uri=True)
        self.conn.executescript(self.SQLITE_SCHEMA)
        self.cursor = self.conn.cursor()

    def add_topic(
        self,
        name: str,
        typ: str,
        serialization_format: str = 'cdr',
        offered_qos_profiles: str = '',
    ):
        """Add a topic.

        This function can only be called after opening a bag.

        Args:
            name: Topic name.
            typ: Message type.
            serialization_format: Serialization format.
            offered_qos_profiles: QOS Profile.

        Raises:
            WriterError: Bag not open or topic previously registered.

        """
        if not self.cursor:
            raise WriterError('Bag was not opened.')
        if name in self.topics:
            raise WriterError(f'Topics can only be added once: {name!r}.')
        # Topic ids are assigned sequentially, starting at 1.
        meta = (len(self.topics) + 1, name, typ, serialization_format, offered_qos_profiles)
        self.cursor.execute('INSERT INTO topics VALUES(?, ?, ?, ?, ?)', meta)
        # The trailing element counts the messages written to this topic.
        self.topics[name] = [*meta, 0]

    def write(self, topic: str, timestamp: int, data: bytes):
        """Write message to rosbag2.

        Args:
            topic: Topic message belongs to.
            timestamp: Message timestamp (ns).
            data: Serialized message data.

        Raises:
            WriterError: Bag not open or topic not registered.

        """
        if not self.cursor:
            raise WriterError('Bag was not opened.')
        if topic not in self.topics:
            raise WriterError(f'Tried to write to unknown topic {topic!r}.')

        if self.compression_mode == 'message':
            assert self.compressor
            data = self.compressor.compress(data)

        tmeta = self.topics[topic]
        self.cursor.execute(
            'INSERT INTO messages (topic_id, timestamp, data) VALUES(?, ?, ?)',
            (tmeta[0], timestamp, data),
        )
        tmeta[-1] += 1

    @staticmethod
    def _message_stats(conn: sqlite3.Connection) -> tuple[int, int, int]:
        """Return (duration, start time, count) of the stored messages, zeros if empty."""
        duration, start, count = conn.execute(
            'SELECT max(timestamp) - min(timestamp) + 1, min(timestamp), count(*) FROM messages',
        ).fetchone()
        if not count:
            # The aggregates are NULL on an empty table; report zeros so the
            # metadata never carries null durations or start times.
            return 0, 0, 0
        return duration, start, count

    def close(self):
        """Close rosbag2 after writing.

        Closes open database transactions and writes metadata.yaml.

        Raises:
            WriterError: Bag not open.

        """
        if not self.cursor or not self.conn:
            raise WriterError('Bag was not opened.')

        self.cursor.close()
        self.cursor = None

        duration, start, count = self._message_stats(self.conn)

        self.conn.commit()
        self.conn.execute('PRAGMA optimize')
        self.conn.close()

        if self.compression_mode == 'file':
            # Replace the plain database by its compressed counterpart.
            src = self.dbpath
            self.dbpath = src.with_suffix(f'.db3.{self.compression_format}')
            with src.open('rb') as infile, self.dbpath.open('wb') as outfile:
                self.compressor.copy_stream(infile, outfile)
            src.unlink()

        metadata = {
            'rosbag2_bagfile_information': {
                'version': 4,
                'storage_identifier': 'sqlite3',
                'relative_file_paths': [self.dbpath.name],
                'duration': {
                    'nanoseconds': duration,
                },
                'starting_time': {
                    'nanoseconds_since_epoch': start,
                },
                'message_count': count,
                'topics_with_message_count': [
                    {
                        'topic_metadata': {
                            'name': x[1],
                            'type': x[2],
                            'serialization_format': x[3],
                            'offered_qos_profiles': x[4],
                        },
                        'message_count': x[5],
                    } for x in self.topics.values()
                ],
                'compression_format': self.compression_format,
                'compression_mode': self.compression_mode,
            },
        }
        with self.metapath.open('w') as metafile:
            yaml = YAML(typ='safe')
            yaml.default_flow_style = False
            yaml.dump(metadata, metafile)

    def __enter__(self) -> Writer:
        """Open rosbag2 when entering contextmanager."""
        self.open()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """Close rosbag2 when exiting contextmanager."""
        self.close()
        return False