본문으로 바로가기

Python에서 ROS2 토픽 구독

category 강좌/ROS2 2026. 5. 7. 00:35
728x90
728x90

ROS 2의 Python 클라이언트 라이브러리인 rclpy를 사용하여 turtlesim 노드가 발행하는 /turtle1/pose 토픽을 구독합니다.

최종적으로 Python 코드에서 거북이의 현재 위치와 방향 정보를 받아 다음 값을 출력합니다.

x 좌표
y 좌표
theta 방향
linear_velocity 선속도
angular_velocity 각속도

 

1. 실습환경 준비

 

.bashrc에 ROS2 실행환경을 구축해 놓았기 때문에 새로운 터미널을 실행하면 자동으로 실행됩니다.

 

2개의 터미널을 실행합니다.

먼저 첫번째 터미널에서는 노트북을 실행합니다.

 

cd jupyter_ws/
source .venv/bin/activate
jupyter notebook

 

 

 

다른 터미널에서는 turtlesim 노드를 실행합니다.

 

ros2 run turtlesim turtlesim_node

 

정상적으로 실행되면 파란색 창에 거북이가 표시됩니다.

 

 

 

2. 토픽 정보 확인

 

새로운 터미널을 실행하여 turtlesim이 실행 중일 때 현재 사용 가능한 토픽 목록을 확인합니다.

 

ros2 topic list -t

 

 

주요 토픽은 다음과 같습니다.

/turtle1/pose [turtlesim/msg/Pose]
/turtle1/cmd_vel [geometry_msgs/msg/Twist]
 

여기서 /turtle1/pose는 거북이의 위치와 자세 정보를 발행하는 토픽입니다.
메시지 타입은 turtlesim/msg/Pose입니다.

 

터미널에서 직접 확인하려면 다음 명령을 사용합니다.

 

ros2 topic echo /turtle1/pose

 

 

 

출력 중 토픽 하나는 아래와 같습니다. 연속적으로 토픽이 출력되며 종료는 ctrl+c입니다.

 



 

3. Jupyter Notebook에서 Python 코드 준비

 

파일 메뉴에서 "New"를 선택하고 "Notebook"을 선택합니다.

 

 

작업 폴더는 이미 생성한 jupyter_ws로 지정합니다.

 

 

새로운 탭이 열리면서 노트북이 실행됩니다. 제목은 "Untitled"입니다.

 

 

Untitled 제목을 클릭하면 파일 이름을 수정하는 창이 나타납니다. 여기서 파일 이름을 "sub_test"로 수정합니다. "Rename" 버튼을 클릭하여 수정한 파일 이름을 적용합니다.

 

 

파일 이름이 변경되었습니다.

 

먼저 필요한 모듈을 불러옵니다. 입력창에 아래의 소스를 입력합니다.

 

import rclpy as rp
from turtlesim.msg import Pose

 

 

여기서 rclpy는 ROS 2를 Python에서 사용할 수 있게 해주는 라이브러리입니다. as는 alias의 의미이며 rclpy 모듈을 불러와서 앞으로 rp로 사용하겠다는 것입니다. rclpy는 ROS Clint Library for Python의 약자로 파이썬을 위한 ROS 클라이언트 라이브러리입니다. 앞으로 Python에서 ROS2 프로그램을 작성할 경우 반드시 사용할 한 줄입니다.

 

구독을 원하는 토픽은 turtlesim_node의 /turtle1/pose 입니다.
Pose는 /turtle1/pose 토픽에서 사용하는 메시지 타입입니다. 2번째 줄은 turtlesim/msg/Pose 데이터 타입을 Python에서 어떻게 import 하는지를 보여줍니다. turtlesim/msg 중에 Pose만 import 해서 바로 사용하도록 한 것입니다.

 

 

 

입력창에 입력을 하고 Shit+enter를 입력하면 In[]에 숫자가 입력되고 2줄이 실행됩니다. 현재는 import만 시킨 것이기 때문에 화면에 아무런 변화가 없습니다.

 

 

 

4. ROS2 노드 생성

 

Python 코드에서 ROS 2 기능을 사용하려면 먼저 초기화가 필요합니다.

 

rp.init()
test_node = rp.create_node('sub_test')

 

init() 함수를 호출하여 초기화를 수행합니다. 그다음 줄은 sub_test라는 이름의 노드를 생성합니다. 노드를 생성하기 위해서는 create_node() 함수를 호출하고 인자로 노드의 이름을 입력합니다. 리턴 값은 test_node에 저장합니다.

 

다음 입력 창에 위의 코드를 입력하고 Shift+enter를 입력하여 실행합니다.

 

 

 

 

새로운 노드가 잘 만들어졌는지 확인하기 위해서는 터미널에서 아래의 명령어를 실행하여 노드의 목록을 출력합니다.

 

ros2 node list

 

 

 

위의 그림처럼 새롭게 생성한 "/sub_test" 노드가 목록에 출력됩니다. 현재 sub_test 노드가 실행된 상태입니다.

 

 

 

5. callback 함수 작성

 

토픽을 구독할 때는 메시지를 받을 때마다 실행될 함수를 만들어야 합니다.
이 함수를 callback 함수라고 합니다. 이 함수는 미리 작성해야 합니다.

함수는 def callback(data):로 정의합니다. 입력창에서 한 줄을 입력하고 :을 입력하고 엔터 키를 입력하면 한 줄이 개행되고 커서는 들여 쓰기가 되어 깜빡거립니다.

 

 

 

Python에서는 들여쓰기가 매우 중요합니다. C/C++에서는 {}로 함수를 정의하는데 Python에서는 들여 쓰기로 구분합니다. 따라서 Python에서는 항상 들여 쓰기에 주의하셔야 합니다.

 

나머지 print 문들은 data 토픽을 출력하는 문장들입니다.

 

def callback(data):
    print("----")
    print("/turtle1/pose:", data)
    print("X:", data.x)
    print("Y:", data.y)
    print("Theta:", data.theta)

 

 

data에는 /turtle1/pose에서 들어온 메시지가 저장됩니다.
Pose 메시지는 다음 필드를 가지고 있습니다.

 

x
y
theta
linear_velocity
angular_velocity
 

 

조금 더 실습용으로 보기 좋게 작성하면 다음과 같습니다.

 

def callback(data):
    print("===== Turtle Pose =====")
    print(f"x position        : {data.x}")
    print(f"y position        : {data.y}")
    print(f"theta             : {data.theta}")
    print(f"linear velocity   : {data.linear_velocity}")
    print(f"angular velocity  : {data.angular_velocity}")

 

더보기

(1) 문자열 연결 방식

 
print("x position : " + str(data.x))
 

(2) % 포맷 방식

 
print("x position : %f" % data.x)
 

(3) format() 방식

 
print("x position : {}".format(data.x))
 

(4) f-string 방식 (권장)

 
print(f"x position : {data.x}")
 

Python 3.6 이상에서는 f-string이 가장 직관적이고 빠르며 많이 사용됩니다.

 

 

 

callback 함수의 출력문을 모두 입력하고 shitft+enter를 입력합니다.

 

 

 

6. Subscriber 생성

 

이제 실제로 /turtle1/pose 토픽을 구독합니다.

 

구조는 다음과 같습니다.

 
node.create_subscription(메시지타입, 토픽이름, 콜백함수, QoS)
 

 

각 항목의 의미는 다음과 같습니다. 초급 단계에서는 QoS 값을 10으로 두고 진행하시면 됩니다.

                            항목                                                                     의미
Pose 구독할 메시지 타입
/turtle1/pose 구독할 토픽 이름
callback 메시지를 받을 때 실행할 함수
10 QoS History depth 값

 

아래의 코드를 입력하고 Shift+enter를 입력합니다.

 

test_node.create_subscription(Pose, '/turtle1/pose', callback, 10)

 

 

 

 

7. 메시지 한 번 받아보기

 

Jupyter Notebook에서는 무한 반복 구독보다 먼저 spin_once()로 한 번만 받아보는 것이 안전합니다.

 

rp.spin_once(test_node)

 

 

정상적으로 실행되면 다음과 같은 출력이 나옵니다.

 

===== Turtle Pose =====
x position        : 5.544445
y position        : 5.544445
theta             : 0.0
linear velocity   : 0.0
angular velocity  : 0.0
 

 

turtlesim 창에서 거북이를 움직이지 않았다면 속도 값은 대부분 0.0으로 표시됩니다.

 

 

 

8. 전체코드

 

전체 코드는 다음과 같습니다.

 

import rclpy as rp
from turtlesim.msg import Pose

rp.init()

test_node = rp.create_node('sub_test')

def callback(data):
    print("===== Turtle Pose =====")
    print(f"x position        : {data.x}")
    print(f"y position        : {data.y}")
    print(f"theta             : {data.theta}")
    print(f"linear velocity   : {data.linear_velocity}")
    print(f"angular velocity  : {data.angular_velocity}")

test_node.create_subscription(Pose, '/turtle1/pose', callback, 10)

rp.spin_once(test_node)

 

 

9. Jupyter Notebook 사용 시 주의점

Jupyter Notebook에서 ROS 2 코드를 실행할 때는 셀이 계속 실행 중인 상태가 될 수 있습니다.

 

특히 다음 코드는 계속 대기하는 방식입니다.

 

rp.spin(test_node)

 

 

처음 실습할 때는 아래 코드를 권장합니다.

 

rp.spin_once(test_node)

 

spin_once()는 메시지를 한 번만 받아서 callback 함수를 실행합니다.
그래서 Jupyter Notebook에서 실습하기에 더 안전합니다.

 

 

10. rqt_graph로 구조 확인하기

토픽 연결 구조를 시각적으로 확인하려면 다음 명령을 실행합니다.

 

rqt_graph

 

 

 

즉, /turtlesim 노드가 /turtle1/pose 토픽을 발행하고, 우리가 만든 /sub_test 노드가 해당 토픽을 구독하는 구조입니다.

 

 

11. 추가 설명: 왜 callback 함수가 필요한가?

ROS 2에서 토픽은 계속해서 데이터가 흘러가는 통신 방식입니다.
Subscriber는 데이터를 직접 가져오는 것이 아니라, 데이터가 도착했을 때 자동으로 지정된 함수를 실행합니다.

이때 실행되는 함수가 callback 함수입니다.

즉, 구조는 다음과 같습니다.

 

토픽 메시지 도착
        ↓
callback 함수 자동 실행
        ↓
메시지 데이터 처리
 

 

센서 데이터, 카메라 데이터, 위치 정보, 배터리 상태 등을 처리할 때도 같은 방식이 사용됩니다.

 

 

12. 실무 관점에서의 활용

turtlesim의 /turtle1/pose는 단순한 예제이지만, 실제 로봇에서는 다음과 같은 데이터와 같은 개념으로 연결됩니다.

 

                    turtlesim 예제                                                                                  실제 로봇 적용
/turtle1/pose 로봇 위치 정보
x, y 지도 좌표
theta 로봇 방향
linear_velocity 직진 속도
angular_velocity 회전 속도
subscriber 센서 데이터 수신 노드

 

드론에서는 위치 토픽, IMU 토픽, GPS 토픽, 배터리 토픽 등을 구독할 때 같은 구조를 사용합니다.

예를 들어 PX4 + ROS 2 환경에서는 다음과 같은 토픽을 구독하는 방식과 개념적으로 같습니다.

 

/fmu/out/vehicle_odometry
/fmu/out/vehicle_status
/fmu/out/battery_status

 

 

지금까지 ROS 2 Python 코드로 /turtle1/pose 토픽을 구독하는 방법을 정리했습니다.

핵심은 rclpy 초기화, 노드 생성, callback 함수 작성, subscriber 생성, 그리고 spin_once() 실행입니다.

이 구조는 단순히 turtlesim 예제에만 쓰이는 것이 아니라, 실제 로봇이나 드론에서 센서 데이터를 받아 처리하는 기본 구조이기도 합니다.

 

 

 

13. 토픽 구독 횟수 제한하기

 

앞에서 만든 subscriber는 /turtle1/pose 토픽을 계속 구독합니다.
Jupyter Notebook에서는 무한 실행 상태가 되면 실습 흐름이 끊길 수 있으므로, 특정 횟수만 메시지를 받은 뒤 멈추도록 만들 수 있습니다.

 

예제 구조는 다음과 같습니다.

 

import rclpy as rp
from turtlesim.msg import Pose

rp.init()
test_node = rp.create_node('sub_test')

cnt = 0

def callback(data):
    global cnt
    cnt += 1

    print(">", cnt, "-> X:", data.x, ", Y:", data.y)

    if cnt > 3:
        raise Exception("Subscription Stop")

test_node.create_subscription(Pose, '/turtle1/pose', callback, 10)

rp.spin(test_node)

 

 

cnt 변수는 callback 함수가 몇 번 실행되었는지 세는 역할을 합니다.

 
cnt = 0
 

 

callback 함수 내부에서 외부 변수를 수정하려면 global 키워드를 사용합니다.

 
global cnt
 

 

메시지가 들어올 때마다 cnt 값을 1씩 증가시킵니다.

 
cnt += 1
 

 

그리고 현재 받은 메시지의 좌표를 출력합니다.

 
print(">", cnt, "-> X:", data.x, ", Y:", data.y)
 

 

마지막으로 cnt가 3보다 커지면 예외를 발생시켜 실행을 멈춥니다.

 
if cnt > 3:
    raise Exception("Subscription Stop")
 

 

즉, 실제로는 callback이 4번째 실행될 때 멈추는 구조입니다.

 

 

Jupyter Notebook 실습에서는 rp.spin()이 계속 실행되기 때문에 셀이 멈추지 않는 경우가 많습니다.
이때 예외를 발생시키면 실행 흐름을 강제로 종료할 수 있습니다.

다만 실무 코드에서는 raise Exception()으로 노드를 종료하는 방식은 권장되지 않습니다. 실무에서는 보통 다음과 같이 정리합니다.

 
test_node.destroy_node()
rp.shutdown()
 

 

또는 subscription 객체를 저장해 두었다가 필요할 때 제거합니다.

 
subscription = test_node.create_subscription(
    Pose,
    '/turtle1/pose',
    callback,
    10
)

test_node.destroy_subscription(subscription)
 

 

교육용 실습에서는 동작 원리를 빠르게 확인하기 위해 예외 방식을 사용할 수 있지만, 실제 패키지 코드에서는 명시적으로 노드를 종료하는 방식이 더 안정적입니다.

 

 

 

14. Jupyter Kernel 재시작이 필요한 경우

 

Jupyter Notebook에서 ROS 2 노드를 반복해서 만들다 보면 이전 노드가 남아 있는 것처럼 보일 수 있습니다.

예를 들어 ros2 node list나 rqt_graph에서 방금 만든 노드가 계속 보이는 경우가 있습니다. 이때는 다음 중 하나를 수행합니다.

 
test_node.destroy_node()
rp.shutdown()
 

 

또는 Jupyter 메뉴에서 다음을 선택합니다.

Kernel → Restart
 

 

출력까지 지우고 완전히 초기화하려면 다음 메뉴를 사용합니다.

Kernel → Restart & Clear Output
 

 

 

 

728x90
728x90

'강좌 > ROS2' 카테고리의 다른 글

ROS 2 Python Service Client 사용하기  (0) 2026.05.08
Python에서 ROS2 토픽 발행  (0) 2026.05.07
ROS2 파라미터  (0) 2026.05.06
ROS2 Action  (0) 2026.05.05
ROS2 Topic  (0) 2026.05.04