본문으로 바로가기

Python에서 ROS2 토픽 발행

category 강좌/ROS2 2026. 5. 7. 00:48
728x90
728x90

 

이번에는 /turtle1/cmd_vel 토픽으로 속도 명령을 발행하여 거북이를 움직입니다.

turtlesim에서 거북이를 움직이는 토픽은 다음과 같습니다.

 
/turtle1/cmd_vel
 

 

메시지 타입은 다음과 같습니다.

 
geometry_msgs/msg/Twist
 

 

확인은 다음 명령으로 할 수 있습니다.

 
ros2 topic list -t

 

 

1. 필요한 모듈 import

 

Twist 메시지는 선속도와 각속도를 표현하는 메시지입니다.

구조는 크게 두 부분입니다.

linear
angular
 

 

각각은 다시 x, y, z 값을 가집니다.

linear.x
linear.y
linear.z

angular.x
angular.y
angular.z
 

 

turtlesim에서는 주로 아래 두 값을 사용합니다.

 

msg.linear.x
msg.angular.z

 

 

 

 

import rclpy as rp
from geometry_msgs.msg import Twist

 

 

 

 

2. 노드 생성

 

rp.init()
test_node = rp.create_node('pub_test')

 

여기서 pub_test는 토픽을 발행하는 노드 이름입니다.

 

 

 

3. Twist 메시지 객체 생성

 

msg = Twist()
print(msg)

 

 

 

초기 상태의 Twist 메시지는 모든 값이 0.0입니다.

linear:
  x: 0.0
  y: 0.0
  z: 0.0
angular:
  x: 0.0
  y: 0.0
  z: 0.0
 

 

거북이를 앞으로 움직이려면 linear.x 값을 수정합니다.

 
msg.linear.x = 2.0
print(msg)
 

 

 

 

회전시키려면 angular.z 값을 수정합니다.

 
msg.angular.z = 1.0
print(msg)

 

 

 

4. Publisher 생성과 발행

 

a. Publisher 생성

 

pub = test_node.create_publisher(
    Twist,
    '/turtle1/cmd_vel',
    10
)

 

 

 

구조는 다음과 같습니다.

 
node.create_publisher(메시지타입, 토픽이름, QoS)
 

각 항목의 의미는 다음과 같습니다.

 

                                        항목                                                                                                설명
Twist 발행할 메시지 타입
/turtle1/cmd_vel 발행할 토픽 이름
10 QoS queue depth

 

 

b. 메시지 한 번 발행하기

 

pub.publish(msg)

 

 

위 코드를 실행하면 거북이가 한 번 움직입니다.

 

 

전체 코드는 다음과 같습니다.

 

import rclpy as rp
from geometry_msgs.msg import Twist

rp.init()

test_node = rp.create_node('pub_test')

msg = Twist()
msg.linear.x = 2.0

pub = test_node.create_publisher(
    Twist,
    '/turtle1/cmd_vel',
    10
)

pub.publish(msg)

 

 

 

 

5. 직진과 회전을 함께 명령하기

 

거북이를 앞으로 이동시키면서 회전시키려면 다음과 같이 작성합니다.

 

msg.linear.x = 2.0
msg.angular.z = 1.0

pub.publish(msg)

 

 

 

 

이렇게 하면 거북이가 원호를 그리며 움직입니다.

값을 바꾸면 움직임도 달라집니다.

 

                                               설정                                                                                                  결과
linear.x > 0 전진
linear.x < 0 후진
angular.z > 0 반시계 방향 회전
angular.z < 0 시계 방향 회전
linear.x = 0, angular.z ≠ 0 제자리 회전

 

 

6. Timer로 주기적으로 토픽 발행하기

 

a. Timer가 필요한 이유

한 번만 publish()를 실행하면 거북이는 짧게만 움직입니다.


지속적으로 움직이게 하려면 일정 주기로 메시지를 계속 발행해야 합니다.

ROS 2에서는 이를 위해 timer를 사용합니다.

 

 

test_node.create_timer(timer_period, timer_callback)

 

timer_period는 몇 초마다 실행할지를 의미합니다.


예를 들어 timer_callback이 0.1이면 0.1초마다 callback이 실행됩니다.

 

 

b. Timer callback 작성

 

cnt = 0

def timer_callback():
    global cnt

    cnt += 1
    print(cnt)

    pub.publish(msg)

    if cnt > 3:
        raise Exception("Publisher Stop")

 

이 코드는 timer가 실행될 때마다 다음 동작을 수행합니다.

  1. cnt 값을 증가시킵니다.
  2. 현재 횟수를 출력합니다.
  3. /turtle1/cmd_vel로 msg를 발행합니다.
  4. 4번째 실행 이후 예외를 발생시켜 멈춥니다.

 

c. Timer 전체 코드

 

import rclpy as rp
from geometry_msgs.msg import Twist

rp.init()

test_node = rp.create_node('pub_test')

msg = Twist()
msg.linear.x = 2.0
msg.angular.z = 1.0

pub = test_node.create_publisher(
    Twist,
    '/turtle1/cmd_vel',
    10
)

cnt = 0

def timer_callback():
    global cnt

    cnt += 1
    print("publish count:", cnt)

    pub.publish(msg)

    if cnt > 3:
        raise Exception("Publisher Stop")

timer_period = 0.1

timer = test_node.create_timer(
    timer_period,
    timer_callback
)

rp.spin(test_node)

 

 

 

 

 

7. 실무형 개선 코드

 

강의에서는 위 예제가 직관적입니다. 

실무형 코드에서는 예외로 종료하는 대신, 일정 횟수 발행 후 안전하게 종료하는 구조입니다.

 

import rclpy
from geometry_msgs.msg import Twist

def main():
    rclpy.init()

    node = rclpy.create_node('safe_pub_test')

    publisher = node.create_publisher(
        Twist,
        '/turtle1/cmd_vel',
        10
    )

    msg = Twist()
    msg.linear.x = 2.0
    msg.angular.z = 1.0

    count = {'value': 0}

    def timer_callback():
        count['value'] += 1
        node.get_logger().info(f"publish count: {count['value']}")

        publisher.publish(msg)

        if count['value'] >= 10:
            node.get_logger().info("Publishing finished.")
            node.destroy_timer(timer)
            node.destroy_node()
            rclpy.shutdown()

    timer = node.create_timer(0.1, timer_callback)

    rclpy.spin(node)

if __name__ == '__main__':
    main()

 

 

 

더보기

count = {'value': 0} 는 타이머 콜백 함수 안에서 값을 계속 증가시키기 위해 만든 카운터 변수입니다.

여기서 count는 정수 변수가 아니라 딕셔너리입니다.

 
{
    'value': 0
}
 

즉, count 안에 "value"라는 이름의 값이 있고, 초기값은 0입니다.

 

딕셔너리로 썼냐면, Python의 함수 스코프 때문입니다.

예를 들어 이렇게 쓰면 문제가 생깁니다.

 
count = 0

def timer_callback():
    count += 1
 

이 코드는 에러가 납니다.

UnboundLocalError: local variable 'count' referenced before assignment
 

왜냐하면 함수 안에서 count += 1을 쓰는 순간, Python은 count를 함수 내부 지역 변수로 판단하기 때문입니다.

그래서 바깥의 count = 0을 제대로 수정하지 못합니다.

 

현재 코드 방식처럼 사용하면

 
count = {'value': 0}

def timer_callback():
    count['value'] += 1
 

딕셔너리 자체는 바깥에 있고, 함수 안에서는 딕셔너리 안의 값만 바꾸는 것이므로 정상 동작합니다.

 

 

 

 

이 코드는 블로그 독자에게 더 좋은 예제입니다.

 

이유는 다음과 같습니다.

                         교육용 코드                                                                          실무형 코드
raise Exception()으로 종료 destroy_node()와 shutdown() 사용
Jupyter 실습에 적합 ROS 2 패키지 코드에 적합
빠르게 결과 확인 가능 구조가 안정적
디버깅용 실제 프로젝트용

 

 

 

 

 

8. 노드 종료 정리

 

ROS 2에서 노드를 만들었으면 사용 후 종료하는 습관이 중요합니다.

 

test_node.destroy_node()
rp.shutdown()

 

 

Jupyter에서는 kernel이 살아 있으면 이전 노드 정보가 남아 보일 수 있습니다.
이 경우 다음 메뉴를 사용합니다.

Kernel → Restart
 

 

또는 완전 초기화를 위해 다음을 선택합니다.

Kernel → Restart & Clear Output

 

 

728x90
728x90

'강좌 > ROS2' 카테고리의 다른 글

ROS2 패키지 만들고 토픽 다루기  (0) 2026.05.08
ROS 2 Python Service Client 사용하기  (0) 2026.05.08
Python에서 ROS2 토픽 구독  (0) 2026.05.07
ROS2 파라미터  (0) 2026.05.06
ROS2 Action  (0) 2026.05.05