diff --git a/docs/examples/save_images_rosbag1.py b/docs/examples/save_images_rosbag1.py index 3390407c..4789edd1 100644 --- a/docs/examples/save_images_rosbag1.py +++ b/docs/examples/save_images_rosbag1.py @@ -3,7 +3,7 @@ import numpy from rosbags.rosbag1 import Writer -from rosbags.serde import cdr_to_ros1, serialize_cdr +from rosbags.serde import serialize_ros1 from rosbags.typesys.types import builtin_interfaces__msg__Time as Time from rosbags.typesys.types import sensor_msgs__msg__CompressedImage as CompressedImage from rosbags.typesys.types import std_msgs__msg__Header as Header @@ -39,8 +39,5 @@ def save_images() -> None: writer.write( conn, timestamp, - cdr_to_ros1( - serialize_cdr(message, CompressedImage.__msgtype__), - CompressedImage.__msgtype__, - ), + serialize_ros1(message, CompressedImage.__msgtype__), ) diff --git a/docs/topics/serde.rst b/docs/topics/serde.rst index 69b95ebd..f476aa09 100644 --- a/docs/topics/serde.rst +++ b/docs/topics/serde.rst @@ -15,6 +15,16 @@ Deserialize a CDR bytes object using :py:func:`deserialize_cdr() `: + +.. code-block:: python + + from rosbags.serde import deserialize_ros1 + + # rawdata is of type bytes and contains serialized message + msg = deserialize_ros1(rawdata, 'geometry_msgs/msg/Quaternion') + + Serialization --------------- @@ -29,3 +39,11 @@ Serialize a message with CDR using :py:func:`serialize_cdr() `: + +.. code-block:: python + + from rosbags.serde import serialize_ros1 + + serialized = serialize_ros1(msg, 'geometry_msgs/msg/Quaternion') diff --git a/src/rosbags/highlevel/anyreader.py b/src/rosbags/highlevel/anyreader.py index ba265b56..679afbd5 100644 --- a/src/rosbags/highlevel/anyreader.py +++ b/src/rosbags/highlevel/anyreader.py @@ -15,7 +15,7 @@ from rosbags.rosbag1 import Reader as Reader1 from rosbags.rosbag1 import ReaderError as ReaderError1 from rosbags.rosbag2 import Reader as Reader2 from rosbags.rosbag2 import ReaderError as ReaderError2 -from rosbags.serde import deserialize_cdr, ros1_to_cdr +from rosbags.serde import deserialize_cdr, deserialize_ros1 from rosbags.typesys import get_types_from_msg, register_types, types if TYPE_CHECKING: @@ -101,7 +101,7 @@ class AnyReader: def _deser_ros1(self, rawdata: bytes, typ: str) -> object: """Deserialize ROS1 message.""" - return deserialize_cdr(ros1_to_cdr(rawdata, typ, self.typestore), typ, self.typestore) + return deserialize_ros1(rawdata, typ, self.typestore) def _deser_ros2(self, rawdata: bytes, typ: str) -> object: """Deserialize CDR message.""" diff --git a/src/rosbags/serde/__init__.py b/src/rosbags/serde/__init__.py index ef56d345..1210ae6e 100644 --- a/src/rosbags/serde/__init__.py +++ b/src/rosbags/serde/__init__.py @@ -9,12 +9,21 @@ convert directly between different serialization formats. """ from .messages import SerdeError -from .serdes import cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr +from .serdes import ( + cdr_to_ros1, + deserialize_cdr, + deserialize_ros1, + ros1_to_cdr, + serialize_cdr, + serialize_ros1, +) __all__ = [ 'SerdeError', 'cdr_to_ros1', 'deserialize_cdr', + 'deserialize_ros1', 'ros1_to_cdr', 'serialize_cdr', + 'serialize_ros1', ] diff --git a/src/rosbags/serde/messages.py b/src/rosbags/serde/messages.py index c4f52097..b31691be 100644 --- a/src/rosbags/serde/messages.py +++ b/src/rosbags/serde/messages.py @@ -7,7 +7,13 @@ from __future__ import annotations from typing import TYPE_CHECKING from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr -from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr +from .ros1 import ( + generate_cdr_to_ros1, + generate_deserialize_ros1, + generate_getsize_ros1, + generate_ros1_to_cdr, + generate_serialize_ros1, +) from .typing import Descriptor, Field, Msgdef from .utils import Valtype @@ -62,6 +68,7 @@ def get_msgdef(typename: str, typestore: Typestore) -> Msgdef: fields = [Field(name, fixup(desc)) for name, desc in entries] getsize_cdr, size_cdr = generate_getsize_cdr(fields) + getsize_ros1, size_ros1 = generate_getsize_ros1(fields, typename) cache[typename] = Msgdef( typename, @@ -73,6 +80,10 @@ def get_msgdef(typename: str, typestore: Typestore) -> Msgdef: generate_serialize_cdr(fields, 'be'), generate_deserialize_cdr(fields, 'le'), generate_deserialize_cdr(fields, 'be'), + size_ros1, + getsize_ros1, + generate_serialize_ros1(fields, typename), + generate_deserialize_ros1(fields, typename), generate_ros1_to_cdr(fields, typename, False), # type: ignore generate_ros1_to_cdr(fields, typename, True), # type: ignore generate_cdr_to_ros1(fields, typename, False), # type: ignore diff --git a/src/rosbags/serde/ros1.py b/src/rosbags/serde/ros1.py index 38322e4c..d9e4eaa0 100644 --- a/src/rosbags/serde/ros1.py +++ b/src/rosbags/serde/ros1.py @@ -11,6 +11,7 @@ conversion of ROS1 to CDR. from __future__ import annotations +import sys from itertools import tee from typing import TYPE_CHECKING, Iterator, cast @@ -20,7 +21,7 @@ from .utils import SIZEMAP, Valtype, align, align_after, compile_lines if TYPE_CHECKING: from typing import Union - from .typing import Bitcvt, BitcvtSize + from .typing import Bitcvt, BitcvtSize, CDRDeser, CDRSer, CDRSerSize def generate_ros1_to_cdr( @@ -331,3 +332,349 @@ def generate_cdr_to_ros1( lines.append(' return ipos, opos') return getattr(compile_lines(lines), funcname) # type: ignore + + +def generate_getsize_ros1(fields: list[Field], typename: str) -> tuple[CDRSerSize, int]: + """Generate ros1 size calculation function. + + Args: + fields: Fields of message. + typename: Message type name. + + Returns: + Size calculation function and static size. + + """ + # pylint: disable=too-many-branches,too-many-statements + size = 0 + is_stat = True + + lines = [ + 'import sys', + 'from rosbags.serde.messages import get_msgdef', + 'def getsize_ros1(pos, message, typestore):', + ] + + if typename == 'std_msgs/msg/Header': + lines.append(' pos += 4') + + for fcurr in fields: + fieldname, desc = fcurr + + if desc.valtype == Valtype.MESSAGE: + if desc.args.size_ros1: + lines.append(f' pos += {desc.args.size_ros1}') + size += desc.args.size_ros1 + else: + lines.append(f' func = get_msgdef("{desc.args.name}", typestore).getsize_ros1') + lines.append(f' pos = func(pos, message.{fieldname}, typestore)') + is_stat = False + + elif desc.valtype == Valtype.BASE: + if desc.args == 'string': + lines.append(f' pos += 4 + len(message.{fieldname}.encode())') + is_stat = False + else: + lines.append(f' pos += {SIZEMAP[desc.args]}') + size += SIZEMAP[desc.args] + + elif desc.valtype == Valtype.ARRAY: + subdesc, length = desc.args + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(f' val = message.{fieldname}') + for idx in range(length): + lines.append(f' pos += 4 + len(val[{idx}].encode())') + is_stat = False + else: + lines.append(f' pos += {length * SIZEMAP[subdesc.args]}') + size += length * SIZEMAP[subdesc.args] + + else: + assert subdesc.valtype == Valtype.MESSAGE + if subdesc.args.size_ros1: + for _ in range(length): + lines.append(f' pos += {subdesc.args.size_ros1}') + size += subdesc.args.size_ros1 + else: + lines.append( + f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_ros1', + ) + lines.append(f' val = message.{fieldname}') + for idx in range(length): + lines.append(f' pos = func(pos, val[{idx}], typestore)') + is_stat = False + else: + assert desc.valtype == Valtype.SEQUENCE + lines.append(' pos += 4') + subdesc = desc.args[0] + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(f' for val in message.{fieldname}:') + lines.append(' pos += 4 + len(val.encode())') + else: + lines.append(f' pos += len(message.{fieldname}) * {SIZEMAP[subdesc.args]}') + + else: + assert subdesc.valtype == Valtype.MESSAGE + lines.append(f' val = message.{fieldname}') + if subdesc.args.size_ros1: + lines.append(f' pos += {subdesc.args.size_ros1} * len(val)') + + else: + lines.append( + f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_ros1', + ) + lines.append(' for item in val:') + lines.append(' pos = func(pos, item, typestore)') + + is_stat = False + lines.append(' return pos') + return compile_lines(lines).getsize_ros1, is_stat * size + + +def generate_serialize_ros1(fields: list[Field], typename: str) -> CDRSer: + """Generate ros1 serialization function. + + Args: + fields: Fields of message. + typename: Message type name. + + Returns: + Serializer function. + + """ + # pylint: disable=too-many-branches,too-many-statements + lines = [ + 'import sys', + 'import numpy', + 'from rosbags.serde.messages import SerdeError, get_msgdef', + 'from rosbags.serde.primitives import pack_bool_le', + 'from rosbags.serde.primitives import pack_int8_le', + 'from rosbags.serde.primitives import pack_int16_le', + 'from rosbags.serde.primitives import pack_int32_le', + 'from rosbags.serde.primitives import pack_int64_le', + 'from rosbags.serde.primitives import pack_uint8_le', + 'from rosbags.serde.primitives import pack_uint16_le', + 'from rosbags.serde.primitives import pack_uint32_le', + 'from rosbags.serde.primitives import pack_uint64_le', + 'from rosbags.serde.primitives import pack_float32_le', + 'from rosbags.serde.primitives import pack_float64_le', + 'def serialize_ros1(rawdata, pos, message, typestore):', + ] + + if typename == 'std_msgs/msg/Header': + lines.append(' pos += 4') + + be_syms = ('>',) if sys.byteorder == 'little' else ('=', '>') + + for fcurr in fields: + fieldname, desc = fcurr + + lines.append(f' val = message.{fieldname}') + if desc.valtype == Valtype.MESSAGE: + name = desc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1') + lines.append(' pos = func(rawdata, pos, val, typestore)') + + elif desc.valtype == Valtype.BASE: + if desc.args == 'string': + lines.append(' bval = memoryview(val.encode())') + lines.append(' length = len(bval)') + lines.append(' pack_int32_le(rawdata, pos, length)') + lines.append(' pos += 4') + lines.append(' rawdata[pos:pos + length] = bval') + lines.append(' pos += length') + else: + lines.append(f' pack_{desc.args}_le(rawdata, pos, val)') + lines.append(f' pos += {SIZEMAP[desc.args]}') + + elif desc.valtype == Valtype.ARRAY: + subdesc, length = desc.args + lines.append(f' if len(val) != {length}:') + lines.append(' raise SerdeError(\'Unexpected array length\')') + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + for idx in range(length): + lines.append(f' bval = memoryview(val[{idx}].encode())') + lines.append(' length = len(bval)') + lines.append(' pack_int32_le(rawdata, pos, length)') + lines.append(' pos += 4') + lines.append(' rawdata[pos:pos + length] = bval') + lines.append(' pos += length') + else: + lines.append(f' if val.dtype.byteorder in {be_syms}:') + lines.append(' val = val.byteswap()') + size = length * SIZEMAP[subdesc.args] + lines.append(f' rawdata[pos:pos + {size}] = val.view(numpy.uint8)') + lines.append(f' pos += {size}') + + else: + assert subdesc.valtype == Valtype.MESSAGE + name = subdesc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1') + for idx in range(length): + lines.append(f' pos = func(rawdata, pos, val[{idx}], typestore)') + else: + assert desc.valtype == Valtype.SEQUENCE + lines.append(' pack_int32_le(rawdata, pos, len(val))') + lines.append(' pos += 4') + subdesc = desc.args[0] + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(' for item in val:') + lines.append(' bval = memoryview(item.encode())') + lines.append(' length = len(bval)') + lines.append(' pack_int32_le(rawdata, pos, length)') + lines.append(' pos += 4') + lines.append(' rawdata[pos:pos + length] = bval') + lines.append(' pos += length') + else: + lines.append(f' size = len(val) * {SIZEMAP[subdesc.args]}') + lines.append(f' if val.dtype.byteorder in {be_syms}:') + lines.append(' val = val.byteswap()') + lines.append(' rawdata[pos:pos + size] = val.view(numpy.uint8)') + lines.append(' pos += size') + + if subdesc.valtype == Valtype.MESSAGE: + name = subdesc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1') + lines.append(' for item in val:') + lines.append(' pos = func(rawdata, pos, item, typestore)') + + lines.append(' return pos') + return compile_lines(lines).serialize_ros1 # type: ignore + + +def generate_deserialize_ros1(fields: list[Field], typename: str) -> CDRDeser: + """Generate ros1 deserialization function. + + Args: + fields: Fields of message. + typename: Message type name. + + Returns: + Deserializer function. + + """ + # pylint: disable=too-many-branches,too-many-statements + lines = [ + 'import sys', + 'import numpy', + 'from rosbags.serde.messages import SerdeError, get_msgdef', + 'from rosbags.serde.primitives import unpack_bool_le', + 'from rosbags.serde.primitives import unpack_int8_le', + 'from rosbags.serde.primitives import unpack_int16_le', + 'from rosbags.serde.primitives import unpack_int32_le', + 'from rosbags.serde.primitives import unpack_int64_le', + 'from rosbags.serde.primitives import unpack_uint8_le', + 'from rosbags.serde.primitives import unpack_uint16_le', + 'from rosbags.serde.primitives import unpack_uint32_le', + 'from rosbags.serde.primitives import unpack_uint64_le', + 'from rosbags.serde.primitives import unpack_float32_le', + 'from rosbags.serde.primitives import unpack_float64_le', + 'def deserialize_ros1(rawdata, pos, cls, typestore):', + ] + + if typename == 'std_msgs/msg/Header': + lines.append(' pos += 4') + + be_syms = ('>',) if sys.byteorder == 'little' else ('=', '>') + + funcname = 'deserialize_ros1' + lines.append(' values = []') + for fcurr in fields: + desc = fcurr[1] + + if desc.valtype == Valtype.MESSAGE: + lines.append(f' msgdef = get_msgdef("{desc.args.name}", typestore)') + lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)') + lines.append(' values.append(obj)') + + elif desc.valtype == Valtype.BASE: + if desc.args == 'string': + lines.append(' length = unpack_int32_le(rawdata, pos)[0]') + lines.append(' string = bytes(rawdata[pos + 4:pos + 4 + length]).decode()') + lines.append(' values.append(string)') + lines.append(' pos += 4 + length') + else: + lines.append(f' value = unpack_{desc.args}_le(rawdata, pos)[0]') + lines.append(' values.append(value)') + lines.append(f' pos += {SIZEMAP[desc.args]}') + + elif desc.valtype == Valtype.ARRAY: + subdesc, length = desc.args + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(' value = []') + for _ in range(length): + lines.append(' length = unpack_int32_le(rawdata, pos)[0]') + lines.append( + ' value.append(bytes(rawdata[pos + 4:pos + 4 + length]).decode())', + ) + lines.append(' pos += 4 + length') + lines.append(' values.append(value)') + else: + size = length * SIZEMAP[subdesc.args] + lines.append( + f' val = numpy.frombuffer(rawdata, ' + f'dtype=numpy.{subdesc.args}, count={length}, offset=pos)', + ) + lines.append(f' if val.dtype.byteorder in {be_syms}:') + lines.append(' val = val.byteswap()') + lines.append(' values.append(val)') + lines.append(f' pos += {size}') + else: + assert subdesc.valtype == Valtype.MESSAGE + lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)') + lines.append(' value = []') + for _ in range(length): + lines.append( + f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)', + ) + lines.append(' value.append(obj)') + lines.append(' values.append(value)') + + else: + assert desc.valtype == Valtype.SEQUENCE + lines.append(' size = unpack_int32_le(rawdata, pos)[0]') + lines.append(' pos += 4') + subdesc = desc.args[0] + + if subdesc.valtype == Valtype.BASE: + if subdesc.args == 'string': + lines.append(' value = []') + lines.append(' for _ in range(size):') + lines.append(' length = unpack_int32_le(rawdata, pos)[0]') + lines.append( + ' value.append(bytes(rawdata[pos + 4:pos + 4 + length])' + '.decode())', + ) + lines.append(' pos += 4 + length') + lines.append(' values.append(value)') + else: + lines.append(f' length = size * {SIZEMAP[subdesc.args]}') + lines.append( + f' val = numpy.frombuffer(rawdata, ' + f'dtype=numpy.{subdesc.args}, count=size, offset=pos)', + ) + lines.append(f' if val.dtype.byteorder in {be_syms}:') + lines.append(' val = val.byteswap()') + lines.append(' values.append(val)') + lines.append(' pos += length') + + if subdesc.valtype == Valtype.MESSAGE: + lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)') + lines.append(' value = []') + lines.append(' for _ in range(size):') + lines.append( + f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)', + ) + lines.append(' value.append(obj)') + lines.append(' values.append(value)') + + lines.append(' return cls(*values), pos') + return compile_lines(lines).deserialize_ros1 # type: ignore diff --git a/src/rosbags/serde/serdes.py b/src/rosbags/serde/serdes.py index 6d56a7b0..ab38af61 100644 --- a/src/rosbags/serde/serdes.py +++ b/src/rosbags/serde/serdes.py @@ -73,6 +73,54 @@ def serialize_cdr( return rawdata.toreadonly() +def deserialize_ros1( + rawdata: bytes, + typename: str, + typestore: Typestore = types, +) -> Any: # noqa: ANN401 + """Deserialize raw data into a message object. + + Args: + rawdata: Serialized data. + typename: Message type name. + typestore: Type store. + + Returns: + Deserialized message object. + + """ + msgdef = get_msgdef(typename, typestore) + func = msgdef.deserialize_ros1 + message, pos = func(rawdata, 0, msgdef.cls, typestore) + assert pos == len(rawdata) + return message + + +def serialize_ros1( + message: object, + typename: str, + typestore: Typestore = types, +) -> memoryview: + """Serialize message object to bytes. + + Args: + message: Message object. + typename: Message type name. + typestore: Type store. + + Returns: + Serialized bytes. + + """ + msgdef = get_msgdef(typename, typestore) + size = msgdef.getsize_ros1(0, message, typestore) + rawdata = memoryview(bytearray(size)) + func = msgdef.serialize_ros1 + pos = func(rawdata, 0, message, typestore) + assert pos == size + return rawdata.toreadonly() + + def ros1_to_cdr(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview: """Convert serialized ROS1 message directly to CDR. diff --git a/src/rosbags/serde/typing.py b/src/rosbags/serde/typing.py index 5275cc2b..8d563308 100644 --- a/src/rosbags/serde/typing.py +++ b/src/rosbags/serde/typing.py @@ -45,6 +45,10 @@ class Msgdef(NamedTuple): serialize_cdr_be: CDRSer deserialize_cdr_le: CDRDeser deserialize_cdr_be: CDRDeser + size_ros1: int + getsize_ros1: CDRSerSize + serialize_ros1: CDRSer + deserialize_ros1: CDRDeser getsize_ros1_to_cdr: BitcvtSize ros1_to_cdr: Bitcvt getsize_cdr_to_ros1: BitcvtSize diff --git a/tests/test_serde.py b/tests/test_serde.py index 586e3573..88542cbc 100644 --- a/tests/test_serde.py +++ b/tests/test_serde.py @@ -10,7 +10,15 @@ from unittest.mock import MagicMock, patch import numpy import pytest -from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr +from rosbags.serde import ( + SerdeError, + cdr_to_ros1, + deserialize_cdr, + deserialize_ros1, + ros1_to_cdr, + serialize_cdr, + serialize_ros1, +) from rosbags.serde.messages import get_msgdef from rosbags.typesys import get_types_from_msg, register_types, types from rosbags.typesys.types import builtin_interfaces__msg__Time as Time @@ -206,6 +214,8 @@ def _comparable() -> Generator[None, None, None]: def __init__(self, *args: Any, **kwargs: Any): # noqa: ANN401 super().__init__(*args, **kwargs) + self.dtype = kwargs['wraps'].dtype + self.reshape = kwargs['wraps'].reshape self.__eq__ = arreq # type: ignore def byteswap(self, *args: Any) -> CNDArray: # noqa: ANN401 @@ -230,6 +240,11 @@ def test_serde(message: tuple[bytes, str, bool]) -> None: assert len(rawdata) - len(serdeser) < 4 assert all(x == 0 for x in rawdata[len(serdeser):]) + if rawdata[1] == 1: + rawdata = cdr_to_ros1(rawdata, typ) + serdeser = serialize_ros1(deserialize_ros1(rawdata, typ), typ) + assert serdeser == rawdata + @pytest.mark.usefixtures('_comparable') def test_deserializer() -> None: @@ -244,6 +259,8 @@ def test_deserializer() -> None: assert msg.points[1].x == 1.25 assert msg.points[1].y == 2.25 assert msg.points[1].z == 3.25 + msg_ros1 = deserialize_ros1(cdr_to_ros1(*MSG_POLY[:2]), MSG_POLY[1]) + assert msg_ros1 == msg msg = deserialize_cdr(*MSG_MAGN[:2]) assert msg == deserialize(*MSG_MAGN[:2]) @@ -256,6 +273,8 @@ def test_deserializer() -> None: assert (field.x, field.y, field.z) == (128., 128., 128.) diag = numpy.diag(msg.magnetic_field_covariance.reshape(3, 3)) assert (diag == [1., 1., 1.]).all() + msg_ros1 = deserialize_ros1(cdr_to_ros1(*MSG_MAGN[:2]), MSG_MAGN[1]) + assert msg_ros1 == msg msg_big = deserialize_cdr(*MSG_MAGN_BIG[:2]) assert msg_big == deserialize(*MSG_MAGN_BIG[:2]) @@ -384,10 +403,14 @@ def test_custom_type() -> None: assert res == deserialize(serialize(msg, cname), cname) assert res == msg + res = deserialize_ros1(serialize_ros1(msg, cname), cname) + assert res == msg + def test_ros1_to_cdr() -> None: """Test ROS1 to CDR conversion.""" - register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64'))) + msgtype = 'test_msgs/msg/static_16_64' + register_types(dict(get_types_from_msg(STATIC_16_64, msgtype))) msg_ros = (b'\x01\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02') msg_cdr = ( @@ -396,9 +419,11 @@ def test_ros1_to_cdr() -> None: b'\x00\x00\x00\x00\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02' ) - assert ros1_to_cdr(msg_ros, 'test_msgs/msg/static_16_64') == msg_cdr + assert ros1_to_cdr(msg_ros, msgtype) == msg_cdr + assert serialize_cdr(deserialize_ros1(msg_ros, msgtype), msgtype) == msg_cdr - register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64'))) + msgtype = 'test_msgs/msg/dynamic_s_64' + register_types(dict(get_types_from_msg(DYNAMIC_S_64, msgtype))) msg_ros = (b'\x01\x00\x00\x00X' b'\x00\x00\x00\x00\x00\x00\x00\x02') msg_cdr = ( @@ -407,12 +432,14 @@ def test_ros1_to_cdr() -> None: b'\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02' ) - assert ros1_to_cdr(msg_ros, 'test_msgs/msg/dynamic_s_64') == msg_cdr + assert ros1_to_cdr(msg_ros, msgtype) == msg_cdr + assert serialize_cdr(deserialize_ros1(msg_ros, msgtype), msgtype) == msg_cdr def test_cdr_to_ros1() -> None: """Test CDR to ROS1 conversion.""" - register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64'))) + msgtype = 'test_msgs/msg/static_16_64' + register_types(dict(get_types_from_msg(STATIC_16_64, msgtype))) msg_ros = (b'\x01\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02') msg_cdr = ( @@ -421,9 +448,11 @@ def test_cdr_to_ros1() -> None: b'\x00\x00\x00\x00\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02' ) - assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/static_16_64') == msg_ros + assert cdr_to_ros1(msg_cdr, msgtype) == msg_ros + assert serialize_ros1(deserialize_cdr(msg_cdr, msgtype), msgtype) == msg_ros - register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64'))) + msgtype = 'test_msgs/msg/dynamic_s_64' + register_types(dict(get_types_from_msg(DYNAMIC_S_64, msgtype))) msg_ros = (b'\x01\x00\x00\x00X' b'\x00\x00\x00\x00\x00\x00\x00\x02') msg_cdr = ( @@ -432,7 +461,8 @@ def test_cdr_to_ros1() -> None: b'\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02' ) - assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/dynamic_s_64') == msg_ros + assert cdr_to_ros1(msg_cdr, msgtype) == msg_ros + assert serialize_ros1(deserialize_cdr(msg_cdr, msgtype), msgtype) == msg_ros header = Header(stamp=Time(42, 666), frame_id='frame') msg_ros = cdr_to_ros1(serialize_cdr(header, 'std_msgs/msg/Header'), 'std_msgs/msg/Header')