Add CDR to ROS1 bytestream conversion
This commit is contained in:
parent
03b4d7e5c7
commit
ef97081e5a
@ -9,10 +9,11 @@ convert directly between different serialization formats.
|
||||
"""
|
||||
|
||||
from .messages import SerdeError
|
||||
from .serdes import deserialize_cdr, ros1_to_cdr, serialize_cdr
|
||||
from .serdes import cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr
|
||||
|
||||
__all__ = [
|
||||
'SerdeError',
|
||||
'cdr_to_ros1',
|
||||
'deserialize_cdr',
|
||||
'ros1_to_cdr',
|
||||
'serialize_cdr',
|
||||
|
||||
@ -9,7 +9,7 @@ from typing import TYPE_CHECKING
|
||||
from rosbags.typesys import types
|
||||
|
||||
from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr
|
||||
from .ros1 import generate_ros1_to_cdr
|
||||
from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr
|
||||
from .typing import Descriptor, Field, Msgdef
|
||||
from .utils import Valtype
|
||||
|
||||
@ -67,5 +67,7 @@ def get_msgdef(typename: str) -> Msgdef:
|
||||
generate_deserialize_cdr(fields, 'be'),
|
||||
generate_ros1_to_cdr(fields, typename, False),
|
||||
generate_ros1_to_cdr(fields, typename, True),
|
||||
generate_cdr_to_ros1(fields, typename, False),
|
||||
generate_cdr_to_ros1(fields, typename, True),
|
||||
)
|
||||
return MSGDEFCACHE[typename]
|
||||
|
||||
@ -168,3 +168,150 @@ def generate_ros1_to_cdr(fields: List[Field], typename: str, copy: bool) -> Call
|
||||
|
||||
lines.append(' return ipos, opos')
|
||||
return getattr(compile_lines(lines), funcname)
|
||||
|
||||
|
||||
def generate_cdr_to_ros1(fields: List[Field], typename: str, copy: bool) -> Callable:
|
||||
"""Generate CDR to ROS1 conversion function.
|
||||
|
||||
Args:
|
||||
fields: Fields of message.
|
||||
typename: Message type name.
|
||||
copy: Generate conversion or sizing function.
|
||||
|
||||
Returns:
|
||||
CDR to ROS1 conversion function.
|
||||
|
||||
"""
|
||||
# pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks,too-many-statements
|
||||
aligned = 8
|
||||
icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None]))
|
||||
next(inext)
|
||||
funcname = 'cdr_to_ros1' if copy else 'getsize_cdr_to_ros1'
|
||||
lines = [
|
||||
'import sys',
|
||||
'import numpy',
|
||||
'from rosbags.serde.messages import SerdeError, get_msgdef',
|
||||
'from rosbags.serde.primitives import pack_int32_le',
|
||||
'from rosbags.serde.primitives import unpack_int32_le',
|
||||
f'def {funcname}(input, ipos, output, opos):',
|
||||
]
|
||||
|
||||
if typename == 'std_msgs/msg/Header':
|
||||
lines.append(' opos += 4')
|
||||
|
||||
for fcurr, fnext in zip(icurr, inext):
|
||||
_, desc = fcurr
|
||||
|
||||
if desc.valtype == Valtype.MESSAGE:
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
aligned = align_after(desc)
|
||||
|
||||
elif desc.valtype == Valtype.BASE:
|
||||
if desc.args == 'string':
|
||||
lines.append(' length = unpack_int32_le(input, ipos)[0] - 1')
|
||||
if copy:
|
||||
lines.append(' pack_int32_le(output, opos, length)')
|
||||
lines.append(' ipos += 4')
|
||||
lines.append(' opos += 4')
|
||||
if copy:
|
||||
lines.append(' output[opos:opos + length] = input[ipos:ipos + length]')
|
||||
lines.append(' ipos += length + 1')
|
||||
lines.append(' opos += length')
|
||||
aligned = 1
|
||||
else:
|
||||
size = SIZEMAP[desc.args]
|
||||
if copy:
|
||||
lines.append(f' output[opos:opos + {size}] = input[ipos:ipos + {size}]')
|
||||
lines.append(f' ipos += {size}')
|
||||
lines.append(f' opos += {size}')
|
||||
aligned = size
|
||||
|
||||
elif desc.valtype == Valtype.ARRAY:
|
||||
subdesc, length = desc.args
|
||||
|
||||
if subdesc.valtype == Valtype.BASE:
|
||||
if subdesc.args == 'string':
|
||||
for _ in range(length):
|
||||
lines.append(' ipos = (ipos + 4 - 1) & -4')
|
||||
lines.append(' length = unpack_int32_le(input, ipos)[0] - 1')
|
||||
if copy:
|
||||
lines.append(' pack_int32_le(output, opos, length)')
|
||||
lines.append(' ipos += 4')
|
||||
lines.append(' opos += 4')
|
||||
if copy:
|
||||
lines.append(
|
||||
' output[opos:opos + length] = input[ipos:ipos + length]',
|
||||
)
|
||||
lines.append(' ipos += length + 1')
|
||||
lines.append(' opos += length')
|
||||
aligned = 1
|
||||
else:
|
||||
size = length * SIZEMAP[subdesc.args]
|
||||
if copy:
|
||||
lines.append(f' output[opos:opos + {size}] = input[ipos:ipos + {size}]')
|
||||
lines.append(f' ipos += {size}')
|
||||
lines.append(f' opos += {size}')
|
||||
aligned = SIZEMAP[subdesc.args]
|
||||
|
||||
if subdesc.valtype == Valtype.MESSAGE:
|
||||
anext = align(subdesc)
|
||||
anext_after = align_after(subdesc)
|
||||
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
for _ in range(length):
|
||||
if anext > anext_after:
|
||||
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
aligned = anext_after
|
||||
else:
|
||||
assert desc.valtype == Valtype.SEQUENCE
|
||||
lines.append(' size = unpack_int32_le(input, ipos)[0]')
|
||||
if copy:
|
||||
lines.append(' pack_int32_le(output, opos, size)')
|
||||
lines.append(' ipos += 4')
|
||||
lines.append(' opos += 4')
|
||||
subdesc = desc.args[0]
|
||||
aligned = 4
|
||||
|
||||
if subdesc.valtype == Valtype.BASE:
|
||||
if subdesc.args == 'string':
|
||||
lines.append(' for _ in range(size):')
|
||||
lines.append(' ipos = (ipos + 4 - 1) & -4')
|
||||
lines.append(' length = unpack_int32_le(input, ipos)[0] - 1')
|
||||
if copy:
|
||||
lines.append(' pack_int32_le(output, opos, length)')
|
||||
lines.append(' ipos += 4')
|
||||
lines.append(' opos += 4')
|
||||
if copy:
|
||||
lines.append(' output[opos:opos + length] = input[ipos:ipos + length]')
|
||||
lines.append(' ipos += length + 1')
|
||||
lines.append(' opos += length')
|
||||
aligned = 1
|
||||
else:
|
||||
if aligned < (anext := align(subdesc)):
|
||||
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
|
||||
lines.append(f' length = size * {SIZEMAP[subdesc.args]}')
|
||||
if copy:
|
||||
lines.append(' output[opos:opos + length] = input[ipos:ipos + length]')
|
||||
lines.append(' ipos += length')
|
||||
lines.append(' opos += length')
|
||||
aligned = anext
|
||||
|
||||
else:
|
||||
assert subdesc.valtype == Valtype.MESSAGE
|
||||
anext = align(subdesc)
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
lines.append(' for _ in range(size):')
|
||||
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
aligned = min([aligned, 4])
|
||||
|
||||
if fnext and aligned < (anext := align(fnext.descriptor)):
|
||||
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
|
||||
aligned = anext
|
||||
|
||||
lines.append(' return ipos, opos')
|
||||
return getattr(compile_lines(lines), funcname)
|
||||
|
||||
@ -100,3 +100,44 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview:
|
||||
assert ipos == len(raw)
|
||||
assert opos + 4 == size
|
||||
return rawdata.toreadonly()
|
||||
|
||||
|
||||
def cdr_to_ros1(raw: bytes, typename: str) -> memoryview:
|
||||
"""Convert serialized CDR message directly to ROS1.
|
||||
|
||||
This should be reasonably fast as conversions happen on a byte-level
|
||||
without going through deserialization and serialization.
|
||||
|
||||
Args:
|
||||
raw: CDR serialized message.
|
||||
typename: Message type name.
|
||||
|
||||
Returns:
|
||||
ROS1 serialized message.
|
||||
|
||||
"""
|
||||
assert raw[1] == 1, 'Message byte order is not little endian'
|
||||
|
||||
msgdef = get_msgdef(typename)
|
||||
|
||||
ipos, opos = msgdef.getsize_cdr_to_ros1(
|
||||
raw[4:],
|
||||
0,
|
||||
None,
|
||||
0,
|
||||
)
|
||||
assert ipos + 4 == len(raw)
|
||||
|
||||
raw = memoryview(raw)
|
||||
size = opos
|
||||
rawdata = memoryview(bytearray(size))
|
||||
|
||||
ipos, opos = msgdef.cdr_to_ros1(
|
||||
raw[4:],
|
||||
0,
|
||||
rawdata,
|
||||
0,
|
||||
)
|
||||
assert ipos + 4 == len(raw)
|
||||
assert opos == size
|
||||
return rawdata.toreadonly()
|
||||
|
||||
@ -38,3 +38,5 @@ class Msgdef(NamedTuple):
|
||||
deserialize_cdr_be: Callable
|
||||
getsize_ros1_to_cdr: Callable
|
||||
ros1_to_cdr: Callable
|
||||
getsize_cdr_to_ros1: Callable
|
||||
cdr_to_ros1: Callable
|
||||
|
||||
@ -10,9 +10,10 @@ from unittest.mock import MagicMock, patch
|
||||
import numpy
|
||||
import pytest
|
||||
|
||||
from rosbags.serde import SerdeError, deserialize_cdr, ros1_to_cdr, serialize_cdr
|
||||
from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr
|
||||
from rosbags.serde.messages import get_msgdef
|
||||
from rosbags.typesys import get_types_from_msg, register_types
|
||||
from rosbags.typesys.types import builtin_interfaces__msg__Time, std_msgs__msg__Header
|
||||
|
||||
from .cdr import deserialize, serialize
|
||||
|
||||
@ -380,3 +381,30 @@ def test_ros1_to_cdr():
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x02'
|
||||
)
|
||||
assert ros1_to_cdr(msg_ros, 'test_msgs/msg/dynamic_s_64') == msg_cdr
|
||||
|
||||
|
||||
def test_cdr_to_ros1():
|
||||
"""Test CDR to ROS1 conversion."""
|
||||
register_types(dict(get_types_from_msg(STATIC_16_64, 'test_msgs/msg/static_16_64')))
|
||||
msg_ros = (b'\x01\x00' b'\x00\x00\x00\x00\x00\x00\x00\x02')
|
||||
msg_cdr = (
|
||||
b'\x00\x01\x00\x00'
|
||||
b'\x01\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x02'
|
||||
)
|
||||
assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/static_16_64') == msg_ros
|
||||
|
||||
register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64')))
|
||||
msg_ros = (b'\x01\x00\x00\x00X' b'\x00\x00\x00\x00\x00\x00\x00\x02')
|
||||
msg_cdr = (
|
||||
b'\x00\x01\x00\x00'
|
||||
b'\x02\x00\x00\x00X\x00'
|
||||
b'\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x02'
|
||||
)
|
||||
assert cdr_to_ros1(msg_cdr, 'test_msgs/msg/dynamic_s_64') == msg_ros
|
||||
|
||||
header = std_msgs__msg__Header(stamp=builtin_interfaces__msg__Time(42, 666), frame_id='frame')
|
||||
msg_ros = cdr_to_ros1(serialize_cdr(header, 'std_msgs/msg/Header'), 'std_msgs/msg/Header')
|
||||
assert msg_ros == b'\x00\x00\x00\x00*\x00\x00\x00\x9a\x02\x00\x00\x05\x00\x00\x00frame'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user