Add rosbag conversion tools

This commit is contained in:
Marko Durkovic
2021-05-02 14:51:08 +02:00
parent 4de0c99274
commit 5d2d394110
10 changed files with 291 additions and 0 deletions
+16
View File
@@ -0,0 +1,16 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbags file format conversion.
Conversion function transforms files from rosbag1 format to the latest rosbag2
format. It automatically matches ROS1 message types to their ROS2 counterparts
and adds custom types not present in the type system.
"""
from .converter import ConverterError, convert
__all__ = [
'ConverterError',
'convert',
]
+62
View File
@@ -0,0 +1,62 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""CLI tool for rosbag conversion."""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import TYPE_CHECKING
from .converter import ConverterError, convert
if TYPE_CHECKING:
from typing import Callable
def pathtype(exists: bool = True) -> Callable:
"""Path argument for argparse.
Args:
exists: Path should exists in filesystem.
Returns:
Argparse type function.
"""
def topath(pathname: str) -> Path:
path = Path(pathname)
if exists != path.exists():
raise argparse.ArgumentTypeError(
f'{path} should {"exist" if exists else "not exist"}.',
)
return path
return topath
def main() -> None:
"""Parse cli arguments and run conversion."""
parser = argparse.ArgumentParser(description='Convert rosbag1 to rosbag2.')
parser.add_argument(
'src',
type=pathtype(),
help='source path to read rosbag1 from',
)
parser.add_argument(
'--dst',
type=pathtype(exists=False),
help='destination path for rosbag2',
)
args = parser.parse_args()
try:
convert(args.src, args.dst)
except ConverterError as err:
print(f'ERROR: {err}') # noqa: T001
sys.exit(1)
if __name__ == '__main__':
main()
+55
View File
@@ -0,0 +1,55 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbag1 to Rosbag2 Converter."""
from __future__ import annotations
from typing import TYPE_CHECKING
from rosbags.rosbag1 import Reader, ReaderError
from rosbags.rosbag2 import Writer, WriterError
from rosbags.serde import ros1_to_cdr
from rosbags.typesys import get_types_from_msg, register_types
if TYPE_CHECKING:
from pathlib import Path
from typing import Any, Dict, Optional
class ConverterError(Exception):
"""Converter Error."""
def convert(src: Path, dst: Optional[Path]) -> None:
"""Convert Rosbag1 to Rosbag2.
Args:
src: Rosbag1 path.
dst: Rosbag2 path.
Raises:
ConverterError: An error occured during reading, writing, or
converting.
"""
dst = dst if dst else src.with_suffix('')
if dst.exists():
raise ConverterError(f'Output path {str(dst)!r} exists already.')
try:
with Reader(src) as reader, Writer(dst) as writer:
typs: Dict[str, Any] = {}
for name, topic in reader.topics.items():
writer.add_topic(name, topic.msgtype)
typs.update(get_types_from_msg(topic.msgdef, topic.msgtype))
register_types(typs)
for topic, msgtype, timestamp, data in reader.messages():
data = ros1_to_cdr(data, msgtype)
writer.write(topic, timestamp, data)
except ReaderError as err:
raise ConverterError(f'Reading source bag: {err}') from err
except WriterError as err:
raise ConverterError(f'Writing destination bag: {err}') from err
except Exception as err: # pylint: disable=broad-except
raise ConverterError(f'Converting rosbag: {err!r}') from err