ROS/ROS 1 Noetic

ROS 1 Noetic - 기본 프로그래밍 Python 주요 문법

공통

ㅇ 기본 설정

  • CMakeLists.txt에 파이썬 노드 설정 필요 없음
  • 해당 파이썬 파일은 실행 권한을 줘야함 (chmod +x python_file.py)
  • 해당 노드 실행 시 파이썬 파일이름으로 실행하면 됨 (rosrun package_name python_file.py)

 

ㅇ 기본 형태 (rospy)

#!/usr/bin/env python
import rospy

def node():
    rospy.init_node('node_name')
  
if __name__=='__main__':
    node()

 

ㅇ rospy.init_node('my_node_name')  / rospy.init_node('my_node_name', anonymous=True)

  • rospy 노드 설정 및 초기화
  • 'my_node_name': 노드 이름
  • anonymous: 익명 여부, True일 경우 노드의 고유 이름을 상관쓰지 않음 (기본값 : False)
  • log_level=rospy.INFO : 로그 메시지를 게시하기 위한 기본 로그 설정
  • disable_signals=False : Ctrl-C 에서 종료할 수 있도록 신호 처리기를 등록 합니다. True일 경우 정의 해줘야함

 

ㅇ rospy.is_shutdown()

  • rospy가 종료되었는지 여부 가져오기
  • 보통 while문 돌릴때 사용

 

ㅇ rospy.spin()

  • rospy 반복하기 (노드 설정한 것을 반복)

 

ㅇ rospy.loginfo('log message')

  •  터미널에 'log message' 출력

 

ㅇ rospy.on_shutdown(callback)

  • callback 함수를 부른 뒤 노드 종료 (callback 함수는 옵션)

 

ㅇ rate = rospy.Rate(10)    &    rate.sleep()

  • rospy.Rate(10) : 대기 시간 10hz 설정 (1초에 10번, 0.1초 단위)
  • rate.sleep() : 실정한 시간만큼 대기

 

ㅇ 메시지 인터페이스 형태

 

std_msgs - ROS Wiki

kinetic melodic noetic   Show EOL distros:  EOL distros:   electric fuerte groovy hydro indigo jade lunar diamondback: Only showing information from the released package extracted on Unknown. No API documentation available. Please see this page for in

wiki.ros.org

 

geometry_msgs - ROS Wiki

kinetic melodic noetic   Show EOL distros:  EOL distros:   electric fuerte groovy hydro indigo jade lunar diamondback: Only showing information from the released package extracted on Unknown. No API documentation available. Please see this page for in

wiki.ros.org

  • 종속 패키지 : messge_generation
    <build_depend>message_generation</build_depend>
    <exec_depend>message_runtime</exec_depend>​
  • message: add_message_files(FILES file_name ... )
  • service: add_service_files(FILES file_name ... )
  • action: add_action_files(FILES file_name ... )

 

ㅇ 파라미터

  • 파라미터 값 가져오기 
  • rospy.get_param(param_name)
  • 파라미터 값 설정
    # Using rospy and raw python objects
    rospy.set_param(param_name, param_value)
    
    # Using rosparam and yaml strings
    rosparam.set_param(param_name, param_value)
  • 파라미터 삭제
    # Using if
    if rospy.has_param(param_name):
        rospy.delete_param(param_name)
    
    # Using try
    try:
        rospy.delete_param(param_name)
    except KeyError:
        print("value not set")​
  • 파라미터 이름 가져오기
    try:
        rospy.get_param_names()
    except ROSException:
        print("could not get param name")​
  • 가장 가까운 파라미터 값 가져오기 (없을 경우 None 반환, 해당 노드가 /foo/bar에 있을경우 /foo/bar/global_example를 가져옴)
    param_name = rospy.search_param('global_example')
    v = rospy.get_param(param_name)​
    1. /foo/bar/global_example
    2. /foo/global_example
    3. /global_example

 

Topic

ㅇ Publisher

  • 'topic_name' : 토픽 이름
  • message_type: 메세지 인터페이스 타입
  • queue_size: 대기열 설정 (10개, QoS)
rospy.Publisher('topic_name', message_type, queue_size=10)
  • 예제
더보기
#!/usr/bin/env python

import rospy
from std_msgs.msg import String

def talker():
    pub = rospy.Publisher('chatter', String, queue_size=10)
    rospy.init_node('talker', anonymous=True)
    rate = rospy.Rate(10)
    while not rospy.is_shutdown():

        # 노드 종료할때까지 반복
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str)
        pub.publish(hello_str)
        rate.sleep()

if __name__=='__main__':
    try:
        talker()
    except rospy.ROSInterruptException:

        # 강제 종료 시 쓰레드가 남을 수 있기 때문에 해당 에러 처리로 해결
        pass

 

ㅇ Subscriber

  • 'topic_name' : 토픽 이름
  • message_type: 메세지 인터페이스 타입
  • callback: 콜백함수 (수신 될 때마다 호출)
rospy.Subscriber('topic_name', message_type, callback)
def callback(data):
    # data.data : 수신한 데이터
    rospy.loginfo(data.data)
  • 예제
더보기
#!/usr/bin/env python

import rospy
from std_msgs.msg import String

def callback(data):

    # rospy.get_caller_id() : 해당 노드의 아이디
    rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)

def listener():
    rospy.init_node('listener', anonymous=True)

    rospy.Subscriber("chatter", String, callback)

    rospy.spin()

if __name__=='__main__':
    listener()

 

Service

ㅇ Server

  • 'service_name' : 서비스 이름
  • service_interface_type: 서비스 인터페이스 타입
  • callback: 콜백함수 (요청이 올때마다 호출)
rospy.Service('service_name', service_interface_type, callback)
def callback(req):
   // req.request_message_name: 요청 데이터 이름 (요청 데이터 가져옴)
   // service_interface_nameResponse: 응답 데이터로 넘겨줌
   return service_interface_nameResponse(req.request_message_name)
  • 예제
더보기
#!/usr/bin/env python

from ros_service_tutorial.srv import *
import rospy

def handle_add_two_ints(req):
    rospy.loginfo('Returning [%s + %s = %s]' %(req.a, req.b, (req.a+req.b)))
    return AddTwoIntsResponse(req.a + req.b)

def add_two_ints_server():
    rospy.init_node('add_two_ints_server')
    rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
    rospy.loginfo('Ready to add two ints.')
    rospy.spin()

if __name__=="__main__":
    add_two_ints_server()

 

ㅇ Client

  • rospy.wait_for_service('service_name') : 'service_name' 서비스 연결될 때까지 대기
  • rospy.ServiceProxy('service_name', service_interface_type) : 서비스 서버 접속
    • 'service_name' : 서비스 이름
    • service_interface_type: 서비스 인터페이스 타입
  • response = variable(request_message_name) : 서비스 서버 접속 이름으로 요청 데이터를 보낸 후 응답 받음
  • response.response_message_name : 응답 받은 데이터를 가져와 리턴
rospy.wait_for_service('service_name')
variable = rospy.ServiceProxy('service_name', service_interface_type)
response = variable(request_message_name)
return response.response_message_name
  • 예제
더보기
#!/usr/bin/env python

from __future__ import print_function

import sys
import rospy
from beginner_tutorials.srv import *

def add_two_ints_client(x, y):
    rospy.wait_for_service('add_two_ints')
    try:
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
        resp1 = add_two_ints(x, y)
        return resp1.sum
    except rospy.ServiceException as e:
        # 서비스 서버 접속 실패 시
        print("Service call failed: %s"%e)

def usage():
    return "%s [x y]"%sys.argv[0]

if __name__ == "__main__":
    if len(sys.argv) == 3:
        x = int(sys.argv[1])
        y = int(sys.argv[2])
    else:
        print(usage())
        sys.exit(1)
    print("Requesting %s+%s"%(x, y))
    print("%s + %s = %s"%(x, y, add_two_ints_client(x, y)))

 

Action

ㅇ 종속 패키지 설정

  • package.xml
    <build_depend>actionlib</build_depend>
    <build_depend>actionlib_msgs</build_depend>
    <exec_depend>actionlib</exec_depend>
    <exec_depend>actionlib_msgs</exec_depend>​
  • CMakeList.txt
    find_package(catkin REQUIRED genmsg actionlib_msgs actionlib)
    generate_messages(DEPENDENCIES actionlib_msgs)

 

ㅇ Action Server

  • 'action_name' : 액션 이름
  • action_message_typeAction : 액션 메시지 인터페이스
  • execute_cb=self.callback : 액션 요청이 들어오면 콜백 함수 호출
  • auto_start=False
class none_class:
    def __init__(self, name):
        self.action = actionlib.SimpleActionServer('action_name', action_message_typeAction, execute_cb=self.callback, auto_start=False)
        
        # 액션 서비스 시작
        self.action.start()
    
    def callback(self, data):
        # 액션 메시지 인터페이스의 Feedback
        feedback_message = action_message_typeFeedback()
        
        # 액션 메시지 인터페이스의 Result
        result_message = action_message_typeResult()
        
        
        # 액션 서비스 요청이 들어왔는지 확인
        if self._as.is_preempt_requested():
            # 액션 서비스가 시작되었음을 알림
            self.action.set_preempted()
			
            # feedback 메시지 전송
            self.action.publish_feedback(feedback_message)
            
            # result 메시지 전송(응답) 후 액션 서비스 종료
            self.action.set_succeeded(self.result_message)

 

  • 예제
더보기
#! /usr/bin/env python

import rospy

import actionlib

import actionlib_tutorials.msg

class FibonacciAction(object):
    # create messages that are used to publish feedback/result
    _feedback = actionlib_tutorials.msg.FibonacciFeedback()
    _result = actionlib_tutorials.msg.FibonacciResult()

    def __init__(self, name):
        self._action_name = name
        self._as = actionlib.SimpleActionServer(self._action_name, actionlib_tutorials.msg.FibonacciAction, execute_cb=self.execute_cb, auto_start = False)
        self._as.start()
      
    def execute_cb(self, goal):
        # helper variables
        r = rospy.Rate(1)
        success = True
        
        # append the seeds for the fibonacci sequence
        self._feedback.sequence = []
        self._feedback.sequence.append(0)
        self._feedback.sequence.append(1)
        
        # publish info to the console for the user
        rospy.loginfo('%s: Executing, creating fibonacci sequence of order %i with seeds %i, %i' % (self._action_name, goal.order, self._feedback.sequence[0], self._feedback.sequence[1]))
        
        # start executing the action
        for i in range(1, goal.order):
            # check that preempt has not been requested by the client
            if self._as.is_preempt_requested():
                rospy.loginfo('%s: Preempted' % self._action_name)
                self._as.set_preempted()
                success = False
                break
            self._feedback.sequence.append(self._feedback.sequence[i] + self._feedback.sequence[i-1])
            # publish the feedback
            self._as.publish_feedback(self._feedback)
            # this step is not necessary, the sequence is computed at 1 Hz for demonstration purposes
            r.sleep()
          
        if success:
            self._result.sequence = self._feedback.sequence
            rospy.loginfo('%s: Succeeded' % self._action_name)
            self._as.set_succeeded(self._result)
        
if __name__ == '__main__':
    rospy.init_node('fibonacci')
    server = FibonacciAction(rospy.get_name())
    rospy.spin()

 

ㅇ Client

  • 'action_name' : 액션 이름
  • action_message_typeAction : 액션 메시지 인터페이스
client = actionlib.SimpleActionClient('action_name', action_message_typeAction)

# 액션 서버 접속할때까지 대기
client.wait_for_server()

# 액션 요청 메시지 설정
goal = action_message_typeGoal(data)

# 액션 요청 보내기
client.send_goal(goal)

# 결과 나올때까지 대기
client.wait_for_result()

# 결과 값 리턴
return client.get_result()
  • 예제
더보기
#! /usr/bin/env python

import rospy
from __future__ import print_function

# Brings in the SimpleActionClient
import actionlib

# Brings in the messages used by the fibonacci action, including the
# goal message and the result message.
import actionlib_tutorials.msg

def fibonacci_client():
    # Creates the SimpleActionClient, passing the type of the action
    # (FibonacciAction) to the constructor.
    client = actionlib.SimpleActionClient('fibonacci', actionlib_tutorials.msg.FibonacciAction)

    # Waits until the action server has started up and started
    # listening for goals.
    client.wait_for_server()

    # Creates a goal to send to the action server.
    goal = actionlib_tutorials.msg.FibonacciGoal(order=20)

    # Sends the goal to the action server.
    client.send_goal(goal)

    # Waits for the server to finish performing the action.
    client.wait_for_result()

    # Prints out the result of executing the action
    return client.get_result()  # A FibonacciResult

if __name__ == '__main__':
    try:
        # Initializes a rospy node so that the SimpleActionClient can
        # publish and subscribe over ROS.
        rospy.init_node('fibonacci_client_py')
        result = fibonacci_client()
        print("Result:", ', '.join([str(n) for n in result.sequence]))
    except rospy.ROSInterruptException:
        print("program interrupted before completion", file=sys.stderr)

 

 

참고 링크 1: http://wiki.ros.org/ROS/Tutorials/WritingPublisherSubscriber%28python%29

 

ROS/Tutorials/WritingPublisherSubscriber(python) - ROS Wiki

Writing the Publisher Node "Node" is the ROS term for an executable that is connected to the ROS network. Here we'll create the publisher ("talker") node which will continually broadcast a message. Change directory into the beginner_tutorials package, you

wiki.ros.org

참고 링크 2: http://wiki.ros.org/ROS/Tutorials/WritingServiceClient%28python%29

 

ROS/Tutorials/WritingServiceClient(python) - ROS Wiki

Writing a Service Node Here we'll create the service ("add_two_ints_server") node which will receive two ints and return the sum. Change directory into the beginner_tutorials package, you created in the earlier tutorial, creating a package: Change director

wiki.ros.org

참고 링크 3: http://wiki.ros.org/actionlib_tutorials

 

actionlib_tutorials - ROS Wiki

Workspace Setup If you have not yet created a workspace in which to complete the tutorials, click here for some brief instructions click here for some brief instructions . kinetic melodic noetic   Show EOL distros:  EOL distros:   electric fuerte groo

wiki.ros.org

참고 링크 4: https://htsstory.tistory.com/entry/ROS-python%ED%8C%8C%EC%9D%B4%EC%8D%AC-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-1-%ED%86%A0%ED%94%BD-%EB%A9%94%EC%8B%9C%EC%A7%80-%ED%86%B5%EC%8B%A0?category=282702 

 

ROS 프로그래밍 (python) -1- 토픽 메시지 통신

ROS python 프로그래밍 =토픽 메시지 통신= Created Date: 2018.03.24 Modified Date: 2018.03.24 revision 1 키워드:ROS, python, 토픽, 메시지 개발 환경:Ubuntu 16.04 + ROS kinetic + laptop ※파이썬을 이용..

htsstory.tistory.com

참고 링크 5: https://htsstory.tistory.com/entry/ROS-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-python-2-%EC%84%9C%EB%B9%84%EC%8A%A4-%ED%86%B5%EC%8B%A0?category=282702

 

ROS 프로그래밍 (python) -2- 서비스 통신

ROS python 프로그래밍 =서비스 통신= Created Date: 2018.04.01 Modified Date: 2018.04.01 revision 1 키워드:ROS,ptyhon,service,파이썬,서비스 개발 환경:ROS kinetic kame + Ubuntu 16.04 + laptop ※국내 RO..

htsstory.tistory.com

참고 링크 6: http://wiki.ros.org/rospy/Overview/Initialization%20and%20Shutdown

 

rospy/Overview/Initialization and Shutdown - ROS Wiki

Initialization There are two parts to initializing your Python code to work in ROS: Configuring your PYTHONPATH: if you use other ROS Packages in your code, you'll need to dynamically load their libraries onto your path so that you can import them. Initial

wiki.ros.org