이번 단계에서는 ROS 2 Action Server를 실제 turtlesim 제어에 연결합니다.
앞에서는 Action Server가 Goal을 받고 Result와 Feedback을 반환하는 구조를 확인했다면, 이번에는 다음 기능을 함께 구현합니다.
1. Action Client가 이동 명령을 보냅니다.
2. Action Server가 목표 거리와 속도 값을 받습니다.
3. /turtle1/cmd_vel 토픽으로 turtlesim에 속도 명령을 발행합니다.
4. /turtle1/pose 토픽을 구독해서 현재 위치를 계속 확인합니다.
5. 이동한 거리를 누적 계산합니다.
6. 목표 거리까지 남은 거리를 Feedback으로 반환합니다.
7. 목표 지점에 도달하면 Result를 반환하고 Action을 종료합니다.
이 구조는 단순한 예제가 아니라 실제 로봇 이동 제어의 기본 패턴입니다.
특히 모바일 로봇, 드론, AGV, AMR에서 “목표 지점까지 이동하면서 진행 상태를 계속 확인하는 구조”와 매우 비슷합니다.
1. 전체 구조
이번 프로그램은 크게 두 개의 노드 역할로 나눌 수 있습니다.
DistTurtleServer
├─ Action Server
├─ /turtle1/cmd_vel Publisher
├─ 이동 거리 계산
├─ Feedback 발행
└─ Result 반환
TurtleSub_Action
├─ /turtle1/pose Subscriber
└─ 현재 pose를 Action Server에 전달
두 노드는 MultiThreadedExecutor에 함께 등록됩니다.
executor.add_node(sub)
executor.add_node(ac)
이렇게 하면 Subscriber와 Action Server가 동시에 동작할 수 있습니다.
2. 코드 구성
a. import 구성
이번 코드에서 필요한 주요 모듈은 다음과 같습니다.
import rclpy as rp
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from my_first_package_msgs.action import DistTurtle
from my_first_package.my_subscriber import TurtlesimSubscriber
import math
import time
각 import의 역할은 다음과 같습니다.
| rclpy | ROS 2 Python 클라이언트 라이브러리입니다. |
| ActionServer | Action Server를 생성할 때 사용합니다. |
| MultiThreadedExecutor | 여러 callback을 동시에 처리하기 위해 사용합니다. |
| Node | ROS 2 노드 클래스를 만들기 위한 기본 클래스입니다. |
| Pose | turtlesim의 현재 위치와 자세 정보입니다. |
| Twist | turtlesim에 속도 명령을 보내기 위한 메시지입니다. |
| DistTurtle | 직접 만든 Action 인터페이스입니다. |
| math | 거리 계산을 위해 사용합니다. |
| time | 반복문 주기 조절에 사용합니다. |
3. Pose 구독 클래스
turtle1/pose 토픽을 구독하는 클래스를 따로 만듭니다.
class TurtleSub_Action(TurtlesimSubscriber):
def __init__(self, ac_server):
super().__init__()
self.ac_server = ac_server
def pose_callback(self, msg):
self.ac_server.current_pose = msg
이 클래스는 기존에 만들어 둔 TurtlesimSubscriber를 상속합니다.
TurtlesimSubscriber 클래스의 pose_callback() 함수를 오버라이딩합니다.
핵심은 pose 메시지를 받을 때마다 Action Server의 current_pose 값을 경신한다는 점입니다.
self.ac_server.current_pose = msg
즉, Subscriber가 받은 최신 위치 정보가 Action Server 내부로 전달됩니다.
이 방식은 학습용으로 이해하기 쉽습니다.
다만 실무에서는 노드 간 상태 공유를 할 때 callback group, lock, topic 재설계 등을 고려하는 것이 좋습니다.
4. Action Server 초기화
Action Server 클래스는 다음과 같은 역할을 합니다.
class DistTurtleServer(Node):
def __init__(self):
super().__init__('dist_turtle_action_server')
self.total_dist = 0
self.is_first_time = True
self.current_pose = Pose()
self.previous_pose = Pose()
self.publisher = self.create_publisher(
Twist,
'/turtle1/cmd_vel',
10
)
self.action_server = ActionServer(
self,
DistTurtle,
'dist_turtle',
self.execute_callback
)
여기서 중요한 변수는 다음과 같습니다.
| total_dist | 지금까지 누적 이동한 거리입니다. |
| is_first_time | 첫 pose 계산인지 확인하는 플래그입니다. |
| current_pose | 현재 turtlesim 위치입니다. |
| previous_pose | 직전 위치입니다. |
| publisher | /turtle1/cmd_vel로 속도 명령을 발행합니다. |
| action_server | /dist_turtle Action Server입니다. |
5. 이동 거리 계산 함수
거북이가 이동한 거리는 현재 위치와 이전 위치의 차이로 계산합니다.
def calc_diff_pose(self):
if self.is_first_time:
self.previous_pose.x = self.current_pose.x
self.previous_pose.y = self.current_pose.y
self.is_first_time = False
diff_dist = math.sqrt(
(self.current_pose.x - self.previous_pose.x) ** 2 +
(self.current_pose.y - self.previous_pose.y) ** 2
)
self.previous_pose = self.current_pose
return diff_dist
계산식은 2차원 평면 거리 공식입니다.
거리 = sqrt((현재 x - 이전 x)^2 + (현재 y - 이전 y)^2)
이 방식은 실제 로봇에서도 자주 사용됩니다.
예를 들어 odometry, GPS, UWB, SLAM 위치 정보를 받아서 이동 거리를 계산할 때 같은 구조를 사용할 수 있습니다.
단, 실제 로봇에서는 센서 노이즈가 있기 때문에 이동 거리 계산에 필터링이 추가되는 경우가 많습니다.
6. Action 실행 callback
Action Client가 Goal을 보내면 execute_callback()이 실행됩니다.
def execute_callback(self, goal_handle):
feedback_msg = DistTurtle.Feedback()
msg = Twist()
msg.linear.x = goal_handle.request.linear_x
msg.angular.z = goal_handle.request.angular_z
while True:
self.total_dist += self.calc_diff_pose()
feedback_msg.remained_dist = (
goal_handle.request.dist - self.total_dist
)
goal_handle.publish_feedback(feedback_msg)
self.publisher.publish(msg)
time.sleep(0.01)
if feedback_msg.remained_dist < 0.2:
break
goal_handle.succeed()
result = DistTurtle.Result()
result.pos_x = self.current_pose.x
result.pos_y = self.current_pose.y
result.pos_theta = self.current_pose.theta
result.result_dist = self.total_dist
self.total_dist = 0
self.is_first_time = True
return result
이 함수의 흐름은 다음과 같습니다.
1. Feedback 메시지 객체를 만듭니다.
2. Goal에서 linear_x, angular_z 값을 가져옵니다.
3. Twist 메시지에 속도 값을 넣습니다.
4. 반복문을 돌면서 이동 거리를 누적합니다.
5. 목표 거리에서 현재 이동 거리를 빼서 남은 거리를 계산합니다.
6. Feedback을 발행합니다.
7. cmd_vel을 발행해서 turtlesim을 움직입니다.
8. 남은 거리가 기준값보다 작으면 반복문을 종료합니다.
9. Result에 최종 위치와 이동 거리를 넣어 반환합니다.
7. 종료 조건
코드에서는 다음 조건으로 이동을 멈춥니다.
if feedback_msg.remained_dist < 0.2:
break
즉, 남은 거리가 0.2보다 작아지면 목표에 도달했다고 판단합니다.
실제 로봇에서는 정확히 0이 되는 경우가 거의 없습니다.
센서 오차, 제어 지연, 바퀴 미끄러짐, 통신 지연 등이 있기 때문입니다.
그래서 보통은 허용 오차를 둡니다.
목표 거리와 실제 이동 거리의 차이가 일정 값 이하이면 도착으로 판단
이런 허용 오차를 tolerance라고 부릅니다.
8. Result 반환
Action이 완료되면 최종 결과를 반환합니다.
result = DistTurtle.Result()
result.pos_x = self.current_pose.x
result.pos_y = self.current_pose.y
result.pos_theta = self.current_pose.theta
result.result_dist = self.total_dist
반환되는 값은 다음과 같습니다.
| pos_x | 최종 x 위치 |
| pos_y | 최종 y 위치 |
| pos_theta | 최종 방향 |
| result_dist | 실제 누적 이동 거리 |
이렇게 하면 Action Client는 단순히 성공 여부만 받는 것이 아니라, 실제 이동 결과까지 확인할 수 있습니다.
9. main 함수와 멀티스레드 적용
전체 실행 구조는 main() 함수에서 구성합니다.
def main(args=None):
rp.init(args=args)
executor = MultiThreadedExecutor()
ac = DistTurtleServer()
sub = TurtleSub_Action(ac_server=ac)
executor.add_node(sub)
executor.add_node(ac)
try:
executor.spin()
finally:
executor.shutdown()
sub.destroy_node()
ac.destroy_node()
rp.shutdown()
여기서 핵심은 다음입니다.
executor = MultiThreadedExecutor()
멀티스레드 실행기를 만듭니다.
ac = DistTurtleServer()
sub = TurtleSub_Action(ac_server=ac)
Action Server와 Subscriber를 각각 생성합니다.
executor.add_node(sub)
executor.add_node(ac)
두 노드를 Executor에 등록합니다.
executor.spin()
등록된 노드들의 callback을 실행합니다.
10. 전체 코드
아래의 위치에서 dist_turtle_action_server.py 파일을 생성합니다.
~/ros2_study/src/my_first_package/ my_first_package/dist_turtle_action_server.py
cd ~/ros2_study/src/my_first_package/ my_first_package/
touch dist_turtle_action_server.py
tree

import rclpy as rp
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from my_first_package_msgs.action import DistTurtle
from my_first_package.my_subscriber import TurtlesimSubscriber
import math
import time
class TurtleSub_Action(TurtlesimSubscriber):
def __init__(self, ac_server):
super().__init__()
self.ac_server = ac_server
def pose_callback(self, msg):
self.ac_server.current_pose = msg
class DistTurtleServer(Node):
def __init__(self):
super().__init__('dist_turtle_action_server')
self.total_dist = 0
self.is_first_time = True
self.current_pose = Pose()
self.previous_pose = Pose()
self.publisher = self.create_publisher(
Twist,
'/turtle1/cmd_vel',
10
)
self.action_server = ActionServer(
self,
DistTurtle,
'dist_turtle',
self.execute_callback
)
def calc_diff_pose(self):
if self.is_first_time:
self.previous_pose.x = self.current_pose.x
self.previous_pose.y = self.current_pose.y
self.is_first_time = False
diff_dist = math.sqrt(
(self.current_pose.x - self.previous_pose.x) ** 2 +
(self.current_pose.y - self.previous_pose.y) ** 2
)
self.previous_pose = self.current_pose
return diff_dist
def execute_callback(self, goal_handle):
feedback_msg = DistTurtle.Feedback()
msg = Twist()
msg.linear.x = goal_handle.request.linear_x
msg.angular.z = goal_handle.request.angular_z
while True:
self.total_dist += self.calc_diff_pose()
feedback_msg.remained_dist = (
goal_handle.request.dist - self.total_dist
)
goal_handle.publish_feedback(feedback_msg)
self.publisher.publish(msg)
time.sleep(0.01)
if feedback_msg.remained_dist < 0.2:
break
goal_handle.succeed()
result = DistTurtle.Result()
result.pos_x = self.current_pose.x
result.pos_y = self.current_pose.y
result.pos_theta = self.current_pose.theta
result.result_dist = self.total_dist
self.total_dist = 0
self.is_first_time = True
return result
def main(args=None):
rp.init(args=args)
executor = MultiThreadedExecutor()
ac = DistTurtleServer()
sub = TurtleSub_Action(ac_server=ac)
executor.add_node(sub)
executor.add_node(ac)
try:
executor.spin()
finally:
executor.shutdown()
sub.destroy_node()
ac.destroy_node()
rp.shutdown()
if __name__ == '__main__':
main()

11. setup.py 등록
Python 파일을 ros2 run으로 실행하려면 setup.py에 entry point를 추가해야 합니다.
entry_points={
'console_scripts': [
'dist_turtle_action_server = my_first_package.dist_turtle_action_server:main',
'my_multi_thread = my_first_package.my_multi_thread:main',
],
},

수정 후 반드시 빌드해야 합니다.
cd ~/ros2_study
colcon build
source install/setup.bash
또는
sl

12. 실행 순서
a. turtlesim 실행
첫 번째 터미널에서 실행합니다.
ros2 run turtlesim turtlesim_node

b. Action Server 실행
두 번째 터미널에서 실행합니다.
ros2 run my_first_package dist_turtle_action_server

c. Action Goal 전송
세 번째 터미널에서 Goal을 보냅니다.
ros2 action send_goal --feedback /dist_turtle my_first_package_msgs/action/DistTurtle "{linear_x: 1.0, angular_z: 0.0, dist: 3.0}"
이 명령은 다음 의미입니다.
linear_x: 1.0 → 앞으로 이동하는 속도
angular_z: 0.0 → 회전하지 않음
dist: 3.0 → 3.0만큼 이동
실행하면 turtlesim이 앞으로 이동하고, 터미널에는 남은 거리가 Feedback으로 출력됩니다.


13. rqt_graph로 구조 확인
실행 중에 노드와 토픽 연결을 확인하려면 다음 명령을 사용합니다.
rqt_graph
정상 구조라면 대략 다음 관계가 보입니다.
/dist_turtle_action_server
├─ publish → /turtle1/cmd_vel
└─ action → /dist_turtle
/turtlesim
├─ subscribe ← /turtle1/cmd_vel
└─ publish → /turtle1/pose
/turtlesim_subscriber
└─ subscribe ← /turtle1/pose
이 그래프를 보면 Action Server가 속도 명령을 발행하고, turtlesim이 pose를 발행하며, Subscriber가 pose를 받아오는 구조를 한눈에 확인할 수 있습니다.

14. 이 구조가 중요한 이유
이번 예제의 핵심은 “명령”과 “상태 확인”을 분리했다는 점입니다.
단순히 /cmd_vel만 발행하면 거북이는 움직입니다.
하지만 어디까지 이동했는지, 목표 거리까지 얼마나 남았는지, 실제로 도착했는지는 알 수 없습니다.
그래서 다음 구조가 필요합니다.
cmd_vel 발행 → 로봇 이동
pose 구독 → 현재 위치 확인
거리 계산 → 진행 상태 판단
feedback 발행 → 클라이언트에 중간 상태 제공
result 반환 → 최종 결과 제공
이 구조는 실제 로봇 시스템에서 거의 그대로 확장됩니다.
예를 들어 드론 배송 시스템에서는 다음처럼 바뀔 수 있습니다.
cmd_vel → velocity setpoint
pose → vehicle odometry
dist → waypoint까지 남은 거리
feedback → 현재 거리 오차, 고도 오차, 도착률
result → 도착 여부, 최종 위치, 미션 성공 여부
즉, turtlesim 예제는 장난감 예제가 아니라 실제 자율주행 로봇과 드론 제어의 축소판이라고 볼 수 있습니다.
15. 실무 관점에서 개선할 부분
실무 코드로 쓰려면 몇 가지 개선이 필요합니다.
a. while True 구조 개선
현재는 다음처럼 무한 반복문을 사용합니다.
while True:
실무에서는 취소 요청을 처리해야 합니다.
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return result
Action은 장시간 작업에 쓰이기 때문에 cancel 처리가 중요합니다.
b. 정지 명령 추가
목표 거리에 도달한 뒤에는 속도를 0으로 보내는 것이 안전합니다.
stop_msg = Twist()
self.publisher.publish(stop_msg)
그렇지 않으면 마지막으로 발행된 속도 명령이 잠깐 유지될 수 있습니다.
c. 오차 허용값 변수화
현재는 0.2가 코드에 직접 들어가 있습니다.
if feedback_msg.remained_dist < 0.2:
다음처럼 변수로 빼는 편이 좋습니다.
goal_tolerance = 0.2
이렇게 하면 나중에 로봇 크기나 속도에 따라 쉽게 조정할 수 있습니다.
d. 음수 Feedback 방지
목표 거리보다 더 이동하면 남은 거리가 음수가 될 수 있습니다.
feedback_msg.remained_dist = max(
goal_handle.request.dist - self.total_dist,
0.0
)
이렇게 하면 사용자에게 더 자연스러운 Feedback을 제공할 수 있습니다.
16. 정리
이번 실습에서는 ROS 2 Action Server를 이용해 turtlesim을 지정한 거리만큼 이동시키는 구조를 구현했습니다.
핵심은 Action Server가 단순히 요청만 처리하는 것이 아니라, 이동 명령 발행과 pose 구독을 함께 사용해 실제 이동 거리를 계산한다는 점입니다.
또한 MultiThreadedExecutor를 적용하여 Action callback과 Subscriber callback이 동시에 동작하도록 구성했습니다.
이를 통해 Action Server가 반복문을 수행하는 동안에도 pose 값이 계속 경신될 수 있습니다.
이 구조는 ROS 2에서 장시간 동작하는 로봇 제어 기능을 만들 때 매우 중요합니다.
목표 이동, Waypoint 주행, 로봇팔 동작, 드론 미션 수행처럼 시간이 걸리는 작업은 대부분 Action 구조로 설계하는 것이 적합합니다.
'강좌 > ROS2' 카테고리의 다른 글
| ROS2 launch 작성 (0) | 2026.05.10 |
|---|---|
| 소스에서 파라미터 사용하기 (0) | 2026.05.10 |
| ROS2 MultiThread 기초 (0) | 2026.05.10 |
| ROS2 Action Server 만들기 #2 (0) | 2026.05.10 |
| ROS 2 Action 정의 만들기 (0) | 2026.05.10 |