1. 최종 목표
이번 실습에서 만들 구조는 다음과 같습니다.
ros2_lab_ws
└── src
├── robot_interfaces
│ └── msg
│ └── RobotStatus.msg
│
└── topic_practice
└── topic_practice
├── robot_status_pub.py
└── robot_status_sub.py
전체 동작 흐름은 다음과 같습니다.
robot_status_pub
↓
/robot_status Topic
↓
robot_status_sub
Publisher는 로봇 상태를 보냅니다.
robot_id: R-001
mode: AUTO
battery_percent: 87.5
emergency_stop: false
Subscriber는 이 메시지를 받아서 출력합니다.
2. 워크스페이스 생성
먼저 ROS 2 워크스페이스를 만듭니다.
mkdir -p ~/ros2_lab_ws/src
cd ~/ros2_lab_ws/src
현재 구조는 이렇게 됩니다.
~/ros2_lab_ws
└── src
3. msg 폴더 생성
RobotStatus.msg 파일을 넣을 msg 폴더를 만듭니다.
cd ~/ros2_lab_ws/src/robot_interfaces
mkdir msg
파일을 생성합니다.
touch msg/RobotStatus.msg
내용은 다음과 같이 작성합니다.
string robot_id
string mode
float32 battery_percent
bool emergency_stop
필드 타입 의미
| robot_id | string | 로봇 ID |
| mode | string | 현재 동작 모드 |
| battery_percent | float32 | 배터리 잔량 |
| emergency_stop | bool | 비상 정지 상태 |
5. topic_practice 패키지 생성
이제 Publisher와 Subscriber를 만들 Python 패키지를 생성합니다.
cd ~/ros2_lab_ws/src
ros2 pkg create topic_practice --build-type ament_python --dependencies rclpy robot_interfaces
생성 후 구조는 대략 다음과 같습니다.
topic_practice
├── package.xml
├── setup.py
├── setup.cfg
├── resource
│ └── topic_practice
└── topic_practice
└── __init__.py
6. Publisher 코드 작성
Publisher 파일을 만듭니다.
cd ~/ros2_lab_ws/src/topic_practice/topic_practice
nano robot_status_pub.py
아래 코드를 작성합니다.
import rclpy
from rclpy.node import Node
from robot_interfaces.msg import RobotStatus
class RobotStatusPublisher(Node):
def __init__(self):
super().__init__('robot_status_pub')
self.publisher_ = self.create_publisher(
RobotStatus,
'/robot_status',
10
)
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0
self.modes = ['IDLE', 'MANUAL', 'AUTO', 'ERROR']
def timer_callback(self):
self.count += 1
msg = RobotStatus()
msg.robot_id = f'R-{self.count:03d}'
msg.mode = self.modes[self.count % len(self.modes)]
msg.battery_percent = max(0.0, 100.0 - self.count * 1.25)
msg.emergency_stop = msg.mode == 'ERROR'
self.publisher_.publish(msg)
self.get_logger().info(
f'Publish: robot_id={msg.robot_id}, '
f'mode={msg.mode}, '
f'battery={msg.battery_percent:.2f}, '
f'emergency={msg.emergency_stop}'
)
def main(args=None):
rclpy.init(args=args)
node = RobotStatusPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
7. Subscriber 코드 작성
Subscriber 파일을 만듭니다.
nano robot_status_sub.py
아래 코드를 작성합니다.
import rclpy
from rclpy.node import Node
from robot_interfaces.msg import RobotStatus
class RobotStatusSubscriber(Node):
def __init__(self):
super().__init__('robot_status_sub')
self.subscription = self.create_subscription(
RobotStatus,
'/robot_status',
self.listener_callback,
10
)
def listener_callback(self, msg):
if msg.emergency_stop:
self.get_logger().error(
f'Emergency detected: {msg.robot_id} mode={msg.mode}'
)
else:
self.get_logger().info(
f'Receive: {msg.robot_id} | '
f'{msg.mode} | '
f'battery={msg.battery_percent:.2f} | '
f'emergency={msg.emergency_stop}'
)
def main(args=None):
rclpy.init(args=args)
node = RobotStatusSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
8. robot_interfaces/package.xml 수정
robot_interfaces/package.xml을 엽니다.
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
전체에서 중요한 부분은 이런 형태입니다.
<?xml version="1.0"?>
<package format="3">
<name>robot_interfaces</name>
<version>0.0.0</version>
<description>Custom interfaces for robot examples</description>
<maintainer email="user@example.com">user</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
여기서 중요 포인트는 이것입니다.
<member_of_group>rosidl_interface_packages</member_of_group>
이게 빠지면 인터페이스 패키지 빌드에서 문제가 날 수 있습니다.
9. robot_interfaces/CMakeLists.txt 수정
CMakeLists.txt를 엽니다.
cmake_minimum_required(VERSION 3.8)
project(robot_interfaces)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/RobotStatus.msg"
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
10. setup.py 수정
ros2 run으로 실행하려면 setup.py에 실행 파일을 등록해야 합니다.
entry_points={
'console_scripts': [
'robot_status_pub = topic_practice.robot_status_pub:main',
'robot_status_sub = topic_practice.robot_status_sub:main',
],
},
전체 예시는 다음과 같습니다.
from setuptools import find_packages, setup
package_name = 'topic_practice'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='user',
maintainer_email='user@example.com',
description='ROS 2 topic practice package',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'robot_status_pub = topic_practice.robot_status_pub:main',
'robot_status_sub = topic_practice.robot_status_sub:main',
],
},
)
11. topic_practice/package.xml 확인
topic_practice는 rclpy와 robot_interfaces를 사용합니다.
<depend>rclpy</depend>
<depend>robot_interfaces</depend>
예시는 다음과 같습니다.
<?xml version="1.0"?>
<package format="3">
<name>topic_practice</name>
<version>0.0.0</version>
<description>ROS 2 topic practice package</description>
<maintainer email="user@example.com">user</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>robot_interfaces</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
12. 빌드하기
워크스페이스 루트로 이동합니다.
cd ~/ros2_lab_ws
빌드합니다.
colcon build --symlink-install
빌드가 끝나면 환경 설정을 적용합니다.
source install/setup.bash
매우 중요합니다.
빌드 후 source install/setup.bash를 하지 않으면 새 메시지나 실행 파일을 ROS 2가 못 찾습니다.
13. 메시지 생성 확인
사용자 정의 메시지가 정상 등록되었는지 확인합니다.
ros2 interface show robot_interfaces/msg/RobotStatus
정상이라면 다음처럼 출력됩니다.
string robot_id
string mode
float32 battery_percent
bool emergency_stop
14. Publisher 실행
첫 번째 터미널에서 실행합니다.
cd ~/ros2_lab_ws
source install/setup.bash
ros2 run topic_practice robot_status_pub
정상 출력 예시는 다음과 같습니다.
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=MANUAL, battery=98.75, emergency=False
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=AUTO, battery=97.50, emergency=False
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=ERROR, battery=96.25, emergency=True
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=IDLE, battery=95.00, emergency=False
확인할 것:
mode가 계속 바뀐다.
battery가 계속 줄어든다.
mode가 ERROR일 때 emergency_stop이 True가 된다.
15. Subscriber 실행
두 번째 터미널을 엽니다.
cd ~/ros2_lab_ws
source install/setup.bash
ros2 run topic_practice robot_status_sub
정상 출력 예시는 다음과 같습니다.
[INFO] [robot_status_sub]: Receive: R-001 | MANUAL | battery=98.75 | emergency=False
[INFO] [robot_status_sub]: Receive: R-001 | AUTO | battery=97.50 | emergency=False
[ERROR] [robot_status_sub]: Emergency detected: R-001 mode=ERROR
[INFO] [robot_status_sub]: Receive: R-001 | IDLE | battery=95.00 | emergency=False
여기서 핵심은 이것입니다.
mode=ERROR이면 Subscriber 로그가 [ERROR]로 출력된다.
즉, Subscriber가 단순 출력만 하는 게 아니라 메시지 내용을 보고 조건 판단까지 합니다.
16. CLI로 Topic 확인
Publisher가 실행 중인 상태에서 새 터미널을 엽니다.
cd ~/ros2_lab_ws
source install/setup.bash
Topic 목록 확인:
ros2 topic list
정상이라면 /robot_status가 보여야 합니다.
/robot_status
/parameter_events
/rosout
Topic 데이터 확인:
ros2 topic echo /robot_status
출력 예시:
robot_id: R-001
mode: AUTO
battery_percent: 97.5
emergency_stop: false
---
robot_id: R-001
mode: ERROR
battery_percent: 96.25
emergency_stop: true
---
Topic 주기 확인:
ros2 topic hz /robot_status
1초마다 발행하므로 약 1Hz가 나와야 합니다.
average rate: 1.000
17. 실습 확장 과제
과제 1. 배터리 20% 이하 경고 추가
Subscriber의 listener_callback()에 아래 조건을 추가합니다.
if msg.battery_percent <= 20.0:
self.get_logger().warn(
f'Low battery: {msg.robot_id} battery={msg.battery_percent:.2f}'
)
과제 2. 로봇 ID 여러 개 사용
Publisher에서 로봇 ID를 리스트로 만듭니다.
self.robot_ids = ['R-001', 'R-002', 'R-003']
그리고 callback에서 바꿔가며 넣습니다.
msg.robot_id = self.robot_ids[self.count % len(self.robot_ids)]
과제 3. mode를 숫자로 바꾸기
실무에서는 문자열보다 숫자 상수를 쓰는 경우가 많습니다.
RobotStatus.msg를 다음처럼 바꿀 수 있습니다.
uint8 MODE_IDLE=0
uint8 MODE_MANUAL=1
uint8 MODE_AUTO=2
uint8 MODE_ERROR=3
string robot_id
uint8 mode
float32 battery_percent
bool emergency_stop
Publisher 예제 수정
기존에는 문자열로 이렇게 작성했을 수 있습니다.
msg.mode = "AUTO"
이제는 RobotStatus.msg에 정의한 상수를 사용합니다.
msg.mode = RobotStatus.MODE_AUTO
전체 예시는 다음과 같습니다.
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import RobotStatus
class RobotStatusPublisher(Node):
def __init__(self):
super().__init__('robot_status_publisher')
self.publisher = self.create_publisher(
RobotStatus,
'robot_status',
10
)
self.robot_ids = ['R-001', 'R-002', 'R-003']
self.timer = self.create_timer(1.0, self.publish_status)
def publish_status(self):
msg = RobotStatus()
msg.robot_id = self.robot_ids[self.count % len(self.robot_ids)]
msg.mode = RobotStatus.MODE_AUTO
msg.battery_percent = 87.5
msg.emergency_stop = False
self.publisher.publish(msg)
self.get_logger().info(
f'Publish: robot_id={msg.robot_id}, '
f'mode={msg.mode}, '
f'battery={msg.battery_percent}, '
f'emergency_stop={msg.emergency_stop}'
)
def main(args=None):
rclpy.init(args=args)
node = RobotStatusPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Subscriber 예제 수정
Subscriber에서는 숫자 값을 직접 비교하지 말고, 메시지에 정의된 상수를 사용합니다.
if msg.mode == RobotStatus.MODE_AUTO:
print("자동 주행 모드입니다.")
전체 예시는 다음과 같습니다.
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import RobotStatus
class RobotStatusSubscriber(Node):
def __init__(self):
super().__init__('robot_status_subscriber')
self.subscription = self.create_subscription(
RobotStatus,
'robot_status',
self.status_callback,
10
)
def status_callback(self, msg):
if msg.mode == RobotStatus.MODE_IDLE:
mode_text = 'IDLE'
elif msg.mode == RobotStatus.MODE_MANUAL:
mode_text = 'MANUAL'
elif msg.mode == RobotStatus.MODE_AUTO:
mode_text = 'AUTO'
elif msg.mode == RobotStatus.MODE_ERROR:
mode_text = 'ERROR'
else:
mode_text = 'UNKNOWN'
self.get_logger().info(
f'Receive: robot_id={msg.robot_id}, '
f'mode={mode_text}, '
f'battery={msg.battery_percent}, '
f'emergency_stop={msg.emergency_stop}'
)
def main(args=None):
rclpy.init(args=args)
node = RobotStatusSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
18. 전체 명령어 요약
처음부터 한 번에 정리하면 다음과 같습니다.
mkdir -p ~/ros2_lab_ws/src
cd ~/ros2_lab_ws/src
ros2 pkg create robot_interfaces --build-type ament_cmake
cd ~/ros2_lab_ws/src/robot_interfaces
mkdir msg
touch msg/RobotStatus.msg
RobotStatus.msg:
string robot_id
string mode
float32 battery_percent
bool emergency_stop
패키지 생성:
cd ~/ros2_lab_ws/src
ros2 pkg create topic_practice \
--build-type ament_python \
--dependencies rclpy robot_interfaces
Publisher 작성:
cd ~/ros2_lab_ws/src/topic_practice/topic_practice
gedit robot_status_pub.py
Subscriber 작성:
gedit robot_status_sub.py
빌드:
cd ~/ros2_lab_ws
colcon build --symlink-install
source install/setup.bash
메시지 확인:
ros2 interface show robot_interfaces/msg/RobotStatus
Publisher 실행:
ros2 run topic_practice robot_status_pub
Subscriber 실행:
ros2 run topic_practice robot_status_sub
Topic 확인:
ros2 topic list
ros2 topic echo /robot_status
ros2 topic hz /robot_status
'강좌 > ROS2' 카테고리의 다른 글
| Cancel 테스트용 Action Client 만들기 (0) | 2026.05.26 |
|---|---|
| Python 기본 타입 메시지 Publisher / Subscriber 예제 (0) | 2026.05.26 |
| 로봇 개발자를 위한 Python 기초 교육 #3 (0) | 2026.05.25 |
| VS Code 원격 개발 환경 (0) | 2026.05.24 |
| ROS 2 Humble rqt Plugins 정리 #3 (0) | 2026.05.24 |