ROS2の基本概念をわかりやすく解説【ノード・トピック・サービス・アクション】
ROS2を学び始めたとき、最初に混乱するのが「ノード」「トピック」「サービス」「アクション」の違いです。この記事では実際に動くPythonコードを使って丁寧に説明します。
ノード(Node)
ノードはROS2の最小実行単位です。1つのプロセス=1つのノードと考えると分かりやすいです。
たとえばカメラを制御するノード、画像処理をするノード、ロボットを動かすノードのように、機能ごとに分けて実装します。
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node') # ノード名(ros2 node listで表示される名前)
self.get_logger().info('ノード起動!')
# タイマー:1秒ごとにcallbackを呼ぶ
self.timer = self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
self.get_logger().info('1秒経過')
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node) # ノードを動かし続ける
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# 別ターミナルでノードを確認
ros2 node list
# /my_node
トピック(Topic)
トピックは一方向の非同期通信です。センサーデータの送受信によく使います。
パブリッシャー → [トピック名] → サブスクライバー
(送信側) (受信側)
パブリッシャー(送信側)
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
# トピック名: 'chatter', メッセージ型: String, キューサイズ: 10
self.publisher = self.create_publisher(String, 'chatter', 10)
self.timer = self.create_timer(0.5, self.publish_message)
self.count = 0
def publish_message(self):
msg = String()
msg.data = f'Hello ROS2! count={self.count}'
self.publisher.publish(msg)
self.get_logger().info(f'送信: {msg.data}')
self.count += 1
サブスクライバー(受信側)
class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String, # メッセージ型
'chatter', # トピック名(パブリッシャーと一致させる)
self.on_message, # コールバック関数
10 # キューサイズ
)
def on_message(self, msg: String):
self.get_logger().info(f'受信: {msg.data}')
# トピックの確認
ros2 topic list
ros2 topic echo /chatter # 流れているデータを表示
ros2 topic hz /chatter # 周波数を確認
ros2 topic info /chatter # パブリッシャー・サブスクライバー数
よく使うメッセージ型:
| パッケージ | 型 | 用途 |
|---|---|---|
std_msgs |
String, Float64, Bool |
基本型 |
geometry_msgs |
Twist, Pose, PoseStamped |
位置・速度 |
sensor_msgs |
LaserScan, Image, Imu |
センサー |
nav_msgs |
OccupancyGrid, Path |
ナビゲーション |
サービス(Service)
サービスは双方向の同期通信です。「リクエストを送ったらレスポンスが返ってくる」形式です。
クライアント → [リクエスト] → サーバー
← [レスポンス] ←
一時的な処理(パラメーター変更・ファイル保存・計算)に使います。
サービスサーバー
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsServer(Node):
def __init__(self):
super().__init__('add_two_ints_server')
self.srv = self.create_service(
AddTwoInts, # サービス型
'add_two_ints', # サービス名
self.handle_request
)
self.get_logger().info('サービス準備完了')
def handle_request(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'{request.a} + {request.b} = {response.sum}')
return response
サービスクライアント
from example_interfaces.srv import AddTwoInts
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
self.client = self.create_client(AddTwoInts, 'add_two_ints')
# サービスが起動するまで待機
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('サービス待機中...')
def send_request(self, a: int, b: int):
req = AddTwoInts.Request()
req.a = a
req.b = b
future = self.client.call_async(req)
rclpy.spin_until_future_complete(self, future)
return future.result()
# 使い方
client_node = AddTwoIntsClient()
result = client_node.send_request(3, 5)
print(f'結果: {result.sum}') # 8
# コマンドラインからサービスを呼ぶ
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 3, b: 5}"
アクション(Action)
アクションは長時間かかる処理に使います。途中でフィードバックを受け取ったり、キャンセルしたりできます。
クライアント → [ゴール送信] → サーバー
← [フィードバック] ← (処理中に何度も)
← [結果] ← (完了時に1回)
ナビゲーション(「目的地まで移動して」)や、アームの動作などに使われます。
from rclpy.action import ActionServer
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_server')
self._action_server = ActionServer(
self, Fibonacci, 'fibonacci', self.execute
)
async def execute(self, goal_handle):
self.get_logger().info(f'フィボナッチ数列を{goal_handle.request.order}個計算します')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1]
)
goal_handle.publish_feedback(feedback_msg) # 途中経過を送信
await asyncio.sleep(0.5)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
まとめ:どれを使うべきか
| 通信方式 | 方向 | 同期 | キャンセル | 典型的な用途 |
|---|---|---|---|---|
| トピック | 一方向 | 非同期 | ✗ | センサーデータ・速度指令 |
| サービス | 双方向 | 同期 | ✗ | 設定変更・単純な計算 |
| アクション | 双方向 | 非同期 | ✅ | ナビゲーション・長時間処理 |
迷ったら「センサーデータ → トピック」「一回限りの処理 → サービス」「長い処理 → アクション」で判断するとシンプルです。
次回はROS2 + Nav2を使った自律ナビゲーションの実装を解説します。