diff --git a/src/rosbags/serde/cdr.py b/src/rosbags/serde/cdr.py index 8dc87c96..894cc5fc 100644 --- a/src/rosbags/serde/cdr.py +++ b/src/rosbags/serde/cdr.py @@ -44,7 +44,7 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]: lines = [ 'import sys', 'from rosbags.serde.messages import get_msgdef', - 'def getsize_cdr(pos, message):', + 'def getsize_cdr(pos, message, typestore):', ] for fcurr, fnext in zip(icurr, inext): fieldname, desc = fcurr @@ -54,8 +54,8 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]: lines.append(f' pos += {desc.args.size_cdr}') size += desc.args.size_cdr else: - lines.append(f' func = get_msgdef("{desc.args.name}").getsize_cdr') - lines.append(f' pos = func(pos, message.{fieldname})') + lines.append(f' func = get_msgdef("{desc.args.name}", typestore).getsize_cdr') + lines.append(f' pos = func(pos, message.{fieldname}, typestore)') is_stat = False aligned = align_after(desc) @@ -97,12 +97,14 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]: lines.append(f' pos += {subdesc.args.size_cdr}') size += subdesc.args.size_cdr else: - lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr') + lines.append( + f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_cdr', + ) lines.append(f' val = message.{fieldname}') for idx in range(length): if anext_before > anext_after: lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(f' pos = func(pos, val[{idx}])') + lines.append(f' pos = func(pos, val[{idx}], typestore)') is_stat = False aligned = align_after(subdesc) else: @@ -138,13 +140,15 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]: lines.append(f' pos += {subdesc.args.size_cdr}') else: - lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr') + lines.append( + f' func = get_msgdef("{subdesc.args.name}", typestore).getsize_cdr', + ) if aligned < anext_before <= anext_after: lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') lines.append(' for item in val:') if anext_before > anext_after: lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(' pos = func(pos, item)') + lines.append(' pos = func(pos, item, typestore)') aligned = align_after(subdesc) aligned = min([aligned, 4]) @@ -190,15 +194,16 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer: f'from rosbags.serde.primitives import pack_uint64_{endianess}', f'from rosbags.serde.primitives import pack_float32_{endianess}', f'from rosbags.serde.primitives import pack_float64_{endianess}', - 'def serialize_cdr(rawdata, pos, message):', + 'def serialize_cdr(rawdata, pos, message, typestore):', ] for fcurr, fnext in zip(icurr, inext): fieldname, desc = fcurr lines.append(f' val = message.{fieldname}') if desc.valtype == Valtype.MESSAGE: - lines.append(f' func = get_msgdef("{desc.args.name}").serialize_cdr_{endianess}') - lines.append(' pos = func(rawdata, pos, val)') + name = desc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}') + lines.append(' pos = func(rawdata, pos, val, typestore)') aligned = align_after(desc) elif desc.valtype == Valtype.BASE: @@ -242,13 +247,12 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer: assert subdesc.valtype == Valtype.MESSAGE anext_before = align(subdesc) anext_after = align_after(subdesc) - lines.append( - f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}', - ) + name = subdesc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}') for idx in range(length): if anext_before > anext_after: lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(f' pos = func(rawdata, pos, val[{idx}])') + lines.append(f' pos = func(rawdata, pos, val[{idx}], typestore)') aligned = align_after(subdesc) else: assert desc.valtype == Valtype.SEQUENCE @@ -281,12 +285,11 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer: if subdesc.valtype == Valtype.MESSAGE: anext_before = align(subdesc) - lines.append( - f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}', - ) + name = subdesc.args.name + lines.append(f' func = get_msgdef("{name}", typestore).serialize_cdr_{endianess}') lines.append(' for item in val:') lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(' pos = func(rawdata, pos, item)') + lines.append(' pos = func(rawdata, pos, item, typestore)') aligned = align_after(subdesc) aligned = min([4, aligned]) @@ -330,7 +333,7 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser: f'from rosbags.serde.primitives import unpack_uint64_{endianess}', f'from rosbags.serde.primitives import unpack_float32_{endianess}', f'from rosbags.serde.primitives import unpack_float64_{endianess}', - 'def deserialize_cdr(rawdata, pos, cls):', + 'def deserialize_cdr(rawdata, pos, cls, typestore):', ] funcname = f'deserialize_cdr_{endianess}' @@ -339,8 +342,8 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser: desc = fcurr[1] if desc.valtype == Valtype.MESSAGE: - lines.append(f' msgdef = get_msgdef("{desc.args.name}")') - lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)') + lines.append(f' msgdef = get_msgdef("{desc.args.name}", typestore)') + lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)') lines.append(' values.append(obj)') aligned = align_after(desc) @@ -386,12 +389,14 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser: assert subdesc.valtype == Valtype.MESSAGE anext_before = align(subdesc) anext_after = align_after(subdesc) - lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")') + lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)') lines.append(' value = []') for _ in range(length): if anext_before > anext_after: lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)') + lines.append( + f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)', + ) lines.append(' value.append(obj)') lines.append(' values.append(value)') aligned = align_after(subdesc) @@ -433,11 +438,13 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser: if subdesc.valtype == Valtype.MESSAGE: anext_before = align(subdesc) - lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")') + lines.append(f' msgdef = get_msgdef("{subdesc.args.name}", typestore)') lines.append(' value = []') lines.append(' for _ in range(size):') lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}') - lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)') + lines.append( + f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls, typestore)', + ) lines.append(' value.append(obj)') lines.append(' values.append(value)') aligned = align_after(subdesc) diff --git a/src/rosbags/serde/messages.py b/src/rosbags/serde/messages.py index bad20c4e..c4f52097 100644 --- a/src/rosbags/serde/messages.py +++ b/src/rosbags/serde/messages.py @@ -6,8 +6,6 @@ from __future__ import annotations from typing import TYPE_CHECKING -from rosbags.typesys import types - from .cdr import generate_deserialize_cdr, generate_getsize_cdr, generate_serialize_cdr from .ros1 import generate_cdr_to_ros1, generate_ros1_to_cdr from .typing import Descriptor, Field, Msgdef @@ -15,28 +13,34 @@ from .utils import Valtype if TYPE_CHECKING: from rosbags.typesys.base import Fielddesc + from rosbags.typesys.register import Typestore -MSGDEFCACHE: dict[str, Msgdef] = {} +MSGDEFCACHE: dict[Typestore, dict[str, Msgdef]] = {} class SerdeError(Exception): """Serialization and Deserialization Error.""" -def get_msgdef(typename: str) -> Msgdef: +def get_msgdef(typename: str, typestore: Typestore) -> Msgdef: """Retrieve message definition for typename. Message definitions are cached globally and generated as needed. Args: typename: Msgdef type name to load. + typestore: Type store. Returns: Message definition. """ - if typename not in MSGDEFCACHE: - entries = types.FIELDDEFS[typename][1] + if typestore not in MSGDEFCACHE: + MSGDEFCACHE[typestore] = {} + cache = MSGDEFCACHE[typestore] + + if typename not in cache: + entries = typestore.FIELDDEFS[typename][1] def fixup(entry: Fielddesc) -> Descriptor: if entry[0] == Valtype.BASE: @@ -44,7 +48,7 @@ def get_msgdef(typename: str) -> Msgdef: return Descriptor(Valtype.BASE, entry[1]) if entry[0] == Valtype.MESSAGE: assert isinstance(entry[1], str) - return Descriptor(Valtype.MESSAGE, get_msgdef(entry[1])) + return Descriptor(Valtype.MESSAGE, get_msgdef(entry[1], typestore)) if entry[0] == Valtype.ARRAY: assert not isinstance(entry[1][0], str) return Descriptor(Valtype.ARRAY, (fixup(entry[1][0]), entry[1][1])) @@ -59,10 +63,10 @@ def get_msgdef(typename: str) -> Msgdef: getsize_cdr, size_cdr = generate_getsize_cdr(fields) - MSGDEFCACHE[typename] = Msgdef( + cache[typename] = Msgdef( typename, fields, - getattr(types, typename.replace('/', '__')), + getattr(typestore, typename.replace('/', '__')), size_cdr, getsize_cdr, generate_serialize_cdr(fields, 'le'), @@ -74,4 +78,4 @@ def get_msgdef(typename: str) -> Msgdef: generate_cdr_to_ros1(fields, typename, False), # type: ignore generate_cdr_to_ros1(fields, typename, True), # type: ignore ) - return MSGDEFCACHE[typename] + return cache[typename] diff --git a/src/rosbags/serde/ros1.py b/src/rosbags/serde/ros1.py index 402afb8c..38322e4c 100644 --- a/src/rosbags/serde/ros1.py +++ b/src/rosbags/serde/ros1.py @@ -52,7 +52,7 @@ def generate_ros1_to_cdr( 'from rosbags.serde.messages import SerdeError, get_msgdef', 'from rosbags.serde.primitives import pack_int32_le', 'from rosbags.serde.primitives import unpack_int32_le', - f'def {funcname}(input, ipos, output, opos):', + f'def {funcname}(input, ipos, output, opos, typestore):', ] if typename == 'std_msgs/msg/Header': @@ -62,8 +62,8 @@ def generate_ros1_to_cdr( _, desc = fcurr if desc.valtype == Valtype.MESSAGE: - lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(f' func = get_msgdef("{desc.args.name}", typestore).{funcname}') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = align_after(desc) elif desc.valtype == Valtype.BASE: @@ -117,11 +117,11 @@ def generate_ros1_to_cdr( anext_before = align(subdesc) anext_after = align_after(subdesc) - lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}') for _ in range(length): if anext_before > anext_after: lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = anext_after else: assert desc.valtype == Valtype.SEQUENCE @@ -163,10 +163,10 @@ def generate_ros1_to_cdr( else: assert subdesc.valtype == Valtype.MESSAGE anext_before = align(subdesc) - lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}') lines.append(' for _ in range(size):') lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = align_after(subdesc) aligned = min([aligned, 4]) @@ -208,7 +208,7 @@ def generate_cdr_to_ros1( 'from rosbags.serde.messages import SerdeError, get_msgdef', 'from rosbags.serde.primitives import pack_int32_le', 'from rosbags.serde.primitives import unpack_int32_le', - f'def {funcname}(input, ipos, output, opos):', + f'def {funcname}(input, ipos, output, opos, typestore):', ] if typename == 'std_msgs/msg/Header': @@ -218,8 +218,8 @@ def generate_cdr_to_ros1( _, desc = fcurr if desc.valtype == Valtype.MESSAGE: - lines.append(f' func = get_msgdef("{desc.args.name}").{funcname}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(f' func = get_msgdef("{desc.args.name}", typestore).{funcname}') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = align_after(desc) elif desc.valtype == Valtype.BASE: @@ -273,11 +273,11 @@ def generate_cdr_to_ros1( anext_before = align(subdesc) anext_after = align_after(subdesc) - lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}') for _ in range(length): if anext_before > anext_after: lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = anext_after else: assert desc.valtype == Valtype.SEQUENCE @@ -317,10 +317,10 @@ def generate_cdr_to_ros1( else: assert subdesc.valtype == Valtype.MESSAGE anext_before = align(subdesc) - lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}') + lines.append(f' func = get_msgdef("{subdesc.args.name}", typestore).{funcname}') lines.append(' for _ in range(size):') lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}') - lines.append(' ipos, opos = func(input, ipos, output, opos)') + lines.append(' ipos, opos = func(input, ipos, output, opos, typestore)') aligned = align_after(subdesc) aligned = min([aligned, 4]) diff --git a/src/rosbags/serde/serdes.py b/src/rosbags/serde/serdes.py index 96d4aabe..6d56a7b0 100644 --- a/src/rosbags/serde/serdes.py +++ b/src/rosbags/serde/serdes.py @@ -8,18 +8,27 @@ import sys from struct import pack_into from typing import TYPE_CHECKING +from rosbags.typesys import types + from .messages import get_msgdef if TYPE_CHECKING: from typing import Any + from rosbags.typesys.register import Typestore -def deserialize_cdr(rawdata: bytes, typename: str) -> Any: # noqa: ANN401 + +def deserialize_cdr( + rawdata: bytes, + typename: str, + typestore: Typestore = types, +) -> Any: # noqa: ANN401 """Deserialize raw data into a message object. Args: rawdata: Serialized data. typename: Message type name. + typestore: Type store. Returns: Deserialized message object. @@ -27,9 +36,9 @@ def deserialize_cdr(rawdata: bytes, typename: str) -> Any: # noqa: ANN401 """ little_endian = bool(rawdata[1]) - msgdef = get_msgdef(typename) + msgdef = get_msgdef(typename, typestore) func = msgdef.deserialize_cdr_le if little_endian else msgdef.deserialize_cdr_be - message, pos = func(rawdata[4:], 0, msgdef.cls) + message, pos = func(rawdata[4:], 0, msgdef.cls, typestore) assert pos + 4 + 3 >= len(rawdata) return message @@ -38,6 +47,7 @@ def serialize_cdr( message: object, typename: str, little_endian: bool = sys.byteorder == 'little', + typestore: Typestore = types, ) -> memoryview: """Serialize message object to bytes. @@ -45,24 +55,25 @@ def serialize_cdr( message: Message object. typename: Message type name. little_endian: Should use little endianess. + typestore: Type store. Returns: Serialized bytes. """ - msgdef = get_msgdef(typename) - size = 4 + msgdef.getsize_cdr(0, message) + msgdef = get_msgdef(typename, typestore) + size = 4 + msgdef.getsize_cdr(0, message, typestore) rawdata = memoryview(bytearray(size)) pack_into('BB', rawdata, 0, 0, little_endian) func = msgdef.serialize_cdr_le if little_endian else msgdef.serialize_cdr_be - pos = func(rawdata[4:], 0, message) + pos = func(rawdata[4:], 0, message, typestore) assert pos + 4 == size return rawdata.toreadonly() -def ros1_to_cdr(raw: bytes, typename: str) -> memoryview: +def ros1_to_cdr(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview: """Convert serialized ROS1 message directly to CDR. This should be reasonably fast as conversions happen on a byte-level @@ -71,18 +82,20 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview: Args: raw: ROS1 serialized message. typename: Message type name. + typestore: Type store. Returns: CDR serialized message. """ - msgdef = get_msgdef(typename) + msgdef = get_msgdef(typename, typestore) ipos, opos = msgdef.getsize_ros1_to_cdr( raw, 0, None, 0, + typestore, ) assert ipos == len(raw) @@ -96,13 +109,14 @@ def ros1_to_cdr(raw: bytes, typename: str) -> memoryview: 0, rawdata[4:], 0, + typestore, ) assert ipos == len(raw) assert opos + 4 == size return rawdata.toreadonly() -def cdr_to_ros1(raw: bytes, typename: str) -> memoryview: +def cdr_to_ros1(raw: bytes, typename: str, typestore: Typestore = types) -> memoryview: """Convert serialized CDR message directly to ROS1. This should be reasonably fast as conversions happen on a byte-level @@ -111,6 +125,7 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview: Args: raw: CDR serialized message. typename: Message type name. + typestore: Type store. Returns: ROS1 serialized message. @@ -118,13 +133,14 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview: """ assert raw[1] == 1, 'Message byte order is not little endian' - msgdef = get_msgdef(typename) + msgdef = get_msgdef(typename, typestore) ipos, opos = msgdef.getsize_cdr_to_ros1( raw[4:], 0, None, 0, + typestore, ) assert ipos + 4 + 3 >= len(raw) @@ -137,6 +153,7 @@ def cdr_to_ros1(raw: bytes, typename: str) -> memoryview: 0, rawdata, 0, + typestore, ) assert ipos + 4 + 3 >= len(raw) assert opos == size diff --git a/src/rosbags/serde/typing.py b/src/rosbags/serde/typing.py index de4e27b5..5275cc2b 100644 --- a/src/rosbags/serde/typing.py +++ b/src/rosbags/serde/typing.py @@ -9,12 +9,14 @@ from typing import TYPE_CHECKING, NamedTuple if TYPE_CHECKING: from typing import Any, Callable, Tuple - Bitcvt = Callable[[bytes, int, bytes, int], Tuple[int, int]] - BitcvtSize = Callable[[bytes, int, None, int], Tuple[int, int]] + from rosbags.typesys.register import Typestore - CDRDeser = Callable[[bytes, int, type], Tuple[Any, int]] - CDRSer = Callable[[bytes, int, object], int] - CDRSerSize = Callable[[int, object], int] + Bitcvt = Callable[[bytes, int, bytes, int, Typestore], Tuple[int, int]] + BitcvtSize = Callable[[bytes, int, None, int, Typestore], Tuple[int, int]] + + CDRDeser = Callable[[bytes, int, type, Typestore], Tuple[Any, int]] + CDRSer = Callable[[bytes, int, object, Typestore], int] + CDRSerSize = Callable[[int, object, Typestore], int] class Descriptor(NamedTuple): diff --git a/src/rosbags/typesys/register.py b/src/rosbags/typesys/register.py index ebbb53bc..21b0714d 100644 --- a/src/rosbags/typesys/register.py +++ b/src/rosbags/typesys/register.py @@ -13,10 +13,16 @@ from . import types from .base import Nodetype, TypesysError if TYPE_CHECKING: - from typing import Any, Optional, Union + from typing import Any, Optional, Protocol, Union from .base import Typesdict + class Typestore(Protocol): # pylint: disable=too-few-public-methods + """Type storage.""" + + FIELDDEFS: Typesdict + + INTLIKE = re.compile('^u?(bool|int|float)') @@ -134,11 +140,12 @@ def generate_python_code(typs: Typesdict) -> str: return '\n'.join(lines) -def register_types(typs: Typesdict) -> None: +def register_types(typs: Typesdict, typestore: Typestore = types) -> None: """Register types in type system. Args: typs: Dictionary mapping message typenames to parsetrees. + typestore: Type store. Raises: TypesysError: Type already present with different definition. @@ -156,14 +163,14 @@ def register_types(typs: Typesdict) -> None: for name, (_, fields) in fielddefs.items(): if name == 'std_msgs/msg/Header': continue - if have := types.FIELDDEFS.get(name): + if have := typestore.FIELDDEFS.get(name): _, have_fields = have have_fields = [(x[0].lower(), x[1]) for x in have_fields] fields = [(x[0].lower(), x[1]) for x in fields] if have_fields != fields: raise TypesysError(f'Type {name!r} is already present with different definition.') - for name in fielddefs.keys() - types.FIELDDEFS.keys(): + for name in fielddefs.keys() - typestore.FIELDDEFS.keys(): pyname = name.replace('/', '__') - setattr(types, pyname, getattr(module, pyname)) - types.FIELDDEFS[name] = fielddefs[name] + setattr(typestore, pyname, getattr(module, pyname)) + typestore.FIELDDEFS[name] = fielddefs[name] diff --git a/tests/cdr.py b/tests/cdr.py index 562329f3..c7dbd76a 100644 --- a/tests/cdr.py +++ b/tests/cdr.py @@ -14,6 +14,7 @@ from numpy.typing import NDArray from rosbags.serde.messages import SerdeError, get_msgdef from rosbags.serde.typing import Msgdef from rosbags.serde.utils import SIZEMAP, Valtype +from rosbags.typesys import types if TYPE_CHECKING: from typing import Any, Tuple @@ -187,7 +188,7 @@ def deserialize(rawdata: bytes, typename: str) -> Msgdef: """ _, little_endian = unpack_from('BB', rawdata, 0) - msgdef = get_msgdef(typename) + msgdef = get_msgdef(typename, types) obj, _ = deserialize_message( rawdata[4:], BASETYPEMAP_LE if little_endian else BASETYPEMAP_BE, @@ -428,7 +429,7 @@ def serialize( Serialized bytes. """ - msgdef = get_msgdef(typename) + msgdef = get_msgdef(typename, types) size = 4 + get_size(message, msgdef) rawdata = memoryview(bytearray(size)) diff --git a/tests/test_serde.py b/tests/test_serde.py index 8359f52f..276bcd6c 100644 --- a/tests/test_serde.py +++ b/tests/test_serde.py @@ -12,7 +12,7 @@ import pytest from rosbags.serde import SerdeError, cdr_to_ros1, deserialize_cdr, ros1_to_cdr, serialize_cdr from rosbags.serde.messages import get_msgdef -from rosbags.typesys import get_types_from_msg, register_types +from rosbags.typesys import get_types_from_msg, register_types, types from rosbags.typesys.types import builtin_interfaces__msg__Time as Time from rosbags.typesys.types import geometry_msgs__msg__Polygon as Polygon from rosbags.typesys.types import sensor_msgs__msg__MagneticField as MagneticField @@ -316,14 +316,14 @@ def test_custom_type() -> None: register_types(dict(get_types_from_msg(DYNAMIC_S_64, 'test_msgs/msg/dynamic_s_64'))) register_types(dict(get_types_from_msg(CUSTOM, cname))) - static_64_64 = get_msgdef('test_msgs/msg/static_64_64').cls - static_64_16 = get_msgdef('test_msgs/msg/static_64_16').cls - static_16_64 = get_msgdef('test_msgs/msg/static_16_64').cls - dynamic_64_64 = get_msgdef('test_msgs/msg/dynamic_64_64').cls - dynamic_64_b_64 = get_msgdef('test_msgs/msg/dynamic_64_b_64').cls - dynamic_64_s = get_msgdef('test_msgs/msg/dynamic_64_s').cls - dynamic_s_64 = get_msgdef('test_msgs/msg/dynamic_s_64').cls - custom = get_msgdef('test_msgs/msg/custom').cls + static_64_64 = get_msgdef('test_msgs/msg/static_64_64', types).cls + static_64_16 = get_msgdef('test_msgs/msg/static_64_16', types).cls + static_16_64 = get_msgdef('test_msgs/msg/static_16_64', types).cls + dynamic_64_64 = get_msgdef('test_msgs/msg/dynamic_64_64', types).cls + dynamic_64_b_64 = get_msgdef('test_msgs/msg/dynamic_64_b_64', types).cls + dynamic_64_s = get_msgdef('test_msgs/msg/dynamic_64_s', types).cls + dynamic_s_64 = get_msgdef('test_msgs/msg/dynamic_s_64', types).cls + custom = get_msgdef('test_msgs/msg/custom', types).cls msg = custom( 'str', @@ -439,7 +439,7 @@ def test_padding_empty_sequence() -> None: """Test empty sequences do not add item padding.""" register_types(dict(get_types_from_msg(SU64_B, 'test_msgs/msg/su64_b'))) - su64_b = get_msgdef('test_msgs/msg/su64_b').cls + su64_b = get_msgdef('test_msgs/msg/su64_b', types).cls msg = su64_b(numpy.array([], dtype=numpy.uint64), True) cdr = serialize_cdr(msg, msg.__msgtype__) @@ -458,7 +458,7 @@ def test_align_after_empty_sequence() -> None: """Test alignment after empty sequences.""" register_types(dict(get_types_from_msg(SU64_U64, 'test_msgs/msg/su64_u64'))) - su64_b = get_msgdef('test_msgs/msg/su64_u64').cls + su64_b = get_msgdef('test_msgs/msg/su64_u64', types).cls msg = su64_b(numpy.array([], dtype=numpy.uint64), 42) cdr = serialize_cdr(msg, msg.__msgtype__)