Update lint

This commit is contained in:
Marko Durkovic
2022-04-11 00:07:53 +02:00
parent 7315a4ab4d
commit 19f0678645
22 changed files with 215 additions and 149 deletions
+1 -1
View File
@@ -171,5 +171,5 @@ def convert(src: Path, dst: Optional[Path]) -> None:
raise ConverterError(f'Reading source bag: {err}') from err
except (WriterError1, WriterError2) as err:
raise ConverterError(f'Writing destination bag: {err}') from err
except Exception as err: # pylint: disable=broad-except
except Exception as err:
raise ConverterError(f'Converting rosbag: {err!r}') from err
+11 -13
View File
@@ -106,9 +106,9 @@ class IndexData(NamedTuple):
def __eq__(self, other: object) -> bool:
"""Compare by time only."""
if not isinstance(other, IndexData): # pragma: no cover
return NotImplemented
return self.time == other[0]
if isinstance(other, IndexData):
return self.time == other[0]
return NotImplemented # pragma: no cover
def __ge__(self, other: tuple[int, ...]) -> bool:
"""Compare by time only."""
@@ -120,9 +120,9 @@ class IndexData(NamedTuple):
def __ne__(self, other: object) -> bool:
"""Compare by time only."""
if not isinstance(other, IndexData): # pragma: no cover
return NotImplemented
return self.time != other[0]
if isinstance(other, IndexData):
return self.time != other[0]
return NotImplemented # pragma: no cover
deserialize_uint8: Callable[[bytes], tuple[int]] = struct.Struct('<B').unpack # type: ignore
@@ -369,7 +369,7 @@ class Reader:
self.current_chunk: tuple[int, BinaryIO] = (-1, BytesIO())
self.topics: dict[str, TopicInfo] = {}
def open(self) -> None: # pylint: disable=too-many-branches,too-many-locals,too-many-statements
def open(self) -> None: # pylint: disable=too-many-branches,too-many-locals
"""Open rosbag and read metadata."""
try:
self.bio = self.path.open('rb')
@@ -394,13 +394,11 @@ class Reader:
conn_count = header.get_uint32('conn_count')
chunk_count = header.get_uint32('chunk_count')
try:
encryptor = header.get_string('encryptor')
if encryptor:
raise ValueError
except ValueError:
raise ReaderError(f'Bag encryption {encryptor!r} is not supported.') from None
encryptor: Optional[str] = header.get_string('encryptor')
except ReaderError:
pass
encryptor = None
if encryptor:
raise ReaderError(f'Bag encryption {encryptor!r} is not supported.') from None
if index_pos == 0:
raise ReaderError('Bag is not indexed, reindex before reading.')
+3 -2
View File
@@ -31,6 +31,7 @@ class WriterError(Exception):
@dataclass
class WriteChunk:
"""In progress chunk."""
data: BytesIO
pos: int
start: int
@@ -126,7 +127,7 @@ class Header(Dict[str, Any]):
return size + 4
class Writer: # pylint: disable=too-many-instance-attributes
class Writer:
"""Rosbag1 writer.
This class implements writing of rosbag1 files in version 2.0. It should be
@@ -212,7 +213,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
md5sum: Optional[str] = None,
callerid: Optional[str] = None,
latching: Optional[int] = None,
**_kw: Any,
**_kw: Any, # noqa: ANN401
) -> Connection:
"""Add a connection.
+46 -11
View File
@@ -18,7 +18,44 @@ from .connection import Connection
if TYPE_CHECKING:
from types import TracebackType
from typing import Any, Generator, Iterable, Literal, Optional, Type, Union
from typing import Any, Generator, Iterable, Literal, Optional, Type, TypedDict, Union
class StartingTime(TypedDict):
"""Bag starting time."""
nanoseconds_since_epoch: int
class Duration(TypedDict):
"""Bag starting time."""
nanoseconds: int
class TopicMetadata(TypedDict):
"""Topic metadata."""
name: str
type: str
serialization_format: str
offered_qos_profiles: str
class TopicWithMessageCount(TypedDict):
"""Topic with message count."""
message_count: int
topic_metadata: TopicMetadata
class Metadata(TypedDict):
"""Rosbag2 metadata file."""
version: int
storage_identifier: str
relative_file_paths: list[str]
starting_time: StartingTime
duration: Duration
message_count: int
compression_format: str
compression_mode: str
topics_with_message_count: list[TopicWithMessageCount]
class ReaderError(Exception):
@@ -72,13 +109,14 @@ class Reader:
Raises:
ReaderError: Bag not readable or bag metadata.
"""
path = Path(path)
self.path = Path
yamlpath = path / 'metadata.yaml'
self.path = path
self.bio = False
try:
yaml = YAML(typ='safe')
yamlpath = path / 'metadata.yaml'
dct = yaml.load(yamlpath.read_text())
except OSError as err:
raise ReaderError(f'Could not read metadata at {yamlpath}: {err}.') from None
@@ -86,7 +124,7 @@ class Reader:
raise ReaderError(f'Could not load YAML from {yamlpath}: {exc}') from None
try:
self.metadata = dct['rosbag2_bagfile_information']
self.metadata: Metadata = dct['rosbag2_bagfile_information']
if (ver := self.metadata['version']) > 4:
raise ReaderError(f'Rosbag2 version {ver} not supported; please report issue.')
if storageid := self.metadata['storage_identifier'] != 'sqlite3':
@@ -95,8 +133,7 @@ class Reader:
)
self.paths = [path / Path(x).name for x in self.metadata['relative_file_paths']]
missing = [x for x in self.paths if not x.exists()]
if missing:
if missing := [x for x in self.paths if not x.exists()]:
raise ReaderError(f'Some database files are missing: {[str(x) for x in missing]!r}')
self.connections = {
@@ -110,7 +147,7 @@ class Reader:
) for idx, x in enumerate(self.metadata['topics_with_message_count'])
}
noncdr = {
y for x in self.connections.values() if (y := x.serialization_format) != 'cdr'
fmt for x in self.connections.values() if (fmt := x.serialization_format) != 'cdr'
}
if noncdr:
raise ReaderError(f'Serialization format {noncdr!r} is not supported.')
@@ -140,8 +177,7 @@ class Reader:
@property
def start_time(self) -> int:
"""Timestamp in nanoseconds of the earliest message."""
nsecs: int = self.metadata['starting_time']['nanoseconds_since_epoch']
return nsecs
return self.metadata['starting_time']['nanoseconds_since_epoch']
@property
def end_time(self) -> int:
@@ -151,8 +187,7 @@ class Reader:
@property
def message_count(self) -> int:
"""Total message count."""
count: int = self.metadata['message_count']
return count
return self.metadata['message_count']
@property
def compression_format(self) -> Optional[str]:
+4 -2
View File
@@ -18,6 +18,8 @@ if TYPE_CHECKING:
from types import TracebackType
from typing import Any, Literal, Optional, Type, Union
from .reader import Metadata
class WriterError(Exception):
"""Writer Error."""
@@ -125,7 +127,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
msgtype: str,
serialization_format: str = 'cdr',
offered_qos_profiles: str = '',
**_kw: Any,
**_kw: Any, # noqa: ANN401
) -> Connection:
"""Add a connection.
@@ -218,7 +220,7 @@ class Writer: # pylint: disable=too-many-instance-attributes
self.compressor.copy_stream(infile, outfile)
src.unlink()
metadata = {
metadata: dict[str, Metadata] = {
'rosbag2_bagfile_information': {
'version': 4,
'storage_identifier': 'sqlite3',
+45 -45
View File
@@ -86,22 +86,22 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
if subdesc.args.size_cdr:
for _ in range(length):
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
size = (size + anext - 1) & -anext
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
size = (size + anext_before - 1) & -anext_before
lines.append(f' pos += {subdesc.args.size_cdr}')
size += subdesc.args.size_cdr
else:
lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr')
lines.append(f' val = message.{fieldname}')
for idx in range(length):
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(f' pos = func(pos, val[{idx}])')
is_stat = False
aligned = align_after(subdesc)
@@ -117,45 +117,45 @@ def generate_getsize_cdr(fields: list[Field]) -> tuple[CDRSerSize, int]:
lines.append(' pos += 4 + len(val.encode()) + 1')
aligned = 1
else:
anext = align(subdesc)
if aligned < anext:
anext_before = align(subdesc)
if aligned < anext_before:
lines.append(f' if len(message.{fieldname}):')
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
aligned = anext
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
lines.append(f' pos += len(message.{fieldname}) * {SIZEMAP[subdesc.args]}')
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
lines.append(f' val = message.{fieldname}')
if subdesc.args.size_cdr:
if aligned < anext <= anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if aligned < anext_before <= anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(' for _ in val:')
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(f' pos += {subdesc.args.size_cdr}')
else:
lines.append(f' func = get_msgdef("{subdesc.args.name}").getsize_cdr')
if aligned < anext <= anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if aligned < anext_before <= anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(' for item in val:')
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(' pos = func(pos, item)')
aligned = align_after(subdesc)
aligned = min([aligned, 4])
is_stat = False
if fnext and aligned < (anext := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
aligned = anext
if fnext and aligned < (anext_before := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
is_stat = False
lines.append(' return pos')
return compile_lines(lines).getsize_cdr, is_stat * size # type: ignore
return compile_lines(lines).getsize_cdr, is_stat * size
def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
@@ -240,14 +240,14 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
lines.append(
f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}',
)
for idx in range(length):
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(f' pos = func(rawdata, pos, val[{idx}])')
aligned = align_after(subdesc)
else:
@@ -272,28 +272,28 @@ def generate_serialize_cdr(fields: list[Field], endianess: str) -> CDRSer:
lines.append(f' size = len(val) * {SIZEMAP[subdesc.args]}')
if (endianess == 'le') != (sys.byteorder == 'little'):
lines.append(' val = val.byteswap()')
if aligned < (anext := align(subdesc)):
if aligned < (anext_before := align(subdesc)):
lines.append(' if size:')
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(' rawdata[pos:pos + size] = val.view(numpy.uint8)')
lines.append(' pos += size')
aligned = anext
aligned = anext_before
if subdesc.valtype == Valtype.MESSAGE:
anext = align(subdesc)
anext_before = align(subdesc)
lines.append(
f' func = get_msgdef("{subdesc.args.name}").serialize_cdr_{endianess}',
)
lines.append(' for item in val:')
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(' pos = func(rawdata, pos, item)')
aligned = align_after(subdesc)
aligned = min([4, aligned])
if fnext and aligned < (anext := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
aligned = anext
if fnext and aligned < (anext_before := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
lines.append(' return pos')
return compile_lines(lines).serialize_cdr # type: ignore
@@ -384,13 +384,13 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
lines.append(f' pos += {size}')
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")')
lines.append(' value = []')
for _ in range(length):
if anext > anext_after:
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)')
lines.append(' value.append(obj)')
lines.append(' values.append(value)')
@@ -418,9 +418,9 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
aligned = 1
else:
lines.append(f' length = size * {SIZEMAP[subdesc.args]}')
if aligned < (anext := align(subdesc)):
if aligned < (anext_before := align(subdesc)):
lines.append(' if size:')
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(
f' val = numpy.frombuffer(rawdata, '
f'dtype=numpy.{subdesc.args}, count=size, offset=pos)',
@@ -429,14 +429,14 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
lines.append(' val = val.byteswap()')
lines.append(' values.append(val)')
lines.append(' pos += length')
aligned = anext
aligned = anext_before
if subdesc.valtype == Valtype.MESSAGE:
anext = align(subdesc)
anext_before = align(subdesc)
lines.append(f' msgdef = get_msgdef("{subdesc.args.name}")')
lines.append(' value = []')
lines.append(' for _ in range(size):')
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
lines.append(f' obj, pos = msgdef.{funcname}(rawdata, pos, msgdef.cls)')
lines.append(' value.append(obj)')
lines.append(' values.append(value)')
@@ -444,9 +444,9 @@ def generate_deserialize_cdr(fields: list[Field], endianess: str) -> CDRDeser:
aligned = min([4, aligned])
if fnext and aligned < (anext := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext} - 1) & -{anext}')
aligned = anext
if fnext and aligned < (anext_before := align(fnext.descriptor)):
lines.append(f' pos = (pos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
lines.append(' return cls(*values), pos')
return compile_lines(lines).deserialize_cdr # type: ignore
+6 -2
View File
@@ -14,7 +14,7 @@ from .typing import Descriptor, Field, Msgdef
from .utils import Valtype
if TYPE_CHECKING:
from typing import Any
from rosbags.typesys.base import Fielddesc
MSGDEFCACHE: dict[str, Msgdef] = {}
@@ -38,14 +38,18 @@ def get_msgdef(typename: str) -> Msgdef:
if typename not in MSGDEFCACHE:
entries = types.FIELDDEFS[typename][1]
def fixup(entry: Any) -> Descriptor:
def fixup(entry: Fielddesc) -> Descriptor:
if entry[0] == Valtype.BASE:
assert isinstance(entry[1], str)
return Descriptor(Valtype.BASE, entry[1])
if entry[0] == Valtype.MESSAGE:
assert isinstance(entry[1], str)
return Descriptor(Valtype.MESSAGE, get_msgdef(entry[1]))
if entry[0] == Valtype.ARRAY:
assert not isinstance(entry[1][0], str)
return Descriptor(Valtype.ARRAY, (fixup(entry[1][0]), entry[1][1]))
if entry[0] == Valtype.SEQUENCE:
assert not isinstance(entry[1][0], str)
return Descriptor(Valtype.SEQUENCE, (fixup(entry[1][0]), entry[1][1]))
raise SerdeError( # pragma: no cover
f'Unknown field type {entry[0]!r} encountered.',
+23 -23
View File
@@ -18,7 +18,7 @@ from .typing import Field
from .utils import SIZEMAP, Valtype, align, align_after, compile_lines
if TYPE_CHECKING:
from typing import Union # pylint: disable=ungrouped-imports
from typing import Union
from .typing import Bitcvt, BitcvtSize
@@ -114,13 +114,13 @@ def generate_ros1_to_cdr(
aligned = SIZEMAP[subdesc.args]
if subdesc.valtype == Valtype.MESSAGE:
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
for _ in range(length):
if anext > anext_after:
lines.append(f' opos = (opos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
lines.append(' ipos, opos = func(input, ipos, output, opos)')
aligned = anext_after
else:
@@ -150,30 +150,30 @@ def generate_ros1_to_cdr(
lines.append(' opos += length')
aligned = 1
else:
if aligned < (anext := align(subdesc)):
if aligned < (anext_before := align(subdesc)):
lines.append(' if size:')
lines.append(f' opos = (opos + {anext} - 1) & -{anext}')
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
lines.append(f' length = size * {SIZEMAP[subdesc.args]}')
if copy:
lines.append(' output[opos:opos + length] = input[ipos:ipos + length]')
lines.append(' ipos += length')
lines.append(' opos += length')
aligned = anext
aligned = anext_before
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
lines.append(' for _ in range(size):')
lines.append(f' opos = (opos + {anext} - 1) & -{anext}')
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
lines.append(' ipos, opos = func(input, ipos, output, opos)')
aligned = align_after(subdesc)
aligned = min([aligned, 4])
if fnext and aligned < (anext := align(fnext.descriptor)):
lines.append(f' opos = (opos + {anext} - 1) & -{anext}')
aligned = anext
if fnext and aligned < (anext_before := align(fnext.descriptor)):
lines.append(f' opos = (opos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
lines.append(' return ipos, opos')
return getattr(compile_lines(lines), funcname) # type: ignore
@@ -270,13 +270,13 @@ def generate_cdr_to_ros1(
aligned = SIZEMAP[subdesc.args]
if subdesc.valtype == Valtype.MESSAGE:
anext = align(subdesc)
anext_before = align(subdesc)
anext_after = align_after(subdesc)
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
for _ in range(length):
if anext > anext_after:
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
if anext_before > anext_after:
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
lines.append(' ipos, opos = func(input, ipos, output, opos)')
aligned = anext_after
else:
@@ -304,30 +304,30 @@ def generate_cdr_to_ros1(
lines.append(' opos += length')
aligned = 1
else:
if aligned < (anext := align(subdesc)):
if aligned < (anext_before := align(subdesc)):
lines.append(' if size:')
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
lines.append(f' length = size * {SIZEMAP[subdesc.args]}')
if copy:
lines.append(' output[opos:opos + length] = input[ipos:ipos + length]')
lines.append(' ipos += length')
lines.append(' opos += length')
aligned = anext
aligned = anext_before
else:
assert subdesc.valtype == Valtype.MESSAGE
anext = align(subdesc)
anext_before = align(subdesc)
lines.append(f' func = get_msgdef("{subdesc.args.name}").{funcname}')
lines.append(' for _ in range(size):')
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
lines.append(' ipos, opos = func(input, ipos, output, opos)')
aligned = align_after(subdesc)
aligned = min([aligned, 4])
if fnext and aligned < (anext := align(fnext.descriptor)):
lines.append(f' ipos = (ipos + {anext} - 1) & -{anext}')
aligned = anext
if fnext and aligned < (anext_before := align(fnext.descriptor)):
lines.append(f' ipos = (ipos + {anext_before} - 1) & -{anext_before}')
aligned = anext_before
lines.append(' return ipos, opos')
return getattr(compile_lines(lines), funcname) # type: ignore
+2 -2
View File
@@ -14,7 +14,7 @@ if TYPE_CHECKING:
from typing import Any
def deserialize_cdr(rawdata: bytes, typename: str) -> Any:
def deserialize_cdr(rawdata: bytes, typename: str) -> Any: # noqa: ANN401
"""Deserialize raw data into a message object.
Args:
@@ -35,7 +35,7 @@ def deserialize_cdr(rawdata: bytes, typename: str) -> Any:
def serialize_cdr(
message: Any,
message: object,
typename: str,
little_endian: bool = sys.byteorder == 'little',
) -> memoryview:
+2 -2
View File
@@ -13,8 +13,8 @@ if TYPE_CHECKING:
BitcvtSize = Callable[[bytes, int, None, int], Tuple[int, int]]
CDRDeser = Callable[[bytes, int, type], Tuple[Any, int]]
CDRSer = Callable[[bytes, int, type], int]
CDRSerSize = Callable[[int, type], int]
CDRSer = Callable[[bytes, int, object], int]
CDRSerSize = Callable[[int, object], int]
class Descriptor(NamedTuple):
+1 -1
View File
@@ -68,5 +68,5 @@ def parse_message_definition(visitor: Visitor, text: str) -> Typesdict:
npos, trees = rule.parse(text, pos)
assert npos == len(text), f'Could not parse: {text!r}'
return visitor.visit(trees) # type: ignore
except Exception as err: # pylint: disable=broad-except
except Exception as err:
raise TypesysError(f'Could not parse: {text!r}') from err
+6 -5
View File
@@ -31,9 +31,8 @@ def get_typehint(desc: tuple[int, Union[str, tuple[tuple[int, str], Optional[int
"""
if desc[0] == Nodetype.BASE:
if match := INTLIKE.match(desc[1]): # type: ignore
return match.group(1)
return 'str'
assert isinstance(desc[1], str)
return match.group(1) if (match := INTLIKE.match(desc[1])) else 'str'
if desc[0] == Nodetype.NAME:
assert isinstance(desc[1], str)
@@ -43,7 +42,8 @@ def get_typehint(desc: tuple[int, Union[str, tuple[tuple[int, str], Optional[int
if INTLIKE.match(sub[1]):
typ = 'bool8' if sub[1] == 'bool' else sub[1]
return f'numpy.ndarray[Any, numpy.dtype[numpy.{typ}]]'
return f'list[{get_typehint(sub)}]' # type: ignore
assert isinstance(sub, tuple)
return f'list[{get_typehint(sub)}]'
def generate_python_code(typs: Typesdict) -> str:
@@ -142,6 +142,7 @@ def register_types(typs: Typesdict) -> None:
Raises:
TypesysError: Type already present with different definition.
"""
code = generate_python_code(typs)
name = 'rosbags.usertypes'
@@ -150,7 +151,7 @@ def register_types(typs: Typesdict) -> None:
module = module_from_spec(spec)
sys.modules[name] = module
exec(code, module.__dict__) # pylint: disable=exec-used
fielddefs: Typesdict = module.FIELDDEFS # type: ignore
fielddefs: Typesdict = module.FIELDDEFS
for name, (_, fields) in fielddefs.items():
if name == 'std_msgs/msg/Header':