Allow alternative type stores
This commit is contained in:
parent
e88241074e
commit
dd357fa3e4
@ -44,7 +44,7 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
|
||||
lines = [
|
||||
'import sys',
|
||||
'from rosbags.serde.messages import get_msgdef',
|
||||
'def getsize_cdr(pos, message):',
|
||||
'def getsize_cdr(pos, message, typestore):',
|
||||
]
|
||||
for fcurr, fnext in zip(icurr, inext):
|
||||
fieldname, desc = fcurr
|
||||
@ -54,8 +54,8 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
|
||||
lines.append(f' pos += {desc.args.size_cdr}')
|
||||
size += desc.args.size_cdr
|
||||
else:
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}").getsize_cdr')
|
||||
lines.append(f' pos = func(pos, message.{fieldname})')
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}", typestore).getsize_cdr')
|
||||
lines.append(f' pos = func(pos, message.{fieldname}, typestore)')
|
||||
is_stat = False
|
||||
aligned = align_after(desc)
|
||||
|
||||
@ -97,12 +97,14 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
|
||||
lines.append(f' pos += {subdesc.args.size_cdr}')
|
||||
size += subdesc.args.size_cdr
|
||||
else:
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr')
|
||||
lines.append(
|
||||
f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_cdr',
|
||||
)
|
||||
lines.append(f' val = message.{fieldname}')
|
||||
for idx in range(length):
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(f' pos = func(pos, val[{idx}])')
|
||||
lines.append(f' pos = func(pos, val[{idx}], typestore)')
|
||||
is_stat = False
|
||||
aligned = align_after(subdesc)
|
||||
else:
|
||||
@ -138,13 +140,15 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
|
||||
lines.append(f' pos += {subdesc.args.size_cdr}')
|
||||
|
||||
else:
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr')
|
||||
lines.append(
|
||||
f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_cdr',
|
||||
)
|
||||
if aligned < anext_before <= anext_after:
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' for item in val:')
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' pos = func(pos, item)')
|
||||
lines.append(' pos = func(pos, item, typestore)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
aligned = min([aligned, 4])
|
||||
@ -190,15 +194,16 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
|
||||
f'from rosbags.serde.primitives import pack_uint64_{endianess}',
|
||||
f'from rosbags.serde.primitives import pack_float32_{endianess}',
|
||||
f'from rosbags.serde.primitives import pack_float64_{endianess}',
|
||||
'def serialize_cdr(rawdata, pos, message):',
|
||||
'def serialize_cdr(rawdata, pos, message, typestore):',
|
||||
]
|
||||
for fcurr, fnext in zip(icurr, inext):
|
||||
fieldname, desc = fcurr
|
||||
|
||||
lines.append(f' val = message.{fieldname}')
|
||||
if desc.valtype == Valtype.MESSAGE:
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}").serialize_cdr_{endianess}')
|
||||
lines.append(' pos = func(rawdata, pos, val)')
|
||||
name = desc.args.name
|
||||
lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}')
|
||||
lines.append(' pos = func(rawdata, pos, val, typestore)')
|
||||
aligned = align_after(desc)
|
||||
|
||||
elif desc.valtype == Valtype.BASE:
|
||||
@ -242,13 +247,12 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
|
||||
assert subdesc.valtype == Valtype.MESSAGE
|
||||
anext_before = align(subdesc)
|
||||
anext_after = align_after(subdesc)
|
||||
lines.append(
|
||||
f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}',
|
||||
)
|
||||
name = subdesc.args.name
|
||||
lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}')
|
||||
for idx in range(length):
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(f' pos = func(rawdata, pos, val[{idx}])')
|
||||
lines.append(f' pos = func(rawdata, pos, val[{idx}], typestore)')
|
||||
aligned = align_after(subdesc)
|
||||
else:
|
||||
assert desc.valtype == Valtype.SEQUENCE
|
||||
@ -281,12 +285,11 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
|
||||
|
||||
if subdesc.valtype == Valtype.MESSAGE:
|
||||
anext_before = align(subdesc)
|
||||
lines.append(
|
||||
f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}',
|
||||
)
|
||||
name = subdesc.args.name
|
||||
lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}')
|
||||
lines.append(' for item in val:')
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' pos = func(rawdata, pos, item)')
|
||||
lines.append(' pos = func(rawdata, pos, item, typestore)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
aligned = min([4, aligned])
|
||||
@ -330,7 +333,7 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
|
||||
f'from rosbags.serde.primitives import unpack_uint64_{endianess}',
|
||||
f'from rosbags.serde.primitives import unpack_float32_{endianess}',
|
||||
f'from rosbags.serde.primitives import unpack_float64_{endianess}',
|
||||
'def deserialize_cdr(rawdata, pos, cls):',
|
||||
'def deserialize_cdr(rawdata, pos, cls, typestore):',
|
||||
]
|
||||
|
||||
funcname = f'deserialize_cdr_{endianess}'
|
||||
@ -339,8 +342,8 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
|
||||
desc = fcurr[1]
|
||||
|
||||
if desc.valtype == Valtype.MESSAGE:
|
||||
lines.append(f' msgdef = get_msgdef("{desc.args.name}")')
|
||||
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)')
|
||||
lines.append(f' msgdef = get_msgdef("{desc.args.name}", typestore)')
|
||||
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)')
|
||||
lines.append(' values.append(obj)')
|
||||
aligned = align_after(desc)
|
||||
|
||||
@ -386,12 +389,14 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
|
||||
assert subdesc.valtype == Valtype.MESSAGE
|
||||
anext_before = align(subdesc)
|
||||
anext_after = align_after(subdesc)
|
||||
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")')
|
||||
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)')
|
||||
lines.append(' value = []')
|
||||
for _ in range(length):
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)')
|
||||
lines.append(
|
||||
f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)',
|
||||
)
|
||||
lines.append(' value.append(obj)')
|
||||
lines.append(' values.append(value)')
|
||||
aligned = align_after(subdesc)
|
||||
@ -433,11 +438,13 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
|
||||
|
||||
if subdesc.valtype == Valtype.MESSAGE:
|
||||
anext_before = align(subdesc)
|
||||
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")')
|
||||
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)')
|
||||
lines.append(' value = []')
|
||||
lines.append(' for _ in range(size):')
|
||||
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)')
|
||||
lines.append(
|
||||
f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)',
|
||||
)
|
||||
lines.append(' value.append(obj)')
|
||||
lines.append(' values.append(value)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
@ -6,8 +6,6 @@ from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from rosbags.typesys import types
|
||||
|
||||
from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr
|
||||
from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr
|
||||
from .typing import Descriptor, Field, Msgdef
|
||||
@ -15,28 +13,34 @@ from .utils import Valtype
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from rosbags.typesys.base import Fielddesc
|
||||
from rosbags.typesys.register import Typestore
|
||||
|
||||
MSGDEFCACHE: dict[str, Msgdef] = {}
|
||||
MSGDEFCACHE: dict[Typestore, dict[str, Msgdef]] = {}
|
||||
|
||||
|
||||
class SerdeError(Exception):
|
||||
"""Serialization and Deserialization Error."""
|
||||
|
||||
|
||||
def get_msgdef(typename: str) -> Msgdef:
|
||||
def get_msgdef(typename: str, typestore: Typestore) -> Msgdef:
|
||||
"""Retrieve message definition for typename.
|
||||
|
||||
Message definitions are cached globally and generated as needed.
|
||||
|
||||
Args:
|
||||
typename: Msgdef type name to load.
|
||||
typestore: Type store.
|
||||
|
||||
Returns:
|
||||
Message definition.
|
||||
|
||||
"""
|
||||
if typename not in MSGDEFCACHE:
|
||||
entries = types.FIELDDEFS[typename][1]
|
||||
if typestore not in MSGDEFCACHE:
|
||||
MSGDEFCACHE[typestore] = {}
|
||||
cache = MSGDEFCACHE[typestore]
|
||||
|
||||
if typename not in cache:
|
||||
entries = typestore.FIELDDEFS[typename][1]
|
||||
|
||||
def fixup(entry: Fielddesc) -> Descriptor:
|
||||
if entry[0] == Valtype.BASE:
|
||||
@ -44,7 +48,7 @@ def get_msgdef(typename: str) -> Msgdef:
|
||||
return Descriptor(Valtype.BASE, entry[1])
|
||||
if entry[0] == Valtype.MESSAGE:
|
||||
assert isinstance(entry[1], str)
|
||||
return Descriptor(Valtype.MESSAGE, get_msgdef(entry[1]))
|
||||
return Descriptor(Valtype.MESSAGE, get_msgdef(entry[1], typestore))
|
||||
if entry[0] == Valtype.ARRAY:
|
||||
assert not isinstance(entry[1][0], str)
|
||||
return Descriptor(Valtype.ARRAY, (fixup(entry[1][0]), entry[1][1]))
|
||||
@ -59,10 +63,10 @@ def get_msgdef(typename: str) -> Msgdef:
|
||||
|
||||
getsize_cdr, size_cdr = generate_getsize_cdr(fields)
|
||||
|
||||
MSGDEFCACHE[typename] = Msgdef(
|
||||
cache[typename] = Msgdef(
|
||||
typename,
|
||||
fields,
|
||||
getattr(types, typename.replace('/', '__')),
|
||||
getattr(typestore, typename.replace('/', '__')),
|
||||
size_cdr,
|
||||
getsize_cdr,
|
||||
generate_serialize_cdr(fields, 'le'),
|
||||
@ -74,4 +78,4 @@ def get_msgdef(typename: str) -> Msgdef:
|
||||
generate_cdr_to_ros1(fields, typename, False), # type: ignore
|
||||
generate_cdr_to_ros1(fields, typename, True), # type: ignore
|
||||
)
|
||||
return MSGDEFCACHE[typename]
|
||||
return cache[typename]
|
||||
|
||||
@ -52,7 +52,7 @@ def generate_ros1_to_cdr(
|
||||
'from rosbags.serde.messages import SerdeError, get_msgdef',
|
||||
'from rosbags.serde.primitives import pack_int32_le',
|
||||
'from rosbags.serde.primitives import unpack_int32_le',
|
||||
f'def {funcname}(input, ipos, output, opos):',
|
||||
f'def {funcname}(input, ipos, output, opos, typestore):',
|
||||
]
|
||||
|
||||
if typename == 'std_msgs/msg/Header':
|
||||
@ -62,8 +62,8 @@ def generate_ros1_to_cdr(
|
||||
_, desc = fcurr
|
||||
|
||||
if desc.valtype == Valtype.MESSAGE:
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}", typestore).{funcname}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = align_after(desc)
|
||||
|
||||
elif desc.valtype == Valtype.BASE:
|
||||
@ -117,11 +117,11 @@ def generate_ros1_to_cdr(
|
||||
anext_before = align(subdesc)
|
||||
anext_after = align_after(subdesc)
|
||||
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}')
|
||||
for _ in range(length):
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = anext_after
|
||||
else:
|
||||
assert desc.valtype == Valtype.SEQUENCE
|
||||
@ -163,10 +163,10 @@ def generate_ros1_to_cdr(
|
||||
else:
|
||||
assert subdesc.valtype == Valtype.MESSAGE
|
||||
anext_before = align(subdesc)
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}')
|
||||
lines.append(' for _ in range(size):')
|
||||
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
aligned = min([aligned, 4])
|
||||
@ -208,7 +208,7 @@ def generate_cdr_to_ros1(
|
||||
'from rosbags.serde.messages import SerdeError, get_msgdef',
|
||||
'from rosbags.serde.primitives import pack_int32_le',
|
||||
'from rosbags.serde.primitives import unpack_int32_le',
|
||||
f'def {funcname}(input, ipos, output, opos):',
|
||||
f'def {funcname}(input, ipos, output, opos, typestore):',
|
||||
]
|
||||
|
||||
if typename == 'std_msgs/msg/Header':
|
||||
@ -218,8 +218,8 @@ def generate_cdr_to_ros1(
|
||||
_, desc = fcurr
|
||||
|
||||
if desc.valtype == Valtype.MESSAGE:
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(f' func = get_msgdef("{desc.args.name}", typestore).{funcname}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = align_after(desc)
|
||||
|
||||
elif desc.valtype == Valtype.BASE:
|
||||
@ -273,11 +273,11 @@ def generate_cdr_to_ros1(
|
||||
anext_before = align(subdesc)
|
||||
anext_after = align_after(subdesc)
|
||||
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}')
|
||||
for _ in range(length):
|
||||
if anext_before > anext_after:
|
||||
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = anext_after
|
||||
else:
|
||||
assert desc.valtype == Valtype.SEQUENCE
|
||||
@ -317,10 +317,10 @@ def generate_cdr_to_ros1(
|
||||
else:
|
||||
assert subdesc.valtype == Valtype.MESSAGE
|
||||
anext_before = align(subdesc)
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
|
||||
lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}')
|
||||
lines.append(' for _ in range(size):')
|
||||
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos)')
|
||||
lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)')
|
||||
aligned = align_after(subdesc)
|
||||
|
||||
aligned = min([aligned, 4])
|
||||
|
||||
@ -8,18 +8,27 @@ import sys
|
||||
from struct import pack_into
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from rosbags.typesys import types
|
||||
|
||||
from .messages import get_msgdef
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any
|
||||
|
||||
from rosbags.typesys.register import Typestore
|
||||
|
||||
def deserialize_cdr(rawdata: bytes, typename: str) -> Any: # noqa: ANN401
|
||||
|
||||
def deserialize_cdr(
|
||||
rawdata: bytes,
|
||||
typename: str,
|
||||
typestore: Typestore = types,
|
||||
) -> Any: # noqa: ANN401
|
||||
"""Deserialize raw data into a message object.
|
||||
|
||||
Args:
|
||||
rawdata: Serialized data.
|
||||
typename: Message type name.
|
||||
typestore: Type store.
|
||||
|
||||
Returns:
|
||||
Deserialized message object.
|
||||
@ -27,9 +36,9 @@ def deserialize_cdr(rawdata: bytes, typename: str) -> Any: # noqa: ANN401
|
||||
"""
|
||||
little_endian = bool(rawdata[1])
|
||||
|
||||
msgdef = get_msgdef(typename)
|
||||
msgdef = get_msgdef(typename, typestore)
|
||||
func = msgdef.deserialize_cdr_le if little_endian else msgdef.deserialize_cdr_be
|
||||
message, pos = func(rawdata[4:], 0, msgdef.cls)
|
||||
message, pos = func(rawdata[4:], 0, msgdef.cls, typestore)
|
||||
assert pos + 4 + 3 >= len(rawdata)
|
||||
return message
|
||||
|
||||
@ -38,6 +47,7 @@ def serialize_cdr(
|
||||
message: object,
|
||||
typename: str,
|
||||
little_endian: bool = sys.byteorder == 'little',
|
||||
typestore: Typestore = types,
|
||||
) -> memoryview:
|
||||
"""Serialize message object to bytes.
|
||||
|
||||
@ -45,24 +55,25 @@ def serialize_cdr(
|
||||
message: Message object.
|
||||
typename: Message type name.
|
||||
little_endian: Should use little endianess.
|
||||
typestore: Type store.
|
||||
|
||||
Returns:
|
||||
Serialized bytes.
|
||||
|
||||
"""
|
||||
msgdef = get_msgdef(typename)
|
||||
size = 4 + msgdef.getsize_cdr(0, message)
|
||||
msgdef = get_msgdef(typename, typestore)
|
||||
size = 4 + msgdef.getsize_cdr(0, message, typestore)
|
||||
rawdata = memoryview(bytearray(size))
|
||||
pack_into('BB', rawdata, 0, 0, little_endian)
|
||||
|
||||
func = msgdef.serialize_cdr_le if little_endian else msgdef.serialize_cdr_be
|
||||
|
||||
pos = func(rawdata[4:], 0, message)
|
||||
pos = func(rawdata[4:], 0, message, typestore)
|
||||
assert pos + 4 == size
|
||||
return rawdata.toreadonly()
|
||||
|
||||
|
||||
def ros1_to_cdr(raw: bytes, typename: str) -> memoryview:
|
||||
def ros1_to_cdr(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview:
|
||||
"""Convert serialized ROS1 message directly to CDR.
|
||||
|
||||
This should be reasonably fast as conversions happen on a byte-level
|
||||
@ -71,18 +82,20 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview:
|
||||
Args:
|
||||
raw: ROS1 serialized message.
|
||||
typename: Message type name.
|
||||
typestore: Type store.
|
||||
|
||||
Returns:
|
||||
CDR serialized message.
|
||||
|
||||
"""
|
||||
msgdef = get_msgdef(typename)
|
||||
msgdef = get_msgdef(typename, typestore)
|
||||
|
||||
ipos, opos = msgdef.getsize_ros1_to_cdr(
|
||||
raw,
|
||||
0,
|
||||
None,
|
||||
0,
|
||||
typestore,
|
||||
)
|
||||
assert ipos == len(raw)
|
||||
|
||||
@ -96,13 +109,14 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview:
|
||||
0,
|
||||
rawdata[4:],
|
||||
0,
|
||||
typestore,
|
||||
)
|
||||
assert ipos == len(raw)
|
||||
assert opos + 4 == size
|
||||
return rawdata.toreadonly()
|
||||
|
||||
|
||||
def cdr_to_ros1(raw: bytes, typename: str) -> memoryview:
|
||||
def cdr_to_ros1(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview:
|
||||
"""Convert serialized CDR message directly to ROS1.
|
||||
|
||||
This should be reasonably fast as conversions happen on a byte-level
|
||||
@ -111,6 +125,7 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview:
|
||||
Args:
|
||||
raw: CDR serialized message.
|
||||
typename: Message type name.
|
||||
typestore: Type store.
|
||||
|
||||
Returns:
|
||||
ROS1 serialized message.
|
||||
@ -118,13 +133,14 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview:
|
||||
"""
|
||||
assert raw[1] == 1, 'Message byte order is not little endian'
|
||||
|
||||
msgdef = get_msgdef(typename)
|
||||
msgdef = get_msgdef(typename, typestore)
|
||||
|
||||
ipos, opos = msgdef.getsize_cdr_to_ros1(
|
||||
raw[4:],
|
||||
0,
|
||||
None,
|
||||
0,
|
||||
typestore,
|
||||
)
|
||||
assert ipos + 4 + 3 >= len(raw)
|
||||
|
||||
@ -137,6 +153,7 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview:
|
||||
0,
|
||||
rawdata,
|
||||
0,
|
||||
typestore,
|
||||
)
|
||||
assert ipos + 4 + 3 >= len(raw)
|
||||
assert opos == size
|
||||
|
||||
@ -9,12 +9,14 @@ from typing import TYPE_CHECKING, NamedTuple
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Callable, Tuple
|
||||
|
||||
Bitcvt = Callable[[bytes, int, bytes, int], Tuple[int, int]]
|
||||
BitcvtSize = Callable[[bytes, int, None, int], Tuple[int, int]]
|
||||
from rosbags.typesys.register import Typestore
|
||||
|
||||
CDRDeser = Callable[[bytes, int, type], Tuple[Any, int]]
|
||||
CDRSer = Callable[[bytes, int, object], int]
|
||||
CDRSerSize = Callable[[int, object], int]
|
||||
Bitcvt = Callable[[bytes, int, bytes, int, Typestore], Tuple[int, int]]
|
||||
BitcvtSize = Callable[[bytes, int, None, int, Typestore], Tuple[int, int]]
|
||||
|
||||
CDRDeser = Callable[[bytes, int, type, Typestore], Tuple[Any, int]]
|
||||
CDRSer = Callable[[bytes, int, object, Typestore], int]
|
||||
CDRSerSize = Callable[[int, object, Typestore], int]
|
||||
|
||||
|
||||
class Descriptor(NamedTuple):
|
||||
|
||||
@ -13,10 +13,16 @@ from . import types
|
||||
from .base import Nodetype, TypesysError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Optional, Union
|
||||
from typing import Any, Optional, Protocol, Union
|
||||
|
||||
from .base import Typesdict
|
||||
|
||||
class Typestore(Protocol): # pylint: disable=too-few-public-methods
|
||||
"""Type storage."""
|
||||
|
||||
FIELDDEFS: Typesdict
|
||||
|
||||
|
||||
INTLIKE = re.compile('^u?(bool|int|float)')
|
||||
|
||||
|
||||
@ -134,11 +140,12 @@ def generate_python_code(typs: Typesdict) -> str:
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
def register_types(typs: Typesdict) -> None:
|
||||
def register_types(typs: Typesdict, typestore: Typestore = types) -> None:
|
||||
"""Register types in type system.
|
||||
|
||||
Args:
|
||||
typs: Dictionary mapping message typenames to parsetrees.
|
||||
typestore: Type store.
|
||||
|
||||
Raises:
|
||||
TypesysError: Type already present with different definition.
|
||||
@ -156,14 +163,14 @@ def register_types(typs: Typesdict) -> None:
|
||||
for name, (_, fields) in fielddefs.items():
|
||||
if name == 'std_msgs/msg/Header':
|
||||
continue
|
||||
if have := types.FIELDDEFS.get(name):
|
||||
if have := typestore.FIELDDEFS.get(name):
|
||||
_, have_fields = have
|
||||
have_fields = [(x[0].lower(), x[1]) for x in have_fields]
|
||||
fields = [(x[0].lower(), x[1]) for x in fields]
|
||||
if have_fields != fields:
|
||||
raise TypesysError(f'Type {name!r} is already present with different definition.')
|
||||
|
||||
for name in fielddefs.keys() - types.FIELDDEFS.keys():
|
||||
for name in fielddefs.keys() - typestore.FIELDDEFS.keys():
|
||||
pyname = name.replace('/', '__')
|
||||
setattr(types, pyname, getattr(module, pyname))
|
||||
types.FIELDDEFS[name] = fielddefs[name]
|
||||
setattr(typestore, pyname, getattr(module, pyname))
|
||||
typestore.FIELDDEFS[name] = fielddefs[name]
|
||||
|
||||
@ -14,6 +14,7 @@ from numpy.typing import NDArray
|
||||
from rosbags.serde.messages import SerdeError, get_msgdef
|
||||
from rosbags.serde.typing import Msgdef
|
||||
from rosbags.serde.utils import SIZEMAP, Valtype
|
||||
from rosbags.typesys import types
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Tuple
|
||||
@ -187,7 +188,7 @@ def deserialize(rawdata: bytes, typename: str) -> Msgdef:
|
||||
"""
|
||||
_, little_endian = unpack_from('BB', rawdata, 0)
|
||||
|
||||
msgdef = get_msgdef(typename)
|
||||
msgdef = get_msgdef(typename, types)
|
||||
obj, _ = deserialize_message(
|
||||
rawdata[4:],
|
||||
BASETYPEMAP_LE if little_endian else BASETYPEMAP_BE,
|
||||
@ -428,7 +429,7 @@ def serialize(
|
||||
Serialized bytes.
|
||||
|
||||
"""
|
||||
msgdef = get_msgdef(typename)
|
||||
msgdef = get_msgdef(typename, types)
|
||||
size = 4 + get_size(message, msgdef)
|
||||
rawdata = memoryview(bytearray(size))
|
||||
|
||||
|
||||
@ -12,7 +12,7 @@ import pytest
|
||||
|
||||
from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr
|
||||
from rosbags.serde.messages import get_msgdef
|
||||
from rosbags.typesys import get_types_from_msg, register_types
|
||||
from rosbags.typesys import get_types_from_msg, register_types, types
|
||||
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
|
||||
from rosbags.typesys.types import geometry_msgs__msg__Polygon as Polygon
|
||||
from rosbags.typesys.types import sensor_msgs__msg__MagneticField as MagneticField
|
||||
@ -316,14 +316,14 @@ def test_custom_type() -> None:
|
||||
register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64')))
|
||||
register_types(dict(get_types_from_msg(CUSTOM, cname)))
|
||||
|
||||
static_64_64 = get_msgdef('test_msgs/msg/static_64_64').cls
|
||||
static_64_16 = get_msgdef('test_msgs/msg/static_64_16').cls
|
||||
static_16_64 = get_msgdef('test_msgs/msg/static_16_64').cls
|
||||
dynamic_64_64 = get_msgdef('test_msgs/msg/dynamic_64_64').cls
|
||||
dynamic_64_b_64 = get_msgdef('test_msgs/msg/dynamic_64_b_64').cls
|
||||
dynamic_64_s = get_msgdef('test_msgs/msg/dynamic_64_s').cls
|
||||
dynamic_s_64 = get_msgdef('test_msgs/msg/dynamic_s_64').cls
|
||||
custom = get_msgdef('test_msgs/msg/custom').cls
|
||||
static_64_64 = get_msgdef('test_msgs/msg/static_64_64', types).cls
|
||||
static_64_16 = get_msgdef('test_msgs/msg/static_64_16', types).cls
|
||||
static_16_64 = get_msgdef('test_msgs/msg/static_16_64', types).cls
|
||||
dynamic_64_64 = get_msgdef('test_msgs/msg/dynamic_64_64', types).cls
|
||||
dynamic_64_b_64 = get_msgdef('test_msgs/msg/dynamic_64_b_64', types).cls
|
||||
dynamic_64_s = get_msgdef('test_msgs/msg/dynamic_64_s', types).cls
|
||||
dynamic_s_64 = get_msgdef('test_msgs/msg/dynamic_s_64', types).cls
|
||||
custom = get_msgdef('test_msgs/msg/custom', types).cls
|
||||
|
||||
msg = custom(
|
||||
'str',
|
||||
@ -439,7 +439,7 @@ def test_padding_empty_sequence() -> None:
|
||||
"""Test empty sequences do not add item padding."""
|
||||
register_types(dict(get_types_from_msg(SU64_B, 'test_msgs/msg/su64_b')))
|
||||
|
||||
su64_b = get_msgdef('test_msgs/msg/su64_b').cls
|
||||
su64_b = get_msgdef('test_msgs/msg/su64_b', types).cls
|
||||
msg = su64_b(numpy.array([], dtype=numpy.uint64), True)
|
||||
|
||||
cdr = serialize_cdr(msg, msg.__msgtype__)
|
||||
@ -458,7 +458,7 @@ def test_align_after_empty_sequence() -> None:
|
||||
"""Test alignment after empty sequences."""
|
||||
register_types(dict(get_types_from_msg(SU64_U64, 'test_msgs/msg/su64_u64')))
|
||||
|
||||
su64_b = get_msgdef('test_msgs/msg/su64_u64').cls
|
||||
su64_b = get_msgdef('test_msgs/msg/su64_u64', types).cls
|
||||
msg = su64_b(numpy.array([], dtype=numpy.uint64), 42)
|
||||
|
||||
cdr = serialize_cdr(msg, msg.__msgtype__)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user