ROS2とPython asyncioを組み合わせた非同期処理【LLM API統合の実装例】
ROS2のLLM統合を実装しているとき、一番困ったのが「LLM APIの呼び出し(非同期・低速)」と「ロボット制御(同期・高速)」の組み合わせです。
素直に実装しようとするとこんなエラーや問題が出ます。
# NG: コールバック内でAPI呼び出しをブロックすると制御が止まる
def scan_callback(self, msg):
response = requests.post(...) # ここでブロック → cmd_velが止まる!
self.publish_command(response)
この記事では、スレッドセーフにasyncioとROS2を組み合わせる方法を実装例付きで解説します。
問題の整理
ROS2の spin() はイベントループを自分で管理していて、Pythonの asyncio イベントループとは別物です。
ROS2 spin()
├── コールバック1(10Hz)
├── コールバック2(30Hz)
└── タイマー(1Hz)
asyncio event loop(別スレッド)
├── async関数1
└── async関数2
これらを直接混ぜることはできませんが、asyncio.run_coroutine_threadsafe() でスレッド間通信ができます。
基本パターン:asyncioループを別スレッドで動かす
import asyncio
import threading
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
class AsyncROS2Node(Node):
def __init__(self):
super().__init__('async_node')
# asyncioループを別スレッドで起動
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._run_loop,
daemon=True,
name='asyncio_thread'
)
self._thread.start()
def _run_loop(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def run_async(self, coro) -> asyncio.Future:
"""コルーチンをasyncioスレッドで実行"""
return asyncio.run_coroutine_threadsafe(coro, self._loop)
def destroy_node(self):
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join(timeout=5.0)
super().destroy_node()
実装例:LLM APIをROS2コールバックから呼ぶ
import anthropic
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from concurrent.futures import Future
class LLMAsyncNode(AsyncROS2Node):
def __init__(self):
super().__init__('llm_async_node')
self.client = anthropic.AsyncAnthropic()
self.cmd_pub = self.create_publisher(Twist, 'cmd_vel', 10)
self.text_sub = self.create_subscription(
String, 'user_command', self.on_command, 10
)
self._pending_future: Future | None = None
self.get_logger().info('LLM非同期ノード起動')
def on_command(self, msg: String):
"""ROS2コールバック(同期)"""
# すでに処理中の場合はスキップ
if self._pending_future and not self._pending_future.done():
self.get_logger().warn('前のリクエスト処理中。スキップ')
return
self.get_logger().info(f'コマンド受信: {msg.data}')
# asyncioスレッドで非同期処理を開始
self._pending_future = self.run_async(
self.process_command(msg.data)
)
# 完了時のコールバックを登録
self._pending_future.add_done_callback(self.on_llm_done)
async def process_command(self, text: str) -> dict | None:
"""非同期でLLM APIを呼ぶ"""
try:
response = await asyncio.wait_for(
self.client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=128,
messages=[{
"role": "user",
"content": f"ロボットコマンドをJSONで返してください: {text}"
}]
),
timeout=5.0 # 5秒でタイムアウト
)
import json
return json.loads(response.content[0].text)
except asyncio.TimeoutError:
self.get_logger().warn('LLM APIタイムアウト')
return None
except Exception as e:
self.get_logger().error(f'LLM APIエラー: {e}')
return None
def on_llm_done(self, future: Future):
"""LLM処理完了後のコールバック(asyncioスレッドから呼ばれる)"""
try:
result = future.result()
except Exception as e:
self.get_logger().error(f'処理エラー: {e}')
return
if result is None:
return
# ROSのパブリッシュはスレッドセーフではないので注意
# create_timer で次のspinサイクルに実行する
self._queued_result = result
self.create_timer(0.0, self._publish_result) # 即時実行
def _publish_result(self):
if not hasattr(self, '_queued_result'):
return
result = self._queued_result
del self._queued_result
cmd = Twist()
cmd.linear.x = float(result.get('linear_x', 0.0))
cmd.angular.z = float(result.get('angular_z', 0.0))
self.cmd_pub.publish(cmd)
self.get_logger().info(f'コマンド実行: {result}')
より簡単な方法:MultiThreadedExecutor + concurrent.futures
asyncioを使わず、concurrent.futures.ThreadPoolExecutor でシンプルに書く方法もあります。
from concurrent.futures import ThreadPoolExecutor
import anthropic
class ThreadedLLMNode(Node):
def __init__(self):
super().__init__('threaded_llm_node')
self.executor_pool = ThreadPoolExecutor(max_workers=2)
self.client = anthropic.Anthropic() # 同期クライアント
self.cmd_pub = self.create_publisher(Twist, 'cmd_vel', 10)
self.sub = self.create_subscription(
String, 'user_command', self.on_command, 10
)
self._processing = False
def on_command(self, msg: String):
if self._processing:
return
self._processing = True
# スレッドプールで実行
future = self.executor_pool.submit(self.call_llm, msg.data)
future.add_done_callback(self.on_done)
def call_llm(self, text: str) -> dict | None:
"""別スレッドで同期的にAPIを呼ぶ"""
try:
response = self.client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=128,
messages=[{"role": "user", "content": text}],
timeout=5.0
)
import json
return json.loads(response.content[0].text)
except Exception as e:
self.get_logger().error(f'エラー: {e}')
return None
def on_done(self, future):
self._processing = False
result = future.result()
if result:
# ROSのパブリッシュは ROS spinスレッドから呼ぶ方が安全
# ここでは簡単のため直接呼ぶ
cmd = Twist()
cmd.linear.x = float(result.get('linear_x', 0.0))
self.cmd_pub.publish(cmd)
def main():
rclpy.init()
node = ThreadedLLMNode()
# MultiThreadedExecutorを使う(コールバックの並列実行を許可)
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node)
executor.spin()
node.executor_pool.shutdown()
node.destroy_node()
rclpy.shutdown()
どちらを使うべきか
| asyncio | ThreadPoolExecutor | |
|---|---|---|
| コード複雑さ | 高い | 低い |
| パフォーマンス | 高い(I/O多重化) | 普通 |
| スレッド数 | 少なくて済む | API呼び出しごとにスレッド |
| 向いている場面 | 同時に多数のAPIリクエスト | 呼び出し頻度が低い場合 |
LLM APIの呼び出しは秒単位で遅く、頻度も低い(1〜10秒に1回程度)ので、ThreadPoolExecutorの方がシンプルで十分なことが多いです。
注意点まとめ
- ROS2のパブリッシュはROS spinスレッドから呼ぶのが安全(他スレッドから呼ぶと競合することがある)
- タイムアウトを必ず設定(APIが返ってこなくてもロボット制御が止まらないように)
- 前のリクエスト処理中のガード(連続してリクエストが来ても1つだけ処理)
- MultiThreadedExecutorを使う(SingleThreadedExecutorだとコールバックが並列実行されない)