본문으로 바로가기

1. 최종 목표

이번 실습에서 만들 구조는 다음과 같습니다.

ros2_lab_ws
└── src
    ├── robot_interfaces
    │   └── msg
    │       └── RobotStatus.msg
    │
    └── topic_practice
        └── topic_practice
            ├── robot_status_pub.py
            └── robot_status_sub.py
 

 

전체 동작 흐름은 다음과 같습니다.

robot_status_pub
        ↓
/robot_status Topic
        ↓
robot_status_sub
 

 

Publisher는 로봇 상태를 보냅니다.

robot_id: R-001
mode: AUTO
battery_percent: 87.5
emergency_stop: false
 

 

Subscriber는 이 메시지를 받아서 출력합니다.

 

 

 

2. 워크스페이스 생성

먼저 ROS 2 워크스페이스를 만듭니다.

 
mkdir -p ~/ros2_lab_ws/src
cd ~/ros2_lab_ws/src
 

 

현재 구조는 이렇게 됩니다.

~/ros2_lab_ws
└── src
 

 

 

 

3. msg 폴더 생성

RobotStatus.msg 파일을 넣을 msg 폴더를 만듭니다.

 
cd ~/ros2_lab_ws/src/robot_interfaces
mkdir msg
 
 
 
 
4. RobotStatus.msg 작성

파일을 생성합니다.

 
touch msg/RobotStatus.msg
 

 

내용은 다음과 같이 작성합니다.

string robot_id
string mode
float32 battery_percent
bool emergency_stop
 

 

각 필드 의미는 다음과 같습니다.

 

                                           필드                                                   타입                                               의미

robot_id string 로봇 ID
mode string 현재 동작 모드
battery_percent float32 배터리 잔량
emergency_stop bool 비상 정지 상태

 

 

 

5. topic_practice 패키지 생성

이제 Publisher와 Subscriber를 만들 Python 패키지를 생성합니다.

 
cd ~/ros2_lab_ws/src

ros2 pkg create topic_practice --build-type ament_python --dependencies rclpy robot_interfaces
 

 

생성 후 구조는 대략 다음과 같습니다.

topic_practice
├── package.xml
├── setup.py
├── setup.cfg
├── resource
│   └── topic_practice
└── topic_practice
    └── __init__.py
 

 

 

 

6. Publisher 코드 작성

Publisher 파일을 만듭니다.

 
cd ~/ros2_lab_ws/src/topic_practice/topic_practice
nano robot_status_pub.py
 

 

아래 코드를 작성합니다.

 
import rclpy
from rclpy.node import Node

from robot_interfaces.msg import RobotStatus


class RobotStatusPublisher(Node):
    def __init__(self):
        super().__init__('robot_status_pub')

        self.publisher_ = self.create_publisher(
            RobotStatus,
            '/robot_status',
            10
        )

        self.timer = self.create_timer(1.0, self.timer_callback)

        self.count = 0
        self.modes = ['IDLE', 'MANUAL', 'AUTO', 'ERROR']

    def timer_callback(self):
        self.count += 1

        msg = RobotStatus()
        msg.robot_id = f'R-{self.count:03d}'
        msg.mode = self.modes[self.count % len(self.modes)]
        msg.battery_percent = max(0.0, 100.0 - self.count * 1.25)
        msg.emergency_stop = msg.mode == 'ERROR'

        self.publisher_.publish(msg)

        self.get_logger().info(
            f'Publish: robot_id={msg.robot_id}, '
            f'mode={msg.mode}, '
            f'battery={msg.battery_percent:.2f}, '
            f'emergency={msg.emergency_stop}'
        )


def main(args=None):
    rclpy.init(args=args)

    node = RobotStatusPublisher()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
 

 

 

 

7. Subscriber 코드 작성

Subscriber 파일을 만듭니다.

 
nano robot_status_sub.py
 

 

아래 코드를 작성합니다.

 
import rclpy
from rclpy.node import Node

from robot_interfaces.msg import RobotStatus


class RobotStatusSubscriber(Node):
    def __init__(self):
        super().__init__('robot_status_sub')

        self.subscription = self.create_subscription(
            RobotStatus,
            '/robot_status',
            self.listener_callback,
            10
        )

    def listener_callback(self, msg):
        if msg.emergency_stop:
            self.get_logger().error(
                f'Emergency detected: {msg.robot_id} mode={msg.mode}'
            )
        else:
            self.get_logger().info(
                f'Receive: {msg.robot_id} | '
                f'{msg.mode} | '
                f'battery={msg.battery_percent:.2f} | '
                f'emergency={msg.emergency_stop}'
            )


def main(args=None):
    rclpy.init(args=args)

    node = RobotStatusSubscriber()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
 

 

 

 

8. robot_interfaces/package.xml 수정

robot_interfaces/package.xml을 엽니다.

아래 내용을 추가합니다.
 
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
 

 

전체에서 중요한 부분은 이런 형태입니다.

 
<?xml version="1.0"?>
<package format="3">
  <name>robot_interfaces</name>
  <version>0.0.0</version>
  <description>Custom interfaces for robot examples</description>
  <maintainer email="user@example.com">user</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <buildtool_depend>rosidl_default_generators</buildtool_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>
 

 

여기서 중요 포인트는 이것입니다.

 
<member_of_group>rosidl_interface_packages</member_of_group>
 

 

이게 빠지면 인터페이스 패키지 빌드에서 문제가 날 수 있습니다.

 

 

 

9. robot_interfaces/CMakeLists.txt 수정

CMakeLists.txt를 엽니다.

내용을 다음처럼 정리합니다.
 
cmake_minimum_required(VERSION 3.8)
project(robot_interfaces)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/RobotStatus.msg"
)

ament_export_dependencies(rosidl_default_runtime)

ament_package()
 

 

 

10. setup.py 수정

ros2 run으로 실행하려면 setup.py에 실행 파일을 등록해야 합니다.

entry_points 부분을 다음처럼 수정합니다.
 
entry_points={
    'console_scripts': [
        'robot_status_pub = topic_practice.robot_status_pub:main',
        'robot_status_sub = topic_practice.robot_status_sub:main',
    ],
},
 

 

전체 예시는 다음과 같습니다.

 
from setuptools import find_packages, setup

package_name = 'topic_practice'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@example.com',
    description='ROS 2 topic practice package',
    license='Apache-2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'robot_status_pub = topic_practice.robot_status_pub:main',
            'robot_status_sub = topic_practice.robot_status_sub:main',
        ],
    },
)
 

 

 

 

11. topic_practice/package.xml 확인

topic_practice는 rclpy와 robot_interfaces를 사용합니다.

아래 의존성이 있는지 확인합니다.
 
<depend>rclpy</depend>
<depend>robot_interfaces</depend>
 

 

예시는 다음과 같습니다.

 
<?xml version="1.0"?>
<package format="3">
  <name>topic_practice</name>
  <version>0.0.0</version>
  <description>ROS 2 topic practice package</description>
  <maintainer email="user@example.com">user</maintainer>
  <license>Apache-2.0</license>

  <depend>rclpy</depend>
  <depend>robot_interfaces</depend>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
 
 
 
 
 

12. 빌드하기

워크스페이스 루트로 이동합니다.

 
cd ~/ros2_lab_ws
 

 

빌드합니다.

 
colcon build --symlink-install
 

 

빌드가 끝나면 환경 설정을 적용합니다.

 
source install/setup.bash
 

 

매우 중요합니다.
빌드 후 source install/setup.bash를 하지 않으면 새 메시지나 실행 파일을 ROS 2가 못 찾습니다.

 

 

 

13. 메시지 생성 확인

사용자 정의 메시지가 정상 등록되었는지 확인합니다.

 
ros2 interface show robot_interfaces/msg/RobotStatus
 

 

정상이라면 다음처럼 출력됩니다.

string robot_id
string mode
float32 battery_percent
bool emergency_stop
 
 
 
 
 

14. Publisher 실행

첫 번째 터미널에서 실행합니다.

 
cd ~/ros2_lab_ws
source install/setup.bash
ros2 run topic_practice robot_status_pub
 

 

정상 출력 예시는 다음과 같습니다.

[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=MANUAL, battery=98.75, emergency=False
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=AUTO, battery=97.50, emergency=False
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=ERROR, battery=96.25, emergency=True
[INFO] [robot_status_pub]: Publish: robot_id=R-001, mode=IDLE, battery=95.00, emergency=False
 

 

확인할 것:

mode가 계속 바뀐다.
battery가 계속 줄어든다.
mode가 ERROR일 때 emergency_stop이 True가 된다.
 

 

 

15. Subscriber 실행

두 번째 터미널을 엽니다.

 
cd ~/ros2_lab_ws
source install/setup.bash
ros2 run topic_practice robot_status_sub
 

 

정상 출력 예시는 다음과 같습니다.

[INFO] [robot_status_sub]: Receive: R-001 | MANUAL | battery=98.75 | emergency=False
[INFO] [robot_status_sub]: Receive: R-001 | AUTO | battery=97.50 | emergency=False
[ERROR] [robot_status_sub]: Emergency detected: R-001 mode=ERROR
[INFO] [robot_status_sub]: Receive: R-001 | IDLE | battery=95.00 | emergency=False
 

 

여기서 핵심은 이것입니다.

mode=ERROR이면 Subscriber 로그가 [ERROR]로 출력된다.
 

 

즉, Subscriber가 단순 출력만 하는 게 아니라 메시지 내용을 보고 조건 판단까지 합니다.

 

 

 

16. CLI로 Topic 확인

Publisher가 실행 중인 상태에서 새 터미널을 엽니다.

 
cd ~/ros2_lab_ws
source install/setup.bash
 

 

Topic 목록 확인:

 
ros2 topic list
 

 

정상이라면 /robot_status가 보여야 합니다.

/robot_status
/parameter_events
/rosout
 

 

Topic 데이터 확인:

 
ros2 topic echo /robot_status
 

 

출력 예시:

robot_id: R-001
mode: AUTO
battery_percent: 97.5
emergency_stop: false
---
robot_id: R-001
mode: ERROR
battery_percent: 96.25
emergency_stop: true
---
 

 

Topic 주기 확인:

 
ros2 topic hz /robot_status
 

 

1초마다 발행하므로 약 1Hz가 나와야 합니다.

average rate: 1.000
 

 

 

 

17. 실습 확장 과제

 

과제 1. 배터리 20% 이하 경고 추가

 

Subscriber의 listener_callback()에 아래 조건을 추가합니다.

 
if msg.battery_percent <= 20.0:
    self.get_logger().warn(
        f'Low battery: {msg.robot_id} battery={msg.battery_percent:.2f}'
    )
 
 
 
 
 

과제 2. 로봇 ID 여러 개 사용

Publisher에서 로봇 ID를 리스트로 만듭니다.

 
self.robot_ids = ['R-001', 'R-002', 'R-003']
 

 

그리고 callback에서 바꿔가며 넣습니다.

 
msg.robot_id = self.robot_ids[self.count % len(self.robot_ids)]
 

 

 

 

과제 3. mode를 숫자로 바꾸기

실무에서는 문자열보다 숫자 상수를 쓰는 경우가 많습니다.

 

RobotStatus.msg를 다음처럼 바꿀 수 있습니다.

uint8 MODE_IDLE=0
uint8 MODE_MANUAL=1
uint8 MODE_AUTO=2
uint8 MODE_ERROR=3

string robot_id
uint8 mode
float32 battery_percent
bool emergency_stop
 

 

Publisher 예제 수정

 

기존에는 문자열로 이렇게 작성했을 수 있습니다.

 
msg.mode = "AUTO"
 

 

이제는 RobotStatus.msg에 정의한 상수를 사용합니다.

msg.mode = RobotStatus.MODE_AUTO
 

 

전체 예시는 다음과 같습니다.

 
import rclpy
from rclpy.node import Node

from my_robot_interfaces.msg import RobotStatus


class RobotStatusPublisher(Node):

    def __init__(self):
        super().__init__('robot_status_publisher')

        self.publisher = self.create_publisher(
            RobotStatus,
            'robot_status',
            10
        )
        
        self.robot_ids = ['R-001', 'R-002', 'R-003']

        self.timer = self.create_timer(1.0, self.publish_status)

    def publish_status(self):
        msg = RobotStatus()

        msg.robot_id = self.robot_ids[self.count % len(self.robot_ids)]
        msg.mode = RobotStatus.MODE_AUTO
        msg.battery_percent = 87.5
        msg.emergency_stop = False

        self.publisher.publish(msg)

        self.get_logger().info(
            f'Publish: robot_id={msg.robot_id}, '
            f'mode={msg.mode}, '
            f'battery={msg.battery_percent}, '
            f'emergency_stop={msg.emergency_stop}'
        )


def main(args=None):
    rclpy.init(args=args)

    node = RobotStatusPublisher()
    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
 

 

 

 

Subscriber 예제 수정

Subscriber에서는 숫자 값을 직접 비교하지 말고, 메시지에 정의된 상수를 사용합니다.

 
if msg.mode == RobotStatus.MODE_AUTO:
    print("자동 주행 모드입니다.")
 

 

전체 예시는 다음과 같습니다.

 
import rclpy
from rclpy.node import Node

from my_robot_interfaces.msg import RobotStatus


class RobotStatusSubscriber(Node):

    def __init__(self):
        super().__init__('robot_status_subscriber')

        self.subscription = self.create_subscription(
            RobotStatus,
            'robot_status',
            self.status_callback,
            10
        )

    def status_callback(self, msg):
        if msg.mode == RobotStatus.MODE_IDLE:
            mode_text = 'IDLE'

        elif msg.mode == RobotStatus.MODE_MANUAL:
            mode_text = 'MANUAL'

        elif msg.mode == RobotStatus.MODE_AUTO:
            mode_text = 'AUTO'

        elif msg.mode == RobotStatus.MODE_ERROR:
            mode_text = 'ERROR'

        else:
            mode_text = 'UNKNOWN'

        self.get_logger().info(
            f'Receive: robot_id={msg.robot_id}, '
            f'mode={mode_text}, '
            f'battery={msg.battery_percent}, '
            f'emergency_stop={msg.emergency_stop}'
        )


def main(args=None):
    rclpy.init(args=args)

    node = RobotStatusSubscriber()
    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

 

 

 

18. 전체 명령어 요약

처음부터 한 번에 정리하면 다음과 같습니다.

 
mkdir -p ~/ros2_lab_ws/src
cd ~/ros2_lab_ws/src

ros2 pkg create robot_interfaces --build-type ament_cmake

cd ~/ros2_lab_ws/src/robot_interfaces
mkdir msg
touch msg/RobotStatus.msg
 

 

RobotStatus.msg:

string robot_id
string mode
float32 battery_percent
bool emergency_stop
 

 

패키지 생성:

 
cd ~/ros2_lab_ws/src

ros2 pkg create topic_practice \
  --build-type ament_python \
  --dependencies rclpy robot_interfaces
 

 

Publisher 작성:

 
cd ~/ros2_lab_ws/src/topic_practice/topic_practice
gedit robot_status_pub.py
 

 

Subscriber 작성:

 
gedit robot_status_sub.py
 

 

빌드:

 
cd ~/ros2_lab_ws
colcon build --symlink-install
source install/setup.bash
 

 

메시지 확인:

 
ros2 interface show robot_interfaces/msg/RobotStatus
 

 

Publisher 실행:

 
ros2 run topic_practice robot_status_pub
 

 

Subscriber 실행:

 
ros2 run topic_practice robot_status_sub
 

 

Topic 확인:

 
ros2 topic list
ros2 topic echo /robot_status
ros2 topic hz /robot_status
 

 

 

 

728x90
728x90