greenhouse/tests/test_highlevel.py
2023-03-02 12:55:12 +01:00

263 lines
8.8 KiB
Python

# Copyright 2020-2023 Ternaris.
# SPDX-License-Identifier: Apache-2.0
"""Reader tests."""
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import patch
import pytest
from rosbags.highlevel import AnyReader, AnyReaderError
from rosbags.interfaces import Connection
from rosbags.rosbag1 import Writer as Writer1
from rosbags.rosbag2 import Writer as Writer2
if TYPE_CHECKING:
from pathlib import Path
from typing import Sequence
HEADER = b'\x00\x01\x00\x00'
@pytest.fixture()
def bags1(tmp_path: Path) -> list[Path]:
"""Test data fixture."""
paths = [
tmp_path / 'ros1_1.bag',
tmp_path / 'ros1_2.bag',
tmp_path / 'ros1_3.bag',
tmp_path / 'bad.bag',
]
with (Writer1(paths[0])) as writer:
topic1 = writer.add_connection('/topic1', 'std_msgs/msg/Int8')
topic2 = writer.add_connection('/topic2', 'std_msgs/msg/Int16')
writer.write(topic1, 1, b'\x01')
writer.write(topic2, 2, b'\x02\x00')
writer.write(topic1, 9, b'\x09')
with (Writer1(paths[1])) as writer:
topic1 = writer.add_connection('/topic1', 'std_msgs/msg/Int8')
writer.write(topic1, 5, b'\x05')
with (Writer1(paths[2])) as writer:
topic2 = writer.add_connection('/topic2', 'std_msgs/msg/Int16')
writer.write(topic2, 15, b'\x15\x00')
paths[3].touch()
return paths
@pytest.fixture()
def bags2(tmp_path: Path) -> list[Path]:
"""Test data fixture."""
paths = [
tmp_path / 'ros2_1',
tmp_path / 'bad',
]
with (Writer2(paths[0])) as writer:
topic1 = writer.add_connection('/topic1', 'std_msgs/msg/Int8')
topic2 = writer.add_connection('/topic2', 'std_msgs/msg/Int16')
writer.write(topic1, 1, HEADER + b'\x01')
writer.write(topic2, 2, HEADER + b'\x02\x00')
writer.write(topic1, 9, HEADER + b'\x09')
writer.write(topic1, 5, HEADER + b'\x05')
writer.write(topic2, 15, HEADER + b'\x15\x00')
paths[1].mkdir()
(paths[1] / 'metadata.yaml').write_text(':')
return paths
def test_anyreader1(bags1: Sequence[Path]) -> None: # pylint: disable=redefined-outer-name
"""Test AnyReader on rosbag1."""
# pylint: disable=too-many-statements
with pytest.raises(AnyReaderError, match='at least one'):
AnyReader([])
with pytest.raises(AnyReaderError, match='missing'):
AnyReader([bags1[0] / 'badname'])
reader = AnyReader(bags1)
with pytest.raises(AssertionError):
assert reader.topics
with pytest.raises(AssertionError):
next(reader.messages())
reader = AnyReader(bags1)
with pytest.raises(AnyReaderError, match='seems to be empty'):
reader.open()
assert all(not x.bio for x in reader.readers) # type: ignore[union-attr]
with AnyReader(bags1[:3]) as reader:
assert reader.duration == 15
assert reader.start_time == 1
assert reader.end_time == 16
assert reader.message_count == 5
assert list(reader.topics.keys()) == ['/topic1', '/topic2']
assert len(reader.topics['/topic1'].connections) == 2
assert reader.topics['/topic1'].msgcount == 3
assert len(reader.topics['/topic2'].connections) == 2
assert reader.topics['/topic2'].msgcount == 2
gen = reader.messages()
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (1, b'\x01')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 1 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic2'
assert nxt[1:] == (2, b'\x02\x00')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 2 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (5, b'\x05')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 5 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (9, b'\x09')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 9 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic2'
assert nxt[1:] == (15, b'\x15\x00')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 21 # type: ignore
with pytest.raises(StopIteration):
next(gen)
gen = reader.messages(connections=reader.topics['/topic1'].connections)
nxt = next(gen)
assert nxt[0].topic == '/topic1'
nxt = next(gen)
assert nxt[0].topic == '/topic1'
nxt = next(gen)
assert nxt[0].topic == '/topic1'
with pytest.raises(StopIteration):
next(gen)
def test_anyreader2(bags2: list[Path]) -> None: # pylint: disable=redefined-outer-name
"""Test AnyReader on rosbag2."""
# pylint: disable=too-many-statements
with pytest.raises(AnyReaderError, match='multiple rosbag2'):
AnyReader(bags2)
with pytest.raises(AnyReaderError, match='YAML'):
AnyReader([bags2[1]])
with AnyReader([bags2[0]]) as reader:
assert reader.duration == 15
assert reader.start_time == 1
assert reader.end_time == 16
assert reader.message_count == 5
assert list(reader.topics.keys()) == ['/topic1', '/topic2']
assert len(reader.topics['/topic1'].connections) == 1
assert reader.topics['/topic1'].msgcount == 3
assert len(reader.topics['/topic2'].connections) == 1
assert reader.topics['/topic2'].msgcount == 2
gen = reader.messages()
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (1, HEADER + b'\x01')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 1 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic2'
assert nxt[1:] == (2, HEADER + b'\x02\x00')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 2 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (5, HEADER + b'\x05')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 5 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic1'
assert nxt[1:] == (9, HEADER + b'\x09')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 9 # type: ignore
nxt = next(gen)
assert nxt[0].topic == '/topic2'
assert nxt[1:] == (15, HEADER + b'\x15\x00')
msg = reader.deserialize(nxt[2], nxt[0].msgtype)
assert msg.data == 21 # type: ignore
with pytest.raises(StopIteration):
next(gen)
gen = reader.messages(connections=reader.topics['/topic1'].connections)
nxt = next(gen)
assert nxt[0].topic == '/topic1'
nxt = next(gen)
assert nxt[0].topic == '/topic1'
nxt = next(gen)
assert nxt[0].topic == '/topic1'
with pytest.raises(StopIteration):
next(gen)
def test_anyreader2_autoregister(bags2: list[Path]) -> None: # pylint: disable=redefined-outer-name
"""Test AnyReader on rosbag2."""
class MockReader:
"""Mock reader."""
# pylint: disable=too-few-public-methods
def __init__(self, paths: list[Path]):
"""Initialize mock."""
_ = paths
self.metadata = {'storage_identifier': 'mcap'}
self.connections = [
Connection(
1,
'/foo',
'test_msg/msg/Foo',
'string foo',
'msg',
0,
None, # type: ignore
self,
),
Connection(
2,
'/bar',
'test_msg/msg/Bar',
'module test_msgs { module msg { struct Bar {string bar;}; }; };',
'idl',
0,
None, # type: ignore
self,
),
Connection(
3,
'/baz',
'test_msg/msg/Baz',
'',
'',
0,
None, # type: ignore
self,
),
]
def open(self) -> None:
"""Unused."""
with patch('rosbags.highlevel.anyreader.Reader2', MockReader), \
patch('rosbags.highlevel.anyreader.register_types') as mock_register_types:
AnyReader([bags2[0]]).open()
mock_register_types.assert_called_once()
assert mock_register_types.call_args[0][0] == {
'test_msg/msg/Foo': ([], [('foo', (1, 'string'))]),
'test_msgs/msg/Bar': ([], [('bar', (1, 'string'))]),
}