Implement direct ros1 (de)serialization

This commit is contained in:
Marko Durkovic 2022-09-22 21:46:00 +02:00
parent 1309d42b64
commit 219a4d9846
9 changed files with 483 additions and 19 deletions

View File

@ -3,7 +3,7 @@
import numpy
from rosbags.rosbag1 import Writer
from rosbags.serde import cdr_to_ros1, serialize_cdr
from rosbags.serde import serialize_ros1
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
from rosbags.typesys.types import sensor_msgs__msg__CompressedImage as CompressedImage
from rosbags.typesys.types import std_msgs__msg__Header as Header
@ -39,8 +39,5 @@ def save_images() -> None:
writer.write(
conn,
timestamp,
cdr_to_ros1(
serialize_cdr(message, CompressedImage.__msgtype__),
CompressedImage.__msgtype__,
),
serialize_ros1(message, CompressedImage.__msgtype__),
)

View File

@ -15,6 +15,16 @@ Deserialize a CDR bytes object using :py:func:`deserialize_cdr() <rosbags.serde.
# rawdata is of type bytes and contains serialized message
msg = deserialize_cdr(rawdata, 'geometry_msgs/msg/Quaternion')
Deserialize a ROS1 bytes object using :py:func:`deserialize_ros1() <rosbags.serde.deserialize_ros1>`:
.. code-block:: python
from rosbags.serde import deserialize_ros1
# rawdata is of type bytes and contains serialized message
msg = deserialize_ros1(rawdata, 'geometry_msgs/msg/Quaternion')
Serialization
---------------
@ -29,3 +39,11 @@ Serialize a message with CDR using :py:func:`serialize_cdr() <rosbags.serde.seri
# serialize message with explicit endianess
serialized = serialize_cdr(msg, 'geometry_msgs/msg/Quaternion', little_endian=False)
Serialize a message with ROS1 using :py:func:`serialize_ros1() <rosbags.serde.serialize_ros1>`:
.. code-block:: python
from rosbags.serde import serialize_ros1
serialized = serialize_ros1(msg, 'geometry_msgs/msg/Quaternion')

View File

@ -15,7 +15,7 @@ from rosbags.rosbag1 import Reader as Reader1
from rosbags.rosbag1 import ReaderError as ReaderError1
from rosbags.rosbag2 import Reader as Reader2
from rosbags.rosbag2 import ReaderError as ReaderError2
from rosbags.serde import deserialize_cdr, ros1_to_cdr
from rosbags.serde import deserialize_cdr, deserialize_ros1
from rosbags.typesys import get_types_from_msg, register_types, types
if TYPE_CHECKING:
@ -101,7 +101,7 @@ class AnyReader:
def _deser_ros1(self, rawdata: bytes, typ: str) -> object:
"""Deserialize ROS1 message."""
return deserialize_cdr(ros1_to_cdr(rawdata, typ, self.typestore), typ, self.typestore)
return deserialize_ros1(rawdata, typ, self.typestore)
def _deser_ros2(self, rawdata: bytes, typ: str) -> object:
"""Deserialize CDR message."""

View File

@ -9,12 +9,21 @@ convert directly between different serialization formats.
"""
from .messages import SerdeError
from .serdes import cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr
from .serdes import (
cdr_to_ros1,
deserialize_cdr,
deserialize_ros1,
ros1_to_cdr,
serialize_cdr,
serialize_ros1,
)
__all__ = [
'SerdeError',
'cdr_to_ros1',
'deserialize_cdr',
'deserialize_ros1',
'ros1_to_cdr',
'serialize_cdr',
'serialize_ros1',
]

View File

@ -7,7 +7,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr
from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr
from .ros1 import (
generate_cdr_to_ros1,
generate_deserialize_ros1,
generate_getsize_ros1,
generate_ros1_to_cdr,
generate_serialize_ros1,
)
from .typing import Descriptor, Field, Msgdef
from .utils import Valtype
@ -62,6 +68,7 @@ def get_msgdef(typename: str, typestore: Typestore) -> Msgdef:
fields = [Field(name, fixup(desc)) for name, desc in entries]
getsize_cdr, size_cdr = generate_getsize_cdr(fields)
getsize_ros1, size_ros1 = generate_getsize_ros1(fields, typename)
cache[typename] = Msgdef(
typename,
@ -73,6 +80,10 @@ def get_msgdef(typename: str, typestore: Typestore) -> Msgdef:
generate_serialize_cdr(fields, 'be'),
generate_deserialize_cdr(fields, 'le'),
generate_deserialize_cdr(fields, 'be'),
size_ros1,
getsize_ros1,
generate_serialize_ros1(fields, typename),
generate_deserialize_ros1(fields, typename),
generate_ros1_to_cdr(fields, typename, False), # type: ignore
generate_ros1_to_cdr(fields, typename, True), # type: ignore
generate_cdr_to_ros1(fields, typename, False), # type: ignore

View File

@ -11,6 +11,7 @@ conversion of ROS1 to CDR.
from __future__ import annotations
import sys
from itertools import tee
from typing import TYPE_CHECKING, Iterator, cast
@ -20,7 +21,7 @@ from .utils import SIZEMAP, Valtype, align, align_after, compile_lines
if TYPE_CHECKING:
from typing import Union
from .typing import Bitcvt, BitcvtSize
from .typing import Bitcvt, BitcvtSize, CDRDeser, CDRSer, CDRSerSize
def generate_ros1_to_cdr(
@ -331,3 +332,349 @@ def generate_cdr_to_ros1(
lines.append(' return ipos, opos')
return getattr(compile_lines(lines), funcname) # type: ignore
def generate_getsize_ros1(fields: list[Field], typename: str) -> tuple[CDRSerSize, int]:
"""Generate ros1 size calculation function.
Args:
fields: Fields of message.
typename: Message type name.
Returns:
Size calculation function and static size.
"""
# pylint: disable=too-many-branches,too-many-statements
size = 0
is_stat = True
lines = [
'import sys',
'from rosbags.serde.messages import get_msgdef',
'def getsize_ros1(pos, message, typestore):',
]
if typename == 'std_msgs/msg/Header':
lines.append(' pos += 4')
for fcurr in fields:
fieldname, desc = fcurr
if desc.valtype == Valtype.MESSAGE:
if desc.args.size_ros1:
lines.append(f' pos += {desc.args.size_ros1}')
size += desc.args.size_ros1
else:
lines.append(f' func = get_msgdef("{desc.args.name}", typestore).getsize_ros1')
lines.append(f' pos = func(pos, message.{fieldname}, typestore)')
is_stat = False
elif desc.valtype == Valtype.BASE:
if desc.args == 'string':
lines.append(f' pos += 4 + len(message.{fieldname}.encode())')
is_stat = False
else:
lines.append(f' pos += {SIZEMAP[desc.args]}')
size += SIZEMAP[desc.args]
elif desc.valtype == Valtype.ARRAY:
subdesc, length = desc.args
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
lines.append(f' val = message.{fieldname}')
for idx in range(length):
lines.append(f' pos += 4 + len(val[{idx}].encode())')
is_stat = False
else:
lines.append(f' pos += {length * SIZEMAP[subdesc.args]}')
size += length * SIZEMAP[subdesc.args]
else:
assert subdesc.valtype == Valtype.MESSAGE
if subdesc.args.size_ros1:
for _ in range(length):
lines.append(f' pos += {subdesc.args.size_ros1}')
size += subdesc.args.size_ros1
else:
lines.append(
f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_ros1',
)
lines.append(f' val = message.{fieldname}')
for idx in range(length):
lines.append(f' pos = func(pos, val[{idx}], typestore)')
is_stat = False
else:
assert desc.valtype == Valtype.SEQUENCE
lines.append(' pos += 4')
subdesc = desc.args[0]
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
lines.append(f' for val in message.{fieldname}:')
lines.append(' pos += 4 + len(val.encode())')
else:
lines.append(f' pos += len(message.{fieldname}) * {SIZEMAP[subdesc.args]}')
else:
assert subdesc.valtype == Valtype.MESSAGE
lines.append(f' val = message.{fieldname}')
if subdesc.args.size_ros1:
lines.append(f' pos += {subdesc.args.size_ros1} * len(val)')
else:
lines.append(
f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_ros1',
)
lines.append(' for item in val:')
lines.append(' pos = func(pos, item, typestore)')
is_stat = False
lines.append(' return pos')
return compile_lines(lines).getsize_ros1, is_stat * size
def generate_serialize_ros1(fields: list[Field], typename: str) -> CDRSer:
"""Generate ros1 serialization function.
Args:
fields: Fields of message.
typename: Message type name.
Returns:
Serializer function.
"""
# pylint: disable=too-many-branches,too-many-statements
lines = [
'import sys',
'import numpy',
'from rosbags.serde.messages import SerdeError, get_msgdef',
'from rosbags.serde.primitives import pack_bool_le',
'from rosbags.serde.primitives import pack_int8_le',
'from rosbags.serde.primitives import pack_int16_le',
'from rosbags.serde.primitives import pack_int32_le',
'from rosbags.serde.primitives import pack_int64_le',
'from rosbags.serde.primitives import pack_uint8_le',
'from rosbags.serde.primitives import pack_uint16_le',
'from rosbags.serde.primitives import pack_uint32_le',
'from rosbags.serde.primitives import pack_uint64_le',
'from rosbags.serde.primitives import pack_float32_le',
'from rosbags.serde.primitives import pack_float64_le',
'def serialize_ros1(rawdata, pos, message, typestore):',
]
if typename == 'std_msgs/msg/Header':
lines.append(' pos += 4')
be_syms = ('>',) if sys.byteorder == 'little' else ('=', '>')
for fcurr in fields:
fieldname, desc = fcurr
lines.append(f' val = message.{fieldname}')
if desc.valtype == Valtype.MESSAGE:
name = desc.args.name
lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1')
lines.append(' pos = func(rawdata, pos, val, typestore)')
elif desc.valtype == Valtype.BASE:
if desc.args == 'string':
lines.append(' bval = memoryview(val.encode())')
lines.append(' length = len(bval)')
lines.append(' pack_int32_le(rawdata, pos, length)')
lines.append(' pos += 4')
lines.append(' rawdata[pos:pos + length] = bval')
lines.append(' pos += length')
else:
lines.append(f' pack_{desc.args}_le(rawdata, pos, val)')
lines.append(f' pos += {SIZEMAP[desc.args]}')
elif desc.valtype == Valtype.ARRAY:
subdesc, length = desc.args
lines.append(f' if len(val) != {length}:')
lines.append(' raise SerdeError(\'Unexpected array length\')')
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
for idx in range(length):
lines.append(f' bval = memoryview(val[{idx}].encode())')
lines.append(' length = len(bval)')
lines.append(' pack_int32_le(rawdata, pos, length)')
lines.append(' pos += 4')
lines.append(' rawdata[pos:pos + length] = bval')
lines.append(' pos += length')
else:
lines.append(f' if val.dtype.byteorder in {be_syms}:')
lines.append(' val = val.byteswap()')
size = length * SIZEMAP[subdesc.args]
lines.append(f' rawdata[pos:pos + {size}] = val.view(numpy.uint8)')
lines.append(f' pos += {size}')
else:
assert subdesc.valtype == Valtype.MESSAGE
name = subdesc.args.name
lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1')
for idx in range(length):
lines.append(f' pos = func(rawdata, pos, val[{idx}], typestore)')
else:
assert desc.valtype == Valtype.SEQUENCE
lines.append(' pack_int32_le(rawdata, pos, len(val))')
lines.append(' pos += 4')
subdesc = desc.args[0]
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
lines.append(' for item in val:')
lines.append(' bval = memoryview(item.encode())')
lines.append(' length = len(bval)')
lines.append(' pack_int32_le(rawdata, pos, length)')
lines.append(' pos += 4')
lines.append(' rawdata[pos:pos + length] = bval')
lines.append(' pos += length')
else:
lines.append(f' size = len(val) * {SIZEMAP[subdesc.args]}')
lines.append(f' if val.dtype.byteorder in {be_syms}:')
lines.append(' val = val.byteswap()')
lines.append(' rawdata[pos:pos + size] = val.view(numpy.uint8)')
lines.append(' pos += size')
if subdesc.valtype == Valtype.MESSAGE:
name = subdesc.args.name
lines.append(f' func = get_msgdef("{name}", typestore).serialize_ros1')
lines.append(' for item in val:')
lines.append(' pos = func(rawdata, pos, item, typestore)')
lines.append(' return pos')
return compile_lines(lines).serialize_ros1 # type: ignore
def generate_deserialize_ros1(fields: list[Field], typename: str) -> CDRDeser:
"""Generate ros1 deserialization function.
Args:
fields: Fields of message.
typename: Message type name.
Returns:
Deserializer function.
"""
# pylint: disable=too-many-branches,too-many-statements
lines = [
'import sys',
'import numpy',
'from rosbags.serde.messages import SerdeError, get_msgdef',
'from rosbags.serde.primitives import unpack_bool_le',
'from rosbags.serde.primitives import unpack_int8_le',
'from rosbags.serde.primitives import unpack_int16_le',
'from rosbags.serde.primitives import unpack_int32_le',
'from rosbags.serde.primitives import unpack_int64_le',
'from rosbags.serde.primitives import unpack_uint8_le',
'from rosbags.serde.primitives import unpack_uint16_le',
'from rosbags.serde.primitives import unpack_uint32_le',
'from rosbags.serde.primitives import unpack_uint64_le',
'from rosbags.serde.primitives import unpack_float32_le',
'from rosbags.serde.primitives import unpack_float64_le',
'def deserialize_ros1(rawdata, pos, cls, typestore):',
]
if typename == 'std_msgs/msg/Header':
lines.append(' pos += 4')
be_syms = ('>',) if sys.byteorder == 'little' else ('=', '>')
funcname = 'deserialize_ros1'
lines.append(' values = []')
for fcurr in fields:
desc = fcurr[1]
if desc.valtype == Valtype.MESSAGE:
lines.append(f' msgdef = get_msgdef("{desc.args.name}", typestore)')
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)')
lines.append(' values.append(obj)')
elif desc.valtype == Valtype.BASE:
if desc.args == 'string':
lines.append(' length = unpack_int32_le(rawdata, pos)[0]')
lines.append(' string = bytes(rawdata[pos + 4:pos + 4 + length]).decode()')
lines.append(' values.append(string)')
lines.append(' pos += 4 + length')
else:
lines.append(f' value = unpack_{desc.args}_le(rawdata, pos)[0]')
lines.append(' values.append(value)')
lines.append(f' pos += {SIZEMAP[desc.args]}')
elif desc.valtype == Valtype.ARRAY:
subdesc, length = desc.args
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
lines.append(' value = []')
for _ in range(length):
lines.append(' length = unpack_int32_le(rawdata, pos)[0]')
lines.append(
' value.append(bytes(rawdata[pos + 4:pos + 4 + length]).decode())',
)
lines.append(' pos += 4 + length')
lines.append(' values.append(value)')
else:
size = length * SIZEMAP[subdesc.args]
lines.append(
f' val = numpy.frombuffer(rawdata, '
f'dtype=numpy.{subdesc.args}, count={length}, offset=pos)',
)
lines.append(f' if val.dtype.byteorder in {be_syms}:')
lines.append(' val = val.byteswap()')
lines.append(' values.append(val)')
lines.append(f' pos += {size}')
else:
assert subdesc.valtype == Valtype.MESSAGE
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)')
lines.append(' value = []')
for _ in range(length):
lines.append(
f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)',
)
lines.append(' value.append(obj)')
lines.append(' values.append(value)')
else:
assert desc.valtype == Valtype.SEQUENCE
lines.append(' size = unpack_int32_le(rawdata, pos)[0]')
lines.append(' pos += 4')
subdesc = desc.args[0]
if subdesc.valtype == Valtype.BASE:
if subdesc.args == 'string':
lines.append(' value = []')
lines.append(' for _ in range(size):')
lines.append(' length = unpack_int32_le(rawdata, pos)[0]')
lines.append(
' value.append(bytes(rawdata[pos + 4:pos + 4 + length])'
'.decode())',
)
lines.append(' pos += 4 + length')
lines.append(' values.append(value)')
else:
lines.append(f' length = size * {SIZEMAP[subdesc.args]}')
lines.append(
f' val = numpy.frombuffer(rawdata, '
f'dtype=numpy.{subdesc.args}, count=size, offset=pos)',
)
lines.append(f' if val.dtype.byteorder in {be_syms}:')
lines.append(' val = val.byteswap()')
lines.append(' values.append(val)')
lines.append(' pos += length')
if subdesc.valtype == Valtype.MESSAGE:
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)')
lines.append(' value = []')
lines.append(' for _ in range(size):')
lines.append(
f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)',
)
lines.append(' value.append(obj)')
lines.append(' values.append(value)')
lines.append(' return cls(*values), pos')
return compile_lines(lines).deserialize_ros1 # type: ignore

View File

@ -73,6 +73,54 @@ def serialize_cdr(
return rawdata.toreadonly()
def deserialize_ros1(
rawdata: bytes,
typename: str,
typestore: Typestore = types,
) -> Any: # noqa: ANN401
"""Deserialize raw data into a message object.
Args:
rawdata: Serialized data.
typename: Message type name.
typestore: Type store.
Returns:
Deserialized message object.
"""
msgdef = get_msgdef(typename, typestore)
func = msgdef.deserialize_ros1
message, pos = func(rawdata, 0, msgdef.cls, typestore)
assert pos == len(rawdata)
return message
def serialize_ros1(
message: object,
typename: str,
typestore: Typestore = types,
) -> memoryview:
"""Serialize message object to bytes.
Args:
message: Message object.
typename: Message type name.
typestore: Type store.
Returns:
Serialized bytes.
"""
msgdef = get_msgdef(typename, typestore)
size = msgdef.getsize_ros1(0, message, typestore)
rawdata = memoryview(bytearray(size))
func = msgdef.serialize_ros1
pos = func(rawdata, 0, message, typestore)
assert pos == size
return rawdata.toreadonly()
def ros1_to_cdr(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview:
"""Convert serialized ROS1 message directly to CDR.

View File

@ -45,6 +45,10 @@ class Msgdef(NamedTuple):
serialize_cdr_be: CDRSer
deserialize_cdr_le: CDRDeser
deserialize_cdr_be: CDRDeser
size_ros1: int
getsize_ros1: CDRSerSize
serialize_ros1: CDRSer
deserialize_ros1: CDRDeser
getsize_ros1_to_cdr: BitcvtSize
ros1_to_cdr: Bitcvt
getsize_cdr_to_ros1: BitcvtSize

View File

@ -10,7 +10,15 @@ from unittest.mock import MagicMock, patch
import numpy
import pytest
from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr
from rosbags.serde import (
SerdeError,
cdr_to_ros1,
deserialize_cdr,
deserialize_ros1,
ros1_to_cdr,
serialize_cdr,
serialize_ros1,
)
from rosbags.serde.messages import get_msgdef
from rosbags.typesys import get_types_from_msg, register_types, types
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
@ -206,6 +214,8 @@ def _comparable() -> Generator[None, None, None]:
def __init__(self, *args: Any, **kwargs: Any): # noqa: ANN401
super().__init__(*args, **kwargs)
self.dtype = kwargs['wraps'].dtype
self.reshape = kwargs['wraps'].reshape
self.__eq__ = arreq # type: ignore
def byteswap(self, *args: Any) -> CNDArray: # noqa: ANN401
@ -230,6 +240,11 @@ def test_serde(message: tuple[bytes, str, bool]) -> None:
assert len(rawdata) - len(serdeser) < 4
assert all(x == 0 for x in rawdata[len(serdeser):])
if rawdata[1] == 1:
rawdata = cdr_to_ros1(rawdata, typ)
serdeser = serialize_ros1(deserialize_ros1(rawdata, typ), typ)
assert serdeser == rawdata
@pytest.mark.usefixtures('_comparable')
def test_deserializer() -> None:
@ -244,6 +259,8 @@ def test_deserializer() -> None:
assert msg.points[1].x == 1.25
assert msg.points[1].y == 2.25
assert msg.points[1].z == 3.25
msg_ros1 = deserialize_ros1(cdr_to_ros1(*MSG_POLY[:2]), MSG_POLY[1])
assert msg_ros1 == msg
msg = deserialize_cdr(*MSG_MAGN[:2])
assert msg == deserialize(*MSG_MAGN[:2])
@ -256,6 +273,8 @@ def test_deserializer() -> None:
assert (field.x, field.y, field.z) == (128., 128., 128.)
diag = numpy.diag(msg.magnetic_field_covariance.reshape(3, 3))
assert (diag == [1., 1., 1.]).all()
msg_ros1 = deserialize_ros1(cdr_to_ros1(*MSG_MAGN[:2]), MSG_MAGN[1])
assert msg_ros1 == msg
msg_big = deserialize_cdr(*MSG_MAGN_BIG[:2])
assert msg_big == deserialize(*MSG_MAGN_BIG[:2])
@ -384,10 +403,14 @@ def test_custom_type() -> None:
assert res == deserialize(serialize(msg, cname), cname)
assert res == msg
res = deserialize_ros1(serialize_ros1(msg, cname), cname)
assert res == msg
def test_ros1_to_cdr() -> None:
"""Test ROS1 to CDR conversion."""
register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64')))
msgtype = 'test_msgs/msg/static_16_64'
register_types(dict(get_types_from_msg(STATIC_16_64, msgtype)))
msg_ros = (b'\x01\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02')
msg_cdr = (
@ -396,9 +419,11 @@ def test_ros1_to_cdr() -> None:
b'\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02'
)
assert ros1_to_cdr(msg_ros, 'test_msgs/msg/static_16_64') == msg_cdr
assert ros1_to_cdr(msg_ros, msgtype) == msg_cdr
assert serialize_cdr(deserialize_ros1(msg_ros, msgtype), msgtype) == msg_cdr
register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64')))
msgtype = 'test_msgs/msg/dynamic_s_64'
register_types(dict(get_types_from_msg(DYNAMIC_S_64, msgtype)))
msg_ros = (b'\x01\x00\x00\x00X'
b'\x00\x00\x00\x00\x00\x00\x00\x02')
msg_cdr = (
@ -407,12 +432,14 @@ def test_ros1_to_cdr() -> None:
b'\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02'
)
assert ros1_to_cdr(msg_ros, 'test_msgs/msg/dynamic_s_64') == msg_cdr
assert ros1_to_cdr(msg_ros, msgtype) == msg_cdr
assert serialize_cdr(deserialize_ros1(msg_ros, msgtype), msgtype) == msg_cdr
def test_cdr_to_ros1() -> None:
"""Test CDR to ROS1 conversion."""
register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64')))
msgtype = 'test_msgs/msg/static_16_64'
register_types(dict(get_types_from_msg(STATIC_16_64, msgtype)))
msg_ros = (b'\x01\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02')
msg_cdr = (
@ -421,9 +448,11 @@ def test_cdr_to_ros1() -> None:
b'\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02'
)
assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/static_16_64') == msg_ros
assert cdr_to_ros1(msg_cdr, msgtype) == msg_ros
assert serialize_ros1(deserialize_cdr(msg_cdr, msgtype), msgtype) == msg_ros
register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64')))
msgtype = 'test_msgs/msg/dynamic_s_64'
register_types(dict(get_types_from_msg(DYNAMIC_S_64, msgtype)))
msg_ros = (b'\x01\x00\x00\x00X'
b'\x00\x00\x00\x00\x00\x00\x00\x02')
msg_cdr = (
@ -432,7 +461,8 @@ def test_cdr_to_ros1() -> None:
b'\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x02'
)
assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/dynamic_s_64') == msg_ros
assert cdr_to_ros1(msg_cdr, msgtype) == msg_ros
assert serialize_ros1(deserialize_cdr(msg_cdr, msgtype), msgtype) == msg_ros
header = Header(stamp=Time(42, 666), frame_id='frame')
msg_ros = cdr_to_ros1(serialize_cdr(header, 'std_msgs/msg/Header'), 'std_msgs/msg/Header')