diff --git a/src/rosbags/convert/converter.py b/src/rosbags/convert/converter.py index 8f76567a..ef8565d4 100644 --- a/src/rosbags/convert/converter.py +++ b/src/rosbags/convert/converter.py @@ -15,7 +15,7 @@ from rosbags.typesys import get_types_from_msg, register_types if TYPE_CHECKING: from pathlib import Path - from typing import Any, Dict, Optional + from typing import Any, Optional from rosbags.rosbag1.reader import Connection as RConnection @@ -80,8 +80,8 @@ def convert(src: Path, dst: Optional[Path]) -> None: try: with Reader(src) as reader, Writer(dst) as writer: - typs: Dict[str, Any] = {} - connmap: Dict[int, WConnection] = {} + typs: dict[str, Any] = {} + connmap: dict[int, WConnection] = {} for rconn in reader.connections.values(): candidate = convert_connection(rconn) diff --git a/src/rosbags/rosbag1/reader.py b/src/rosbags/rosbag1/reader.py index b928cff3..47223d07 100644 --- a/src/rosbags/rosbag1/reader.py +++ b/src/rosbags/rosbag1/reader.py @@ -23,19 +23,7 @@ from rosbags.typesys.msg import normalize_msgtype if TYPE_CHECKING: from types import TracebackType - from typing import ( - BinaryIO, - Callable, - Dict, - Generator, - Iterable, - List, - Literal, - Optional, - Tuple, - Type, - Union, - ) + from typing import BinaryIO, Callable, Generator, Iterable, Literal, Optional, Type, Union class ReaderError(Exception): @@ -71,7 +59,7 @@ class Connection(NamedTuple): msgdef: str callerid: Optional[str] latching: Optional[int] - indexes: List + indexes: list class ChunkInfo(NamedTuple): @@ -80,7 +68,7 @@ class ChunkInfo(NamedTuple): pos: int start_time: int end_time: int - connection_counts: Dict[int, int] + connection_counts: dict[int, int] class Chunk(NamedTuple): @@ -107,11 +95,11 @@ class IndexData(NamedTuple): chunk_pos: int offset: int - def __lt__(self, other: Tuple[int, ...]) -> bool: + def __lt__(self, other: tuple[int, ...]) -> bool: """Compare by time only.""" return self.time < other[0] - def __le__(self, other: Tuple[int, ...]) -> bool: + def __le__(self, other: tuple[int, ...]) -> bool: """Compare by time only.""" return self.time <= other[0] @@ -121,11 +109,11 @@ class IndexData(NamedTuple): return NotImplemented return self.time == other[0] - def __ge__(self, other: Tuple[int, ...]) -> bool: + def __ge__(self, other: tuple[int, ...]) -> bool: """Compare by time only.""" return self.time >= other[0] - def __gt__(self, other: Tuple[int, ...]) -> bool: + def __gt__(self, other: tuple[int, ...]) -> bool: """Compare by time only.""" return self.time > other[0] @@ -371,11 +359,11 @@ class Reader: raise ReaderError(f'File {str(self.path)!r} does not exist.') self.bio: Optional[BinaryIO] = None - self.connections: Dict[int, Connection] = {} - self.chunk_infos: List[ChunkInfo] = [] - self.chunks: Dict[int, Chunk] = {} + self.connections: dict[int, Connection] = {} + self.chunk_infos: list[ChunkInfo] = [] + self.chunks: dict[int, Chunk] = {} self.current_chunk = (-1, BytesIO()) - self.topics: Dict[str, TopicInfo] = {} + self.topics: dict[str, TopicInfo] = {} def open(self): # pylint: disable=too-many-branches,too-many-locals """Open rosbag and read metadata.""" @@ -480,7 +468,7 @@ class Reader: """Total message count.""" return reduce(lambda x, y: x + y, (x.msgcount for x in self.topics.values()), 0) - def read_connection(self) -> Tuple[int, Connection]: + def read_connection(self) -> tuple[int, Connection]: """Read connection record from current position.""" assert self.bio header = Header.read(self.bio, RecordType.CONNECTION) @@ -552,7 +540,7 @@ class Reader: decompressor, ) - def read_index_data(self, pos: int) -> Tuple[int, List[IndexData]]: + def read_index_data(self, pos: int) -> tuple[int, list[IndexData]]: """Read index data from position. Args: @@ -576,7 +564,7 @@ class Reader: self.bio.seek(4, os.SEEK_CUR) - index: List[IndexData] = [] + index: list[IndexData] = [] for _ in range(count): time = deserialize_time(self.bio.read(8)) offset = read_uint32(self.bio) @@ -588,7 +576,7 @@ class Reader: topics: Optional[Iterable[str]] = None, start: Optional[int] = None, stop: Optional[int] = None, - ) -> Generator[Tuple[Connection, int, bytes], None, None]: + ) -> Generator[tuple[Connection, int, bytes], None, None]: """Read messages from bag. Args: diff --git a/src/rosbags/rosbag2/reader.py b/src/rosbags/rosbag2/reader.py index 03973657..636c0edc 100644 --- a/src/rosbags/rosbag2/reader.py +++ b/src/rosbags/rosbag2/reader.py @@ -17,7 +17,7 @@ from .connection import Connection if TYPE_CHECKING: from types import TracebackType - from typing import Any, Dict, Generator, Iterable, List, Literal, Optional, Tuple, Type, Union + from typing import Any, Generator, Iterable, Literal, Optional, Type, Union class ReaderError(Exception): @@ -162,7 +162,7 @@ class Reader: return mode if mode != 'none' else None @property - def topics(self) -> Dict[str, Connection]: + def topics(self) -> dict[str, Connection]: """Topic information. For the moment this a dictionary mapping topic names to connections. @@ -175,7 +175,7 @@ class Reader: connections: Iterable[Connection] = (), start: Optional[int] = None, stop: Optional[int] = None, - ) -> Generator[Tuple[Connection, int, bytes], None, None]: + ) -> Generator[tuple[Connection, int, bytes], None, None]: """Read messages from bag. Args: @@ -185,7 +185,7 @@ class Reader: stop: Yield only messages before this timestamp (ns). Yields: - Tuples of connection, timestamp (ns), and rawdata. + tuples of connection, timestamp (ns), and rawdata. Raises: ReaderError: Bag not open. @@ -198,7 +198,7 @@ class Reader: 'SELECT topics.id,messages.timestamp,messages.data', 'FROM messages JOIN topics ON messages.topic_id=topics.id', ] - args: List[Any] = [] + args: list[Any] = [] clause = 'WHERE' if connections: diff --git a/src/rosbags/rosbag2/writer.py b/src/rosbags/rosbag2/writer.py index 18becac6..386aef23 100644 --- a/src/rosbags/rosbag2/writer.py +++ b/src/rosbags/rosbag2/writer.py @@ -16,7 +16,7 @@ from .connection import Connection if TYPE_CHECKING: from types import TracebackType - from typing import Any, Dict, Literal, Optional, Type, Union + from typing import Any, Literal, Optional, Type, Union class WriterError(Exception): @@ -79,7 +79,7 @@ class Writer: # pylint: disable=too-many-instance-attributes self.compression_mode = '' self.compression_format = '' self.compressor: Optional[zstandard.ZstdCompressor] = None - self.connections: Dict[int, Connection] = {} + self.connections: dict[int, Connection] = {} self.conn = None self.cursor: Optional[sqlite3.Cursor] = None diff --git a/src/rosbags/serde/cdr.py b/src/rosbags/serde/cdr.py index de3ff282..5b777878 100644 --- a/src/rosbags/serde/cdr.py +++ b/src/rosbags/serde/cdr.py @@ -13,16 +13,16 @@ from __future__ import annotations import sys from itertools import tee -from typing import TYPE_CHECKING, Iterator, Optional, Tuple, cast +from typing import TYPE_CHECKING, Iterator, cast from .typing import Field from .utils import SIZEMAP, Valtype, align, align_after, compile_lines if TYPE_CHECKING: - from typing import Callable, List + from typing import Callable -def generate_getsize_cdr(fields: List[Field]) -> Tuple[Callable, int]: +def generate_getsize_cdr(fields: list[Field]) -> tuple[Callable, int]: """Generate cdr size calculation function. Args: @@ -37,7 +37,9 @@ def generate_getsize_cdr(fields: List[Field]) -> Tuple[Callable, int]: is_stat = True aligned = 8 - icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + iterators = tee([*fields, None]) + icurr = cast(Iterator[Field], iterators[0]) + inext = iterators[1] next(inext) lines = [ 'import sys', @@ -155,7 +157,7 @@ def generate_getsize_cdr(fields: List[Field]) -> Tuple[Callable, int]: return compile_lines(lines).getsize_cdr, is_stat * size # type: ignore -def generate_serialize_cdr(fields: List[Field], endianess: str) -> Callable: +def generate_serialize_cdr(fields: list[Field], endianess: str) -> Callable: """Generate cdr serialization function. Args: @@ -168,7 +170,9 @@ def generate_serialize_cdr(fields: List[Field], endianess: str) -> Callable: """ # pylint: disable=too-many-branches,too-many-locals,too-many-statements aligned = 8 - icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + iterators = tee([*fields, None]) + icurr = cast(Iterator[Field], iterators[0]) + inext = iterators[1] next(inext) lines = [ 'import sys', @@ -292,7 +296,7 @@ def generate_serialize_cdr(fields: List[Field], endianess: str) -> Callable: return compile_lines(lines).serialize_cdr # type: ignore -def generate_deserialize_cdr(fields: List[Field], endianess: str) -> Callable: +def generate_deserialize_cdr(fields: list[Field], endianess: str) -> Callable: """Generate cdr deserialization function. Args: @@ -305,7 +309,9 @@ def generate_deserialize_cdr(fields: List[Field], endianess: str) -> Callable: """ # pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks,too-many-statements aligned = 8 - icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + iterators = tee([*fields, None]) + icurr = cast(Iterator[Field], iterators[0]) + inext = iterators[1] next(inext) lines = [ 'import sys', diff --git a/src/rosbags/serde/messages.py b/src/rosbags/serde/messages.py index 7c535023..ed06449b 100644 --- a/src/rosbags/serde/messages.py +++ b/src/rosbags/serde/messages.py @@ -14,9 +14,9 @@ from .typing import Descriptor, Field, Msgdef from .utils import Valtype if TYPE_CHECKING: - from typing import Any, Dict + from typing import Any -MSGDEFCACHE: Dict[str, Msgdef] = {} +MSGDEFCACHE: dict[str, Msgdef] = {} class SerdeError(Exception): diff --git a/src/rosbags/serde/ros1.py b/src/rosbags/serde/ros1.py index 5dd21967..a12feef6 100644 --- a/src/rosbags/serde/ros1.py +++ b/src/rosbags/serde/ros1.py @@ -12,16 +12,16 @@ conversion of ROS1 to CDR. from __future__ import annotations from itertools import tee -from typing import TYPE_CHECKING, Iterator, Optional, Tuple, cast +from typing import TYPE_CHECKING, Iterator, cast from .typing import Field from .utils import SIZEMAP, Valtype, align, align_after, compile_lines if TYPE_CHECKING: - from typing import Callable, List # pylint: disable=ungrouped-imports + from typing import Callable # pylint: disable=ungrouped-imports -def generate_ros1_to_cdr(fields: List[Field], typename: str, copy: bool) -> Callable: +def generate_ros1_to_cdr(fields: list[Field], typename: str, copy: bool) -> Callable: """Generate ROS1 to CDR conversion function. Args: @@ -35,7 +35,9 @@ def generate_ros1_to_cdr(fields: List[Field], typename: str, copy: bool) -> Call """ # pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks,too-many-statements aligned = 8 - icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + iterators = tee([*fields, None]) + icurr = cast(Iterator[Field], iterators[0]) + inext = iterators[1] next(inext) funcname = 'ros1_to_cdr' if copy else 'getsize_ros1_to_cdr' lines = [ @@ -170,7 +172,7 @@ def generate_ros1_to_cdr(fields: List[Field], typename: str, copy: bool) -> Call return getattr(compile_lines(lines), funcname) -def generate_cdr_to_ros1(fields: List[Field], typename: str, copy: bool) -> Callable: +def generate_cdr_to_ros1(fields: list[Field], typename: str, copy: bool) -> Callable: """Generate CDR to ROS1 conversion function. Args: @@ -184,7 +186,9 @@ def generate_cdr_to_ros1(fields: List[Field], typename: str, copy: bool) -> Call """ # pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks,too-many-statements aligned = 8 - icurr, inext = cast(Tuple[Iterator[Field], Iterator[Optional[Field]]], tee([*fields, None])) + iterators = tee([*fields, None]) + icurr = cast(Iterator[Field], iterators[0]) + inext = iterators[1] next(inext) funcname = 'cdr_to_ros1' if copy else 'getsize_cdr_to_ros1' lines = [ diff --git a/src/rosbags/serde/utils.py b/src/rosbags/serde/utils.py index 5ebaa16c..41cdf3fe 100644 --- a/src/rosbags/serde/utils.py +++ b/src/rosbags/serde/utils.py @@ -10,7 +10,6 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from types import ModuleType - from typing import Dict, List from .typing import Descriptor @@ -24,7 +23,7 @@ class Valtype(IntEnum): SEQUENCE = 4 -SIZEMAP: Dict[str, int] = { +SIZEMAP: dict[str, int] = { 'bool': 1, 'int8': 1, 'int16': 2, @@ -83,7 +82,7 @@ def align_after(entry: Descriptor) -> int: return min([4, align_after(entry.args[0])]) -def compile_lines(lines: List[str]) -> ModuleType: +def compile_lines(lines: list[str]) -> ModuleType: """Compile lines of code to module. Args: diff --git a/src/rosbags/typesys/msg.py b/src/rosbags/typesys/msg.py index 514d77e7..51936893 100644 --- a/src/rosbags/typesys/msg.py +++ b/src/rosbags/typesys/msg.py @@ -21,7 +21,7 @@ from .peg import Rule, Visitor, parse_grammar from .types import FIELDDEFS if TYPE_CHECKING: - from typing import Any, List + from typing import Any from .base import Fielddesc, Typesdict @@ -91,7 +91,7 @@ def normalize_msgtype(name: str) -> str: return str(path) -def normalize_fieldtype(typename: str, field: Fielddesc, names: List[str]) -> Fielddesc: +def normalize_fieldtype(typename: str, field: Fielddesc, names: list[str]) -> Fielddesc: """Normalize field typename. Args: @@ -235,7 +235,7 @@ def get_types_from_msg(text: str, name: str) -> Typesdict: name: Message typename. Returns: - List with single message name and parsetree. + list with single message name and parsetree. """ return parse_message_definition(VisitorMSG(), f'MSG: {name}\n{text}') diff --git a/src/rosbags/typesys/peg.py b/src/rosbags/typesys/peg.py index 92cc229b..3298b2f9 100644 --- a/src/rosbags/typesys/peg.py +++ b/src/rosbags/typesys/peg.py @@ -14,7 +14,7 @@ import re from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any, Dict, List, Optional, Tuple + from typing import Any, Optional class Rule: @@ -23,7 +23,7 @@ class Rule: LIT = 'LITERAL' WS = re.compile(r'\s+', re.M | re.S) - def __init__(self, value: Any, rules: Dict[str, Rule], name: Optional[str] = None): + def __init__(self, value: Any, rules: dict[str, Rule], name: Optional[str] = None): """Initialize. Args: @@ -58,7 +58,7 @@ class Rule: class RuleLiteral(Rule): """Rule to match string literal.""" - def __init__(self, value: Any, rules: Dict[str, Rule], name: Optional[str] = None): + def __init__(self, value: Any, rules: dict[str, Rule], name: Optional[str] = None): """Initialize. Args: @@ -70,7 +70,7 @@ class RuleLiteral(Rule): super().__init__(value, rules, name) self.value = value[1:-1].replace('\\\'', '\'') - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" value = self.value if text[pos:pos + len(value)] == value: @@ -83,7 +83,7 @@ class RuleLiteral(Rule): class RuleRegex(Rule): """Rule to match regular expression.""" - def __init__(self, value: Any, rules: Dict[str, Rule], name: Optional[str] = None): + def __init__(self, value: Any, rules: dict[str, Rule], name: Optional[str] = None): """Initialize. Args: @@ -95,7 +95,7 @@ class RuleRegex(Rule): super().__init__(value, rules, name) self.value = re.compile(value[2:-1], re.M | re.S) - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" match = self.value.match(text, pos) if not match: @@ -107,7 +107,7 @@ class RuleRegex(Rule): class RuleToken(Rule): """Rule to match token.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" token = self.rules[self.value] npos, data = token.parse(text, pos) @@ -119,7 +119,7 @@ class RuleToken(Rule): class RuleOneof(Rule): """Rule to match first matching subrule.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" for value in self.value: npos, data = value.parse(text, pos) @@ -131,7 +131,7 @@ class RuleOneof(Rule): class RuleSequence(Rule): """Rule to match a sequence of subrules.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" data = [] npos = pos @@ -146,9 +146,9 @@ class RuleSequence(Rule): class RuleZeroPlus(Rule): """Rule to match zero or more occurences of subrule.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" - data: List[Any] = [] + data: list[Any] = [] lpos = pos while True: npos, node = self.value.parse(text, lpos) @@ -161,7 +161,7 @@ class RuleZeroPlus(Rule): class RuleOnePlus(Rule): """Rule to match one or more occurences of subrule.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" npos, node = self.value.parse(text, pos) if npos == -1: @@ -179,7 +179,7 @@ class RuleOnePlus(Rule): class RuleZeroOne(Rule): """Rule to match zero or one occurence of subrule.""" - def parse(self, text: str, pos: int) -> Tuple[int, Any]: + def parse(self, text: str, pos: int) -> tuple[int, Any]: """Apply rule at position.""" npos, node = self.value.parse(text, pos) if npos == -1: @@ -190,7 +190,7 @@ class RuleZeroOne(Rule): class Visitor: # pylint: disable=too-few-public-methods """Visitor transforming parse trees.""" - RULES: Dict[str, Rule] = {} + RULES: dict[str, Rule] = {} def __init__(self): """Initialize.""" @@ -208,15 +208,15 @@ class Visitor: # pylint: disable=too-few-public-methods return func(tree['data']) -def split_token(tok: str) -> List[str]: +def split_token(tok: str) -> list[str]: """Split repetition and grouping tokens.""" return list(filter(None, re.split(r'(^\()|(\)(?=[*+?]?$))|([*+?]$)', tok))) -def collapse_tokens(toks: List[Optional[Rule]], rules: Dict[str, Rule]) -> Rule: +def collapse_tokens(toks: list[Optional[Rule]], rules: dict[str, Rule]) -> Rule: """Collapse linear list of tokens to oneof of sequences.""" - value: List[Rule] = [] - seq: List[Rule] = [] + value: list[Rule] = [] + seq: list[Rule] = [] for tok in toks: if tok: seq.append(tok) @@ -227,9 +227,9 @@ def collapse_tokens(toks: List[Optional[Rule]], rules: Dict[str, Rule]) -> Rule: return RuleOneof(value, rules) if len(value) > 1 else value[0] -def parse_grammar(grammar: str) -> Dict[str, Rule]: +def parse_grammar(grammar: str) -> dict[str, Rule]: """Parse grammar into rule dictionary.""" - rules: Dict[str, Rule] = {} + rules: dict[str, Rule] = {} for token in grammar.split('\n\n'): lines = token.strip().split('\n') name, *defs = lines @@ -237,8 +237,8 @@ def parse_grammar(grammar: str) -> Dict[str, Rule]: assert items assert items[0] == '=' items.pop(0) - stack: List[Optional[Rule]] = [] - parens: List[int] = [] + stack: list[Optional[Rule]] = [] + parens: list[int] = [] while items: tok = items.pop(0) if tok in ['*', '+', '?']: