greenhouse/docs/examples/edit_rosbags_remove_topic.py

39 lines
1.1 KiB
Python
Raw Normal View History

2022-01-31 13:35:42 +01:00
"""Example: Remove topic."""
from __future__ import annotations
from typing import TYPE_CHECKING, cast
2022-01-31 13:35:42 +01:00
from rosbags.interfaces import ConnectionExtRosbag2
2022-01-31 13:35:42 +01:00
from rosbags.rosbag2 import Reader, Writer
if TYPE_CHECKING:
from pathlib import Path
def remove_topic(src: Path, dst: Path, topic: str) -> None:
    """Remove topic from rosbag2.

    Opens the bag at ``src`` for reading and a new bag at ``dst`` for
    writing. Every connection whose topic differs from ``topic`` is
    registered on the writer, and the messages of those connections are
    then written to the destination. If no connection uses ``topic``, all
    connections and their messages are carried over.

    Args:
        src: Source path.
        dst: Destination path.
        topic: Name of topic to remove.

    """
    with Reader(src) as reader, Writer(dst) as writer:
        # Maps a reader connection id to the connection returned by the
        # writer, so each message can be routed to its counterpart.
        conn_map = {}
        # Reader-side connections that are kept. They are collected here
        # instead of being looked up later via ``reader.connections[id]``:
        # the loop below iterates ``reader.connections`` as a sequence of
        # connection objects, so indexing it by connection id would only be
        # right if ids happened to equal list positions.
        rconns = []
        for conn in reader.connections:
            if conn.topic == topic:
                continue
            ext = cast(ConnectionExtRosbag2, conn.ext)
            conn_map[conn.id] = writer.add_connection(
                conn.topic,
                conn.msgtype,
                ext.serialization_format,
                ext.offered_qos_profiles,
            )
            rconns.append(conn)

        # Restrict reading to the kept connections so messages of the
        # removed topic are never yielded.
        for conn, timestamp, data in reader.messages(connections=rconns):
            writer.write(conn_map[conn.id], timestamp, data)