From ef97081e5a96daf05d2cfd5dc268d73830fbbabd Mon Sep 17 00:00:00 2001 From: Marko Durkovic Date: Sun, 1 Aug 2021 18:00:51 +0200 Subject: [PATCH] Add CDR to ROS1 bytestream conversion --- src/rosbags/serde/__init__.py | 3 +- src/rosbags/serde/messages.py | 4 +- src/rosbags/serde/ros1.py | 147 ++++++++++++++++++++++++++++++++++ src/rosbags/serde/serdes.py | 41 ++++++++++ src/rosbags/serde/typing.py | 2 + tests/test_serde.py | 30 ++++++- 6 files changed, 224 insertions(+), 3 deletions(-) diff --git a/src/rosbags/serde/__init__.py b/src/rosbags/serde/__init__.py index 55cd0961..707afd50 100644 --- a/src/rosbags/serde/__init__.py +++ b/src/rosbags/serde/__init__.py @@ -9,10 +9,11 @@ convert directly between different serialization formats. """ from .messages import SerdeError -from .serdes import deserialize_cdr, ros1_to_cdr, serialize_cdr +from .serdes import cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr __all__ = [ 'SerdeError', + 'cdr_to_ros1', 'deserialize_cdr', 'ros1_to_cdr', 'serialize_cdr', diff --git a/src/rosbags/serde/messages.py b/src/rosbags/serde/messages.py index eae01a59..7c535023 100644 --- a/src/rosbags/serde/messages.py +++ b/src/rosbags/serde/messages.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING from rosbags.typesys import types from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr -from .ros1 import generate_ros1_to_cdr +from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr from .typing import Descriptor, Field, Msgdef from .utils import Valtype @@ -67,5 +67,7 @@ def get_msgdef(typename: str) -> Msgdef: generate_deserialize_cdr(fields, 'be'), generate_ros1_to_cdr(fields, typename, False), generate_ros1_to_cdr(fields, typename, True), + generate_cdr_to_ros1(fields, typename, False), + generate_cdr_to_ros1(fields, typename, True), ) return MSGDEFCACHE[typename] diff --git a/src/rosbags/serde/ros1.py b/src/rosbags/serde/ros1.py index e049adf6..5dd21967 100644 --- a/src/rosbags/serde/ros1.py +++ b/src/rosbags/serde/ros1.py @@ -168,3 +168,150 @@ def generate_ros1_to_cdr(fields: List[Field], typename: str, copy: bool) -> Call lines.append(' return ipos, opos') return getattr(compile_lines(lines), funcname) + + +def generate_cdr_to_ros1(fields: List[Field], typename: str, copy: bool) -> Callable: + """Generate CDR to ROS1 conversion function. + + Args: + fields: Fields of message. + typename: Message type name. + copy: Generate conversion or sizing function. + + Returns: + CDR to ROS1 conversion function. + + """ + # pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks,too-many-statements + aligned = 8 + icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + next(inext) + funcname = 'cdr_to_ros1' if copy else 'getsize_cdr_to_ros1' + lines = [ + 'import sys', + 'import numpy', + 'from rosbags.serde.messages import SerdeError, get_msgdef', + 'from rosbags.serde.primitives import pack_int32_le', + 'from rosbags.serde.primitives import unpack_int32_le', + f'def {funcname}(input, ipos, output, opos):', + ] + + if typename == 'std_msgs/msg/Header': + lines.append(' opos += 4') + + for fcurr, fnext in zip(icurr, inext): + _, desc = fcurr + + if desc.valtype == Valtype.MESSAGE: + lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}') + lines.append(' ipos, opos = func(input, ipos, output, opos)') + aligned = align_after(desc) + + elif desc.valtype == Valtype.BASE: + if desc.args == 'string': + lines.append(' length = unpack_int32_le(input, ipos)[0] - 1') + if copy: + lines.append(' pack_int32_le(output, opos, length)') + lines.append(' ipos += 4') + lines.append(' opos += 4') + if copy: + lines.append(' output[opos:opos + length] = input[ipos:ipos + length]') + lines.append(' ipos += length + 1') + lines.append(' opos += length') + aligned = 1 + else: + size = SIZEMAP[desc.args] + if copy: + lines.append(f' output[opos:opos + {size}] = input[ipos:ipos + {size}]') + lines.append(f' ipos += {size}') + lines.append(f' opos += {size}') + aligned = size + + elif desc.valtype == Valtype.ARRAY: + subdesc, length = desc.args + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + for _ in range(length): + lines.append(' ipos = (ipos + 4 - 1) & -4') + lines.append(' length = unpack_int32_le(input, ipos)[0] - 1') + if copy: + lines.append(' pack_int32_le(output, opos, length)') + lines.append(' ipos += 4') + lines.append(' opos += 4') + if copy: + lines.append( + ' output[opos:opos + length] = input[ipos:ipos + length]', + ) + lines.append(' ipos += length + 1') + lines.append(' opos += length') + aligned = 1 + else: + size = length * SIZEMAP[subdesc.args] + if copy: + lines.append(f' output[opos:opos + {size}] = input[ipos:ipos + {size}]') + lines.append(f' ipos += {size}') + lines.append(f' opos += {size}') + aligned = SIZEMAP[subdesc.args] + + if subdesc.valtype == Valtype.MESSAGE: + anext = align(subdesc) + anext_after = align_after(subdesc) + + lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + for _ in range(length): + if anext > anext_after: + lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}') + lines.append(' ipos, opos = func(input, ipos, output, opos)') + aligned = anext_after + else: + assert desc.valtype == Valtype.SEQUENCE + lines.append(' size = unpack_int32_le(input, ipos)[0]') + if copy: + lines.append(' pack_int32_le(output, opos, size)') + lines.append(' ipos += 4') + lines.append(' opos += 4') + subdesc = desc.args[0] + aligned = 4 + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(' for _ in range(size):') + lines.append(' ipos = (ipos + 4 - 1) & -4') + lines.append(' length = unpack_int32_le(input, ipos)[0] - 1') + if copy: + lines.append(' pack_int32_le(output, opos, length)') + lines.append(' ipos += 4') + lines.append(' opos += 4') + if copy: + lines.append(' output[opos:opos + length] = input[ipos:ipos + length]') + lines.append(' ipos += length + 1') + lines.append(' opos += length') + aligned = 1 + else: + if aligned < (anext := align(subdesc)): + lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}') + lines.append(f' length = size * {SIZEMAP[subdesc.args]}') + if copy: + lines.append(' output[opos:opos + length] = input[ipos:ipos + length]') + lines.append(' ipos += length') + lines.append(' opos += length') + aligned = anext + + else: + assert subdesc.valtype == Valtype.MESSAGE + anext = align(subdesc) + lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + lines.append(' for _ in range(size):') + lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}') + lines.append(' ipos, opos = func(input, ipos, output, opos)') + aligned = align_after(subdesc) + + aligned = min([aligned, 4]) + + if fnext and aligned < (anext := align(fnext.descriptor)): + lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}') + aligned = anext + + lines.append(' return ipos, opos') + return getattr(compile_lines(lines), funcname) diff --git a/src/rosbags/serde/serdes.py b/src/rosbags/serde/serdes.py index 260edc08..b4977fc1 100644 --- a/src/rosbags/serde/serdes.py +++ b/src/rosbags/serde/serdes.py @@ -100,3 +100,44 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview: assert ipos == len(raw) assert opos + 4 == size return rawdata.toreadonly() + + +def cdr_to_ros1(raw: bytes, typename: str) -> memoryview: + """Convert serialized CDR message directly to ROS1. + + This should be reasonably fast as conversions happen on a byte-level + without going through deserialization and serialization. + + Args: + raw: CDR serialized message. + typename: Message type name. + + Returns: + ROS1 serialized message. + + """ + assert raw[1] == 1, 'Message byte order is not little endian' + + msgdef = get_msgdef(typename) + + ipos, opos = msgdef.getsize_cdr_to_ros1( + raw[4:], + 0, + None, + 0, + ) + assert ipos + 4 == len(raw) + + raw = memoryview(raw) + size = opos + rawdata = memoryview(bytearray(size)) + + ipos, opos = msgdef.cdr_to_ros1( + raw[4:], + 0, + rawdata, + 0, + ) + assert ipos + 4 == len(raw) + assert opos == size + return rawdata.toreadonly() diff --git a/src/rosbags/serde/typing.py b/src/rosbags/serde/typing.py index 3de40593..d4e8bd69 100644 --- a/src/rosbags/serde/typing.py +++ b/src/rosbags/serde/typing.py @@ -38,3 +38,5 @@ class Msgdef(NamedTuple): deserialize_cdr_be: Callable getsize_ros1_to_cdr: Callable ros1_to_cdr: Callable + getsize_cdr_to_ros1: Callable + cdr_to_ros1: Callable diff --git a/tests/test_serde.py b/tests/test_serde.py index 6353b277..a5e646ff 100644 --- a/tests/test_serde.py +++ b/tests/test_serde.py @@ -10,9 +10,10 @@ from unittest.mock import MagicMock, patch import numpy import pytest -from rosbags.serde import SerdeError, deserialize_cdr, ros1_to_cdr, serialize_cdr +from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr from rosbags.serde.messages import get_msgdef from rosbags.typesys import get_types_from_msg, register_types +from rosbags.typesys.types import builtin_interfaces__msg__Time, std_msgs__msg__Header from .cdr import deserialize, serialize @@ -380,3 +381,30 @@ def test_ros1_to_cdr(): b'\x00\x00\x00\x00\x00\x00\x00\x02' ) assert ros1_to_cdr(msg_ros, 'test_msgs/msg/dynamic_s_64') == msg_cdr + + +def test_cdr_to_ros1(): + """Test CDR to ROS1 conversion.""" + register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64'))) + msg_ros = (b'\x01\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02') + msg_cdr = ( + b'\x00\x01\x00\x00' + b'\x01\x00' + b'\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x02' + ) + assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/static_16_64') == msg_ros + + register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64'))) + msg_ros = (b'\x01\x00\x00\x00X' b'\x00\x00\x00\x00\x00\x00\x00\x02') + msg_cdr = ( + b'\x00\x01\x00\x00' + b'\x02\x00\x00\x00X\x00' + b'\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x02' + ) + assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/dynamic_s_64') == msg_ros + + header = std_msgs__msg__Header(stamp=builtin_interfaces__msg__Time(42, 666), frame_id='frame') + msg_ros = cdr_to_ros1(serialize_cdr(header, 'std_msgs/msg/Header'), 'std_msgs/msg/Header') + assert msg_ros == b'\x00\x00\x00\x00*\x00\x00\x00\x9a\x02\x00\x00\x05\x00\x00\x00frame'