Add type system

This commit is contained in:
Marko Durkovic
2021-05-02 14:43:48 +02:00
parent 3628834c21
commit a7461c8ae7
14 changed files with 3805 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Rosbags Type System.
The type system manages ROS message types and ships all standard ROS2
distribution message types by default. The system supports custom message
types through parsers that dynamically parse custom message definitons
from different source formats.
Supported formats:
- IDL files (subset of the standard necessary for parsing ROS2 IDL) `[1]`_
- MSG files `[2]`_
.. _[1]: https://www.omg.org/spec/IDL/About-IDL/
.. _[2]: http://wiki.ros.org/msg
"""
from .base import TypesysError
from .idl import get_types_from_idl
from .msg import get_types_from_msg
from .register import register_types
__all__ = [
'TypesysError',
'get_types_from_idl',
'get_types_from_msg',
'register_types',
]
+45
View File
@@ -0,0 +1,45 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Tool to update builtin types shipped with rosbags."""
from __future__ import annotations
from os import walk
from pathlib import Path
from typing import TYPE_CHECKING
from .idl import get_types_from_idl
from .msg import get_types_from_msg
from .register import generate_python_code, register_types
if TYPE_CHECKING:
from .base import Typesdict
def main() -> None: # pragma: no cover
"""Update builtin types.
Discover message definitions in filesystem and generate types.py module.
"""
typs: Typesdict = {}
selfdir = Path(__file__).parent
for root, dirnames, files in walk(selfdir.parents[2] / 'tools' / 'messages'):
if '.rosbags_ignore' in files:
dirnames.clear()
continue
for fname in files:
path = Path(root, fname)
if path.suffix == '.idl':
typs.update(get_types_from_idl(path.read_text()))
elif path.suffix == '.msg':
name = path.relative_to(path.parents[2]).with_suffix('')
if '/msg/' not in str(name):
name = name.parent / 'msg' / name.name
typs.update(get_types_from_msg(path.read_text(), str(name)))
register_types(typs)
(selfdir / 'types.py').write_text(generate_python_code(typs))
if __name__ == '__main__':
main()
+70
View File
@@ -0,0 +1,70 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Types and helpers used by message definition converters."""
from __future__ import annotations
from enum import IntEnum, auto
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Dict, List, Tuple
from .peg import Visitor
Fielddefs = List[Tuple[Any, Any]]
Typesdict = Dict[str, Fielddefs]
class TypesysError(Exception):
"""Parser error."""
class Nodetype(IntEnum):
"""Parse tree node types.
The first four match the Valtypes of final message definitions.
"""
BASE = auto()
NAME = auto()
ARRAY = auto()
SEQUENCE = auto()
LITERAL_STRING = auto()
LITERAL_NUMBER = auto()
LITERAL_BOOLEAN = auto()
LITERAL_CHAR = auto()
MODULE = auto()
CONST = auto()
STRUCT = auto()
SDECLARATOR = auto()
ADECLARATOR = auto()
ANNOTATION = auto()
EXPRESSION_BINARY = auto()
EXPRESSION_UNARY = auto()
def parse_message_definition(visitor: Visitor, text: str) -> Typesdict:
"""Parse message definition.
Args:
visitor: Visitor instance to use.
text: Message definition.
Returns:
Parsetree of message.
Raises:
TypesysError: Message parsing failed.
"""
try:
rule = visitor.RULES['specification']
pos = rule.skip_ws(text, 0)
npos, trees = rule.parse(text, pos)
assert npos == len(text), f'Could not parse: {text!r}'
return visitor.visit(trees)
except Exception as err: # pylint: disable=broad-except
raise TypesysError(f'Could not parse: {text!r}') from err
+465
View File
@@ -0,0 +1,465 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""IDL Parser.
Grammar, parse tree visitor and conversion functions for message definitions in
`IDL`_ format.
.. _IDL: https://www.omg.org/spec/IDL/About-IDL/
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import Nodetype, parse_message_definition
from .peg import Rule, Visitor, parse_grammar
if TYPE_CHECKING:
from typing import Any
from .base import Typesdict
GRAMMAR_IDL = r"""
specification
= definition+
definition
= comment
/ macro
/ include
/ module_dcl ';'
/ const_dcl ';'
/ type_dcl ';'
comment
= r'[/][/][^\n]*'
macro
= ifndef
/ define
/ endif
ifndef
= '#ifndef' r'[a-zA-Z0-9_]+'
define
= '#define' r'[a-zA-Z0-9_]+'
endif
= '#endif'
include
= '#include' include_filename
include_filename
= '<' r'[^>]+' '>'
/ '"' r'[^"]+' '"'
module_dcl
= annotation* 'module' identifier '{' definition+ '}'
const_dcl
= 'const' const_type identifier '=' expression
type_dcl
= typedef_dcl
/ constr_type_dcl
typedef_dcl
= 'typedef' type_declarator
type_declarator
= ( simple_type_spec
/ template_type_spec
/ constr_type_dcl
) any_declarators
simple_type_spec
= base_type_spec
/ scoped_name
template_type_spec
= sequence_type
/ string_type
sequence_type
= 'sequence' '<' type_spec ',' expression '>'
/ 'sequence' '<' type_spec '>'
type_spec
= template_type_spec
/ simple_type_spec
any_declarators
= any_declarator (',' any_declarator)*
any_declarator
= array_declarator
/ simple_declarator
constr_type_dcl
= struct_dcl
struct_dcl
= struct_def
struct_def
= annotation* 'struct' identifier '{' member+ '}'
member
= annotation* type_spec declarators ';'
declarators
= declarator (',' declarator)*
declarator
= array_declarator
/ simple_declarator
simple_declarator
= identifier
array_declarator
= identifier fixed_array_size+
fixed_array_size
= '[' expression ']'
annotation
= '@' scoped_name ('(' annotation_params ')')?
annotation_params
= annotation_param (',' annotation_param)*
/ expression
annotation_param
= identifier '=' expression
const_type
= base_type_spec
/ string_type
/ scoped_name
base_type_spec
= integer_type
/ float_type
/ char_type
/ boolean_type
/ octet_type
integer_type
= r'u?int(64|32|16|8)\b'
/ r'(unsigned\s+)?((long\s+)?long|int|short)\b'
float_type
= r'((long\s+)?double|float)\b'
char_type
= r'char\b'
boolean_type
= r'boolean\b'
octet_type
= r'octet\b'
string_type
= 'string' '<' expression '>'
/ 'string'
scoped_name
= identifier '::' scoped_name
/ '::' scoped_name
/ identifier
identifier
= r'[a-zA-Z_][a-zA-Z_0-9]*'
expression
= primary_expr binary_operator primary_expr
/ primary_expr
/ unary_operator primary_expr
primary_expr
= literal
/ scoped_name
/ '(' expression ')'
binary_operator
= '|'
/ '^'
/ '&'
/ '<<'
/ '>>'
/ '+'
/ '-'
/ '*'
/ '/'
/ '%'
unary_operator
= '+'
/ '-'
/ '~'
literal
= boolean_literal
/ float_literal
/ integer_literal
/ character_literal
/ string_literals
boolean_literal
= 'TRUE'
/ 'FALSE'
integer_literal
= hexadecimal_literal
/ octal_literal
/ decimal_literal
decimal_literal
= r'[-+]?[1-9][0-9]+'
/ r'[-+]?[0-9]'
octal_literal
= r'[-+]?0[0-7]+'
hexadecimal_literal
= r'[-+]?0[xX][a-fA-F0-9]+'
float_literal
= r'[-+]?[0-9]*\.[0-9]+([eE][-+]?[0-9]+)?'
/ r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)'
character_literal
= '\'' r'[a-zA-Z0-9_]' '\''
string_literals
= string_literal+
string_literal
= '"' r'(\\"|[^"])*' '"'
"""
class VisitorIDL(Visitor): # pylint: disable=too-many-public-methods
"""IDL file visitor."""
# pylint: disable=no-self-use
RULES = parse_grammar(GRAMMAR_IDL)
def visit_specification(self, children: Any) -> Typesdict:
"""Process start symbol, return only children of modules."""
children = [x[0] for x in children if x is not None]
modules = [y for t, x in children if t == Nodetype.MODULE for y in x]
return {x[1]: x[2] for x in modules if x[0] == Nodetype.STRUCT}
def visit_comment(self, children: Any) -> Any:
"""Process comment, suppress output."""
def visit_macro(self, children: Any) -> Any:
"""Process macro, suppress output."""
def visit_include(self, children: Any) -> Any:
"""Process include, suppress output."""
def visit_type_dcl(self, children: Any) -> Any:
"""Process typedef, pass structs, suppress otherwise."""
if children[0] == Nodetype.STRUCT:
return children
return None
def visit_module_dcl(self, children: Any) -> Any:
"""Process module declaration."""
assert len(children) == 6
assert children[2][0] == Nodetype.NAME
name = children[2][1]
children = children[4]
consts = []
structs = []
modules = []
for item in children:
if not item or item[0] is None:
continue
item = item[0]
if item[0] == Nodetype.CONST:
consts.append(item)
elif item[0] == Nodetype.STRUCT:
structs.append(item)
else:
assert item[0] == Nodetype.MODULE
modules.append(item)
for _, module in modules:
consts += [x for x in module if x[0] == Nodetype.CONST]
structs += [x for x in module if x[0] == Nodetype.STRUCT]
consts = [(x[0], f'{name}/{x[1][0]}', *x[1][1:]) for x in consts]
structs = [(x[0], f'{name}/{x[1]}', *x[2:]) for x in structs]
return (Nodetype.MODULE, consts + structs)
def visit_const_dcl(self, children: Any) -> Any:
"""Process const declaration."""
return (Nodetype.CONST, (children[1][1], *children[2:]))
def visit_type_declarator(self, children: Any) -> Any:
"""Process type declarator, register type mapping in instance typedef dictionary."""
assert len(children) == 2
base, declarators = children
if base[1] in self.typedefs:
base = self.typedefs[base[1]]
declarators = [children[1][0], *[x[1:][0] for x in children[1][1]]]
for declarator in declarators:
if declarator[0] == Nodetype.ADECLARATOR:
value = (Nodetype.ARRAY, declarator[2][1], base)
else:
value = base
self.typedefs[declarator[1][1]] = value
def visit_sequence_type(self, children: Any) -> Any:
"""Process sequence type specification."""
assert len(children) in [4, 6]
if len(children) == 6:
assert children[4][0] == Nodetype.LITERAL_NUMBER
return (Nodetype.SEQUENCE, children[2])
return (Nodetype.SEQUENCE, children[2])
def create_struct_field(self, parts: Any) -> Any:
"""Create struct field and expand typedefs."""
typename, params = parts[1:3]
params = [params[0], *[x[1:][0] for x in params[1]]]
def resolve_name(name: Any) -> Any:
while name[0] == Nodetype.NAME and name[1] in self.typedefs:
name = self.typedefs[name[1]]
return name
yield from ((resolve_name(typename), x[1]) for x in params if x)
def visit_struct_dcl(self, children: Any) -> Any:
"""Process struct declaration."""
assert len(children) == 6
assert children[2][0] == Nodetype.NAME
fields = [y for x in children[4] for y in self.create_struct_field(x)]
return (Nodetype.STRUCT, children[2][1], fields)
def visit_simple_declarator(self, children: Any) -> Any:
"""Process simple declarator."""
assert len(children) == 2
return (Nodetype.SDECLARATOR, children)
def visit_array_declarator(self, children: Any) -> Any:
"""Process array declarator."""
assert len(children) == 2
return (Nodetype.ADECLARATOR, children[0], children[1][0][1])
def visit_annotation(self, children: Any) -> Any:
"""Process annotation."""
assert len(children) == 3
assert children[1][0] == Nodetype.NAME
params = children[2][0][1]
params = [
[z for z in y if z[0] != Rule.LIT] for y in [params[0], *[x[1:][0] for x in params[1]]]
]
return (Nodetype.ANNOTATION, children[1][1], params)
def visit_base_type_spec(self, children: Any) -> Any:
"""Process base type specifier."""
oname = children
name = {
'boolean': 'bool',
'double': 'float64',
'float': 'float32',
'octet': 'uint8',
}.get(oname, oname)
return (Nodetype.BASE, name)
def visit_string_type(self, children: Any) -> Any:
"""Prrocess string type specifier."""
assert len(children) in [2, 4]
if len(children) == 4:
return (Nodetype.BASE, 'string', children[2])
return (Nodetype.BASE, 'string')
def visit_scoped_name(self, children: Any) -> Any:
"""Process scoped name."""
if len(children) == 2:
return (Nodetype.NAME, children[1])
assert len(children) == 3
assert children[1][1] == '::'
return (Nodetype.NAME, f'{children[0][1]}/{children[2][1]}')
def visit_identifier(self, children: Any) -> Any:
"""Process identifier."""
return (Nodetype.NAME, children)
def visit_expression(self, children: Any) -> Any:
"""Process expression, literals are assumed to be integers only."""
if children[0] in [
Nodetype.LITERAL_STRING,
Nodetype.LITERAL_NUMBER,
Nodetype.LITERAL_BOOLEAN,
Nodetype.LITERAL_CHAR,
Nodetype.NAME,
]:
return children
assert len(children) in [2, 3]
if len(children) == 3:
assert isinstance(children[0][1], int)
assert isinstance(children[2][1], int)
return (Nodetype.EXPRESSION_BINARY, children[1], children[0][1], children[2])
assert len(children) == 2
assert isinstance(children[1][1], int), children
return (Nodetype.EXPRESSION_UNARY, children[0][1], children[1])
def visit_boolean_literal(self, children: Any) -> Any:
"""Process boolean literal."""
return (Nodetype.LITERAL_BOOLEAN, children[1] == 'TRUE')
def visit_float_literal(self, children: Any) -> Any:
"""Process float literal."""
return (Nodetype.LITERAL_NUMBER, float(children))
def visit_decimal_literal(self, children: Any) -> Any:
"""Process decimal integer literal."""
return (Nodetype.LITERAL_NUMBER, int(children))
def visit_octal_literal(self, children: Any) -> Any:
"""Process octal integer literal."""
return (Nodetype.LITERAL_NUMBER, int(children, 8))
def visit_hexadecimal_literal(self, children: Any) -> Any:
"""Process hexadecimal integer literal."""
return (Nodetype.LITERAL_NUMBER, int(children, 16))
def visit_character_literal(self, children: Any) -> Any:
"""Process char literal."""
return (Nodetype.LITERAL_CHAR, children[1])
def visit_string_literals(self, children: Any) -> Any:
"""Process string literal."""
return (
Nodetype.LITERAL_STRING,
''.join(y for x in children for y in x if y and y[0] != Rule.LIT),
)
def get_types_from_idl(text: str) -> Typesdict:
"""Get types from idl message definition.
Args:
text: Message definition.
Returns:
List of message message names and parsetrees.
"""
return parse_message_definition(VisitorIDL(), text)
+215
View File
@@ -0,0 +1,215 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""MSG Parser.
Grammar, parse tree visitor and conversion functions for message definitions in
`MSG`_ format. It also supports concatened message definitions as found in
Rosbag1 connection information.
.. _MSG: http://wiki.ros.org/msg
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from .base import Nodetype, parse_message_definition
from .peg import Rule, Visitor, parse_grammar
if TYPE_CHECKING:
from typing import Any, List
from .base import Typesdict
GRAMMAR_MSG = r"""
specification
= msgdef (msgsep msgdef)*
msgdef
= r'MSG:\s' scoped_name definition+
msgsep
= r'================================================================================'
definition
= comment
/ const_dcl
/ field_dcl
comment
= r'#[^\n]*'
const_dcl
= type_spec identifier '=' r'[^=][^\n]*'
field_dcl
= type_spec identifier
type_spec
= array_type_spec
/ simple_type_spec
array_type_spec
= simple_type_spec array_size
simple_type_spec
= scoped_name
array_size
= '[' integer_literal? ']'
integer_literal
= r'[-+]?[1-9][0-9]+'
/ r'[-+]?[0-9]'
scoped_name
= identifier '/' scoped_name
/ identifier
identifier
= r'[a-zA-Z_][a-zA-Z_0-9]*'
"""
def normalize_msgtype(name: str) -> str:
"""Normalize message typename.
Args:
name: Message typename.
Returns:
Normalized name.
"""
path = Path(name)
if path.parent.name != 'msg':
return str(path.parent / 'msg' / path.name)
return name
def normalize_fieldtype(field: Any, names: List[str]):
"""Normalize field typename.
Args:
field: Field definition.
names: Valid message names.
"""
dct = {Path(name).name: name for name in names}
namedef = field[0]
if namedef[0] == Nodetype.NAME:
name = namedef[1]
elif namedef[0] == Nodetype.SEQUENCE:
name = namedef[1][1]
else:
name = namedef[2][1]
if name in VisitorMSG.BASETYPES:
inamedef = (Nodetype.BASE, name)
else:
if name in dct:
name = dct[name]
elif '/msg/' not in name:
ptype = Path(name)
name = str(ptype.parent / 'msg' / ptype.name)
inamedef = (Nodetype.NAME, name)
if namedef[0] == Nodetype.NAME:
namedef = inamedef
elif namedef[0] == Nodetype.SEQUENCE:
namedef = (Nodetype.SEQUENCE, inamedef)
else:
namedef = (Nodetype.ARRAY, namedef[1], inamedef)
field[0] = namedef
class VisitorMSG(Visitor):
"""MSG file visitor."""
# pylint: disable=no-self-use
RULES = parse_grammar(GRAMMAR_MSG)
BASETYPES = {
'bool',
'int8',
'int16',
'int32',
'int64',
'uint8',
'uint16',
'uint32',
'uint64',
'float32',
'float64',
'string',
}
def visit_comment(self, children: Any) -> Any:
"""Process comment, suppress output."""
def visit_const_dcl(self, children: Any) -> Any:
"""Process const declaration, suppress output."""
def visit_specification(self, children: Any) -> Typesdict:
"""Process start symbol."""
typelist = [children[0], *[x[1] for x in children[1]]]
typedict = dict(typelist)
names = list(typedict.keys())
for _, fields in typedict.items():
for field in fields:
normalize_fieldtype(field, names)
return typedict
def visit_msgdef(self, children: Any) -> Any:
"""Process single message definition."""
assert len(children) == 3
return normalize_msgtype(children[1][1]), [x for x in children[2] if x is not None]
def visit_msgsep(self, children: Any) -> Any:
"""Process message separator, suppress output."""
def visit_array_type_spec(self, children: Any) -> Any:
"""Process array type specifier."""
length = children[1][1]
if length:
return (Nodetype.ARRAY, int(length[0]), children[0])
return (Nodetype.SEQUENCE, children[0])
def visit_simple_type_spec(self, children: Any) -> Any:
"""Process simple type specifier."""
dct = {
'time': 'builtin_interfaces/msg/Time',
'duration': 'builtin_interfaces/msg/Duration',
'byte': 'uint8',
'char': 'uint8',
}
return Nodetype.NAME, dct.get(children[1], children[1])
def visit_scoped_name(self, children: Any) -> Any:
"""Process scoped name."""
if len(children) == 2:
return children
assert len(children) == 3
return (Nodetype.NAME, '/'.join(x[1] for x in children if x[0] != Rule.LIT))
def visit_identifier(self, children: Any) -> Any:
"""Process identifier."""
return (Nodetype.NAME, children)
def get_types_from_msg(text: str, name: str) -> Typesdict:
"""Get type from msg message definition.
Args:
text: Message definiton.
name: Message typename.
Returns:
List with single message name and parsetree.
"""
return parse_message_definition(VisitorMSG(), f'MSG: {name}\n{text}')
+247
View File
@@ -0,0 +1,247 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""PEG Parser.
Parsing expression grammar inspired parser for simple EBNF-like notations. It
implements just enough features to support parsing of the different ROS message
definition formats.
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Dict, List, Optional, Tuple
class Rule:
"""Rule base class."""
LIT = 'LITERAL'
WS = re.compile(r'\s+', re.M | re.S)
def __init__(self, value: Any, rules: Dict[str, Rule], name: Optional[str] = None):
"""Initialize.
Args:
value: Value of this rule.
rules: Grammar containing all rules.
name: Name of this rule.
"""
self.value = value
self.rules = rules
self.name = name
def skip_ws(self, text: str, pos: int) -> int:
"""Skip whitespace."""
match = self.WS.match(text, pos)
return match.span()[1] if match else pos
def make_node(self, data: Any) -> Any:
"""Make node for parse tree."""
if self.name:
return {
'node': self.name,
'data': data,
}
return data
def parse(self, text: str, pos: int):
"""Apply rule at position."""
raise NotImplementedError # pragma: no cover
class RuleLiteral(Rule):
"""Rule to match string literal."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
value: str = self.value[1:-1].replace('\\\'', '\'')
if text[pos:].startswith(value):
npos = pos + len(value)
npos = self.skip_ws(text, npos)
return npos, (self.LIT, value)
return -1, ()
class RuleRegex(Rule):
"""Rule to match regular expression."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
pattern = re.compile(self.value[2:-1], re.M | re.S)
match = pattern.match(text, pos)
if not match:
return -1, []
npos = self.skip_ws(text, match.span()[1])
return npos, self.make_node(match.group())
class RuleToken(Rule):
"""Rule to match token."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
token = self.rules[self.value]
npos, data = token.parse(text, pos)
if npos == -1:
return npos, data
return npos, self.make_node(data)
class RuleOneof(Rule):
"""Rule to match first matching subrule."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
for value in self.value:
npos, data = value.parse(text, pos)
if npos != -1:
return npos, self.make_node(data)
return -1, []
class RuleSequence(Rule):
"""Rule to match a sequence of subrules."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
data = []
npos = pos
for value in self.value:
npos, node = value.parse(text, npos)
if npos == -1:
return -1, []
data.append(node)
return npos, self.make_node(data)
class RuleZeroPlus(Rule):
"""Rule to match zero or more occurences of subrule."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
data: List[Any] = []
lpos = pos
while True:
npos, node = self.value.parse(text, lpos)
if npos == -1:
return lpos, self.make_node(data)
data.append(node)
lpos = npos
class RuleOnePlus(Rule):
"""Rule to match one or more occurences of subrule."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
npos, node = self.value.parse(text, pos)
if npos == -1:
return -1, []
data = [node]
lpos = npos
while True:
npos, node = self.value.parse(text, lpos)
if npos == -1:
return lpos, self.make_node(data)
data.append(node)
lpos = npos
class RuleZeroOne(Rule):
"""Rule to match zero or one occurence of subrule."""
def parse(self, text: str, pos: int) -> Tuple[int, Any]:
"""Apply rule at position."""
npos, node = self.value.parse(text, pos)
if npos == -1:
return pos, self.make_node([])
return npos, self.make_node([node])
class Visitor: # pylint: disable=too-few-public-methods
"""Visitor transforming parse trees."""
RULES: Dict[str, Rule] = {}
def __init__(self):
"""Initialize."""
self.typedefs = {}
def visit(self, tree: Any) -> Any:
"""Visit all nodes in parse tree."""
if isinstance(tree, list):
return [self.visit(x) for x in tree]
if not isinstance(tree, dict):
return tree
tree['data'] = self.visit(tree['data'])
func = getattr(self, f'visit_{tree["node"]}', lambda x: x)
return func(tree['data'])
def split_token(tok: str) -> List[str]:
"""Split repetition and grouping tokens."""
return list(filter(None, re.split(r'(^\()|(\)(?=[*+?]?$))|([*+?]$)', tok)))
def collapse_tokens(toks: List[Optional[Rule]], rules: Dict[str, Rule]) -> Rule:
"""Collapse linear list of tokens to oneof of sequences."""
value: List[Rule] = []
seq: List[Rule] = []
for tok in toks:
if tok:
seq.append(tok)
else:
value.append(RuleSequence(seq, rules) if len(seq) > 1 else seq[0])
seq = []
value.append(RuleSequence(seq, rules) if len(seq) > 1 else seq[0])
return RuleOneof(value, rules) if len(value) > 1 else value[0]
def parse_grammar(grammar: str) -> Dict[str, Rule]:
"""Parse grammar into rule dictionary."""
rules: Dict[str, Rule] = {}
for token in grammar.split('\n\n'):
lines = token.strip().split('\n')
name, *defs = lines
items = [z for x in defs for y in x.split(' ') if y for z in split_token(y) if z]
assert items
assert items[0] == '='
items.pop(0)
stack: List[Optional[Rule]] = []
parens: List[int] = []
while items:
tok = items.pop(0)
if tok in ['*', '+', '?']:
stack[-1] = {
'*': RuleZeroPlus,
'+': RuleOnePlus,
'?': RuleZeroOne,
}[tok](stack[-1], rules)
elif tok == '/':
stack.append(None)
elif tok == '(':
parens.append(len(stack))
elif tok == ')':
index = parens.pop()
rule = collapse_tokens(stack[index:], rules)
stack = stack[:index]
stack.append(rule)
elif len(tok) > 2 and tok[:2] == 'r\'':
stack.append(RuleRegex(tok, rules))
elif tok[0] == '\'':
stack.append(RuleLiteral(tok, rules))
else:
stack.append(RuleToken(tok, rules))
res = collapse_tokens(stack, rules)
res.name = name
rules[name] = res
return rules
+112
View File
@@ -0,0 +1,112 @@
# Copyright 2020-2021 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Code generators and registration functions for the extensible type system."""
from __future__ import annotations
import json
import sys
from importlib.util import module_from_spec, spec_from_loader
from typing import TYPE_CHECKING
from . import types
from .base import TypesysError
if TYPE_CHECKING:
from .base import Typesdict
def generate_python_code(typs: Typesdict) -> str:
"""Generate python code from types dictionary.
Args:
typs: Dictionary mapping message typenames to parsetrees.
Returns:
Code for importable python module.
"""
lines = [
'# Copyright 2020-2021 Ternaris.',
'# SPDX-License-Identifier: Apache-2.0',
'#',
'# THIS FILE IS GENERATED, DO NOT EDIT',
'"""ROS2 message types."""',
'',
'# flake8: noqa N801',
'# pylint: disable=invalid-name,too-many-instance-attributes,too-many-lines',
'',
'from __future__ import annotations',
'',
'from dataclasses import dataclass',
'from typing import TYPE_CHECKING',
'',
'if TYPE_CHECKING:',
' from typing import Any',
'',
'',
]
for name, fields in typs.items():
pyname = name.replace('/', '__')
lines += [
'@dataclass',
f'class {pyname}:',
f' """Class for {name}."""',
'',
*[f' {fname[1]}: Any' for _, fname in fields],
]
lines += [
'',
'',
]
lines += ['FIELDDEFS = {']
for name, fields in typs.items():
pyname = name.replace('/', '__')
lines += [
f' \'{name}\': [',
*[
f' ({repr(fname[1])}, {json.loads(json.dumps(ftype))}),'
for ftype, fname in fields
],
' ],',
]
lines += [
'}',
'',
]
return '\n'.join(lines)
def register_types(typs: Typesdict) -> None:
"""Register types in type system.
Args:
typs: Dictionary mapping message typenames to parsetrees.
Raises:
TypesysError: Type already present with different definition.
"""
code = generate_python_code(typs)
name = 'rosbags.usertypes'
spec = spec_from_loader(name, loader=None)
module = module_from_spec(spec)
sys.modules[name] = module
exec(code, module.__dict__) # pylint: disable=exec-used
fielddefs = module.FIELDDEFS # type: ignore
for name, fields in fielddefs.items():
if name == 'std_msgs/msg/Header':
continue
if have := types.FIELDDEFS.get(name):
have = [(x[0].lower(), x[1]) for x in have]
fields = [(x[0].lower(), x[1]) for x in fields]
if have != fields:
raise TypesysError(f'Type {name!r} is already present with different definition.')
for name in fielddefs.keys() - types.FIELDDEFS.keys():
pyname = name.replace('/', '__')
setattr(types, pyname, getattr(module, pyname))
types.FIELDDEFS[name] = fielddefs[name]
File diff suppressed because it is too large Load Diff