공통
ㅇ 기본 설정
- CMakeLists.txt에 파이썬 노드 설정 필요 없음
- 해당 파이썬 파일은 실행 권한을 줘야함 (chmod +x python_file.py)
- 해당 노드 실행 시 파이썬 파일이름으로 실행하면 됨 (rosrun package_name python_file.py)
ㅇ 기본 형태 (rospy)
#!/usr/bin/env python
import rospy
def node():
rospy.init_node('node_name')
if __name__=='__main__':
node()
ㅇ rospy.init_node('my_node_name') / rospy.init_node('my_node_name', anonymous=True)
- rospy 노드 설정 및 초기화
- 'my_node_name': 노드 이름
- anonymous: 익명 여부, True일 경우 노드의 고유 이름을 상관쓰지 않음 (기본값 : False)
- log_level=rospy.INFO : 로그 메시지를 게시하기 위한 기본 로그 설정
- disable_signals=False : Ctrl-C 에서 종료할 수 있도록 신호 처리기를 등록 합니다. True일 경우 정의 해줘야함
ㅇ rospy.is_shutdown()
- rospy가 종료되었는지 여부 가져오기
- 보통 while문 돌릴때 사용
ㅇ rospy.spin()
- rospy 반복하기 (노드 설정한 것을 반복)
ㅇ rospy.loginfo('log message')
- 터미널에 'log message' 출력
ㅇ rospy.on_shutdown(callback)
- callback 함수를 부른 뒤 노드 종료 (callback 함수는 옵션)
ㅇ rate = rospy.Rate(10) & rate.sleep()
- rospy.Rate(10) : 대기 시간 10hz 설정 (1초에 10번, 0.1초 단위)
- rate.sleep() : 실정한 시간만큼 대기
ㅇ 메시지 인터페이스 형태
- std_msg 형태 (기본 메시지) : http://wiki.ros.org/std_msgs
- from std_msgs.msg import msg_type
std_msgs - ROS Wiki
kinetic melodic noetic Show EOL distros: EOL distros: electric fuerte groovy hydro indigo jade lunar diamondback: Only showing information from the released package extracted on Unknown. No API documentation available. Please see this page for in
wiki.ros.org
- geometry_msgs 형태 (기하학적 메시지) : http://wiki.ros.org/geometry_msgs
- from geometry_msgs.msg import msg_type
geometry_msgs - ROS Wiki
kinetic melodic noetic Show EOL distros: EOL distros: electric fuerte groovy hydro indigo jade lunar diamondback: Only showing information from the released package extracted on Unknown. No API documentation available. Please see this page for in
wiki.ros.org
- 사용자 정의 형태 (message, service, action)
- from package_name.directory_name import *
- https://easy-support.tistory.com/67 (규격은 ROS 1, 2 동일)
- 종속 패키지 : messge_generation
<build_depend>message_generation</build_depend> <exec_depend>message_runtime</exec_depend>
- message: add_message_files(FILES file_name ... )
- service: add_service_files(FILES file_name ... )
- action: add_action_files(FILES file_name ... )
ㅇ 파라미터
- 파라미터 값 가져오기
rospy.get_param(param_name)
- 파라미터 값 설정
# Using rospy and raw python objects rospy.set_param(param_name, param_value) # Using rosparam and yaml strings rosparam.set_param(param_name, param_value)
- 파라미터 삭제
# Using if if rospy.has_param(param_name): rospy.delete_param(param_name) # Using try try: rospy.delete_param(param_name) except KeyError: print("value not set")
- 파라미터 이름 가져오기
try: rospy.get_param_names() except ROSException: print("could not get param name")
- 가장 가까운 파라미터 값 가져오기 (없을 경우 None 반환, 해당 노드가 /foo/bar에 있을경우 /foo/bar/global_example를 가져옴)
param_name = rospy.search_param('global_example') v = rospy.get_param(param_name)
- /foo/bar/global_example
- /foo/global_example
- /global_example
Topic
ㅇ Publisher
- 'topic_name' : 토픽 이름
- message_type: 메세지 인터페이스 타입
- queue_size: 대기열 설정 (10개, QoS)
rospy.Publisher('topic_name', message_type, queue_size=10)
- 예제
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def talker():
pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
# 노드 종료할때까지 반복
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
if __name__=='__main__':
try:
talker()
except rospy.ROSInterruptException:
# 강제 종료 시 쓰레드가 남을 수 있기 때문에 해당 에러 처리로 해결
pass
ㅇ Subscriber
- 'topic_name' : 토픽 이름
- message_type: 메세지 인터페이스 타입
- callback: 콜백함수 (수신 될 때마다 호출)
rospy.Subscriber('topic_name', message_type, callback)
def callback(data):
# data.data : 수신한 데이터
rospy.loginfo(data.data)
- 예제
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def callback(data):
# rospy.get_caller_id() : 해당 노드의 아이디
rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber("chatter", String, callback)
rospy.spin()
if __name__=='__main__':
listener()
Service
ㅇ Server
- 'service_name' : 서비스 이름
- service_interface_type: 서비스 인터페이스 타입
- callback: 콜백함수 (요청이 올때마다 호출)
rospy.Service('service_name', service_interface_type, callback)
def callback(req):
// req.request_message_name: 요청 데이터 이름 (요청 데이터 가져옴)
// service_interface_nameResponse: 응답 데이터로 넘겨줌
return service_interface_nameResponse(req.request_message_name)
- 예제
#!/usr/bin/env python
from ros_service_tutorial.srv import *
import rospy
def handle_add_two_ints(req):
rospy.loginfo('Returning [%s + %s = %s]' %(req.a, req.b, (req.a+req.b)))
return AddTwoIntsResponse(req.a + req.b)
def add_two_ints_server():
rospy.init_node('add_two_ints_server')
rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
rospy.loginfo('Ready to add two ints.')
rospy.spin()
if __name__=="__main__":
add_two_ints_server()
ㅇ Client
- rospy.wait_for_service('service_name') : 'service_name' 서비스 연결될 때까지 대기
- rospy.ServiceProxy('service_name', service_interface_type) : 서비스 서버 접속
- 'service_name' : 서비스 이름
- service_interface_type: 서비스 인터페이스 타입
- response = variable(request_message_name) : 서비스 서버 접속 이름으로 요청 데이터를 보낸 후 응답 받음
- response.response_message_name : 응답 받은 데이터를 가져와 리턴
rospy.wait_for_service('service_name')
variable = rospy.ServiceProxy('service_name', service_interface_type)
response = variable(request_message_name)
return response.response_message_name
- 예제
#!/usr/bin/env python
from __future__ import print_function
import sys
import rospy
from beginner_tutorials.srv import *
def add_two_ints_client(x, y):
rospy.wait_for_service('add_two_ints')
try:
add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
resp1 = add_two_ints(x, y)
return resp1.sum
except rospy.ServiceException as e:
# 서비스 서버 접속 실패 시
print("Service call failed: %s"%e)
def usage():
return "%s [x y]"%sys.argv[0]
if __name__ == "__main__":
if len(sys.argv) == 3:
x = int(sys.argv[1])
y = int(sys.argv[2])
else:
print(usage())
sys.exit(1)
print("Requesting %s+%s"%(x, y))
print("%s + %s = %s"%(x, y, add_two_ints_client(x, y)))
Action
ㅇ 종속 패키지 설정
- package.xml
<build_depend>actionlib</build_depend> <build_depend>actionlib_msgs</build_depend> <exec_depend>actionlib</exec_depend> <exec_depend>actionlib_msgs</exec_depend>
- CMakeList.txt
find_package(catkin REQUIRED genmsg actionlib_msgs actionlib) generate_messages(DEPENDENCIES actionlib_msgs)
ㅇ Action Server
- 'action_name' : 액션 이름
- action_message_typeAction : 액션 메시지 인터페이스
- execute_cb=self.callback : 액션 요청이 들어오면 콜백 함수 호출
- auto_start=False
class none_class:
def __init__(self, name):
self.action = actionlib.SimpleActionServer('action_name', action_message_typeAction, execute_cb=self.callback, auto_start=False)
# 액션 서비스 시작
self.action.start()
def callback(self, data):
# 액션 메시지 인터페이스의 Feedback
feedback_message = action_message_typeFeedback()
# 액션 메시지 인터페이스의 Result
result_message = action_message_typeResult()
# 액션 서비스 요청이 들어왔는지 확인
if self._as.is_preempt_requested():
# 액션 서비스가 시작되었음을 알림
self.action.set_preempted()
# feedback 메시지 전송
self.action.publish_feedback(feedback_message)
# result 메시지 전송(응답) 후 액션 서비스 종료
self.action.set_succeeded(self.result_message)
- 예제
#! /usr/bin/env python
import rospy
import actionlib
import actionlib_tutorials.msg
class FibonacciAction(object):
# create messages that are used to publish feedback/result
_feedback = actionlib_tutorials.msg.FibonacciFeedback()
_result = actionlib_tutorials.msg.FibonacciResult()
def __init__(self, name):
self._action_name = name
self._as = actionlib.SimpleActionServer(self._action_name, actionlib_tutorials.msg.FibonacciAction, execute_cb=self.execute_cb, auto_start = False)
self._as.start()
def execute_cb(self, goal):
# helper variables
r = rospy.Rate(1)
success = True
# append the seeds for the fibonacci sequence
self._feedback.sequence = []
self._feedback.sequence.append(0)
self._feedback.sequence.append(1)
# publish info to the console for the user
rospy.loginfo('%s: Executing, creating fibonacci sequence of order %i with seeds %i, %i' % (self._action_name, goal.order, self._feedback.sequence[0], self._feedback.sequence[1]))
# start executing the action
for i in range(1, goal.order):
# check that preempt has not been requested by the client
if self._as.is_preempt_requested():
rospy.loginfo('%s: Preempted' % self._action_name)
self._as.set_preempted()
success = False
break
self._feedback.sequence.append(self._feedback.sequence[i] + self._feedback.sequence[i-1])
# publish the feedback
self._as.publish_feedback(self._feedback)
# this step is not necessary, the sequence is computed at 1 Hz for demonstration purposes
r.sleep()
if success:
self._result.sequence = self._feedback.sequence
rospy.loginfo('%s: Succeeded' % self._action_name)
self._as.set_succeeded(self._result)
if __name__ == '__main__':
rospy.init_node('fibonacci')
server = FibonacciAction(rospy.get_name())
rospy.spin()
ㅇ Client
- 'action_name' : 액션 이름
- action_message_typeAction : 액션 메시지 인터페이스
client = actionlib.SimpleActionClient('action_name', action_message_typeAction)
# 액션 서버 접속할때까지 대기
client.wait_for_server()
# 액션 요청 메시지 설정
goal = action_message_typeGoal(data)
# 액션 요청 보내기
client.send_goal(goal)
# 결과 나올때까지 대기
client.wait_for_result()
# 결과 값 리턴
return client.get_result()
- 예제
#! /usr/bin/env python
import rospy
from __future__ import print_function
# Brings in the SimpleActionClient
import actionlib
# Brings in the messages used by the fibonacci action, including the
# goal message and the result message.
import actionlib_tutorials.msg
def fibonacci_client():
# Creates the SimpleActionClient, passing the type of the action
# (FibonacciAction) to the constructor.
client = actionlib.SimpleActionClient('fibonacci', actionlib_tutorials.msg.FibonacciAction)
# Waits until the action server has started up and started
# listening for goals.
client.wait_for_server()
# Creates a goal to send to the action server.
goal = actionlib_tutorials.msg.FibonacciGoal(order=20)
# Sends the goal to the action server.
client.send_goal(goal)
# Waits for the server to finish performing the action.
client.wait_for_result()
# Prints out the result of executing the action
return client.get_result() # A FibonacciResult
if __name__ == '__main__':
try:
# Initializes a rospy node so that the SimpleActionClient can
# publish and subscribe over ROS.
rospy.init_node('fibonacci_client_py')
result = fibonacci_client()
print("Result:", ', '.join([str(n) for n in result.sequence]))
except rospy.ROSInterruptException:
print("program interrupted before completion", file=sys.stderr)
참고 링크 1: http://wiki.ros.org/ROS/Tutorials/WritingPublisherSubscriber%28python%29
ROS/Tutorials/WritingPublisherSubscriber(python) - ROS Wiki
Writing the Publisher Node "Node" is the ROS term for an executable that is connected to the ROS network. Here we'll create the publisher ("talker") node which will continually broadcast a message. Change directory into the beginner_tutorials package, you
wiki.ros.org
참고 링크 2: http://wiki.ros.org/ROS/Tutorials/WritingServiceClient%28python%29
ROS/Tutorials/WritingServiceClient(python) - ROS Wiki
Writing a Service Node Here we'll create the service ("add_two_ints_server") node which will receive two ints and return the sum. Change directory into the beginner_tutorials package, you created in the earlier tutorial, creating a package: Change director
wiki.ros.org
참고 링크 3: http://wiki.ros.org/actionlib_tutorials
actionlib_tutorials - ROS Wiki
Workspace Setup If you have not yet created a workspace in which to complete the tutorials, click here for some brief instructions click here for some brief instructions . kinetic melodic noetic Show EOL distros: EOL distros: electric fuerte groo
wiki.ros.org
ROS 프로그래밍 (python) -1- 토픽 메시지 통신
ROS python 프로그래밍 =토픽 메시지 통신= Created Date: 2018.03.24 Modified Date: 2018.03.24 revision 1 키워드:ROS, python, 토픽, 메시지 개발 환경:Ubuntu 16.04 + ROS kinetic + laptop ※파이썬을 이용..
htsstory.tistory.com
ROS 프로그래밍 (python) -2- 서비스 통신
ROS python 프로그래밍 =서비스 통신= Created Date: 2018.04.01 Modified Date: 2018.04.01 revision 1 키워드:ROS,ptyhon,service,파이썬,서비스 개발 환경:ROS kinetic kame + Ubuntu 16.04 + laptop ※국내 RO..
htsstory.tistory.com
참고 링크 6: http://wiki.ros.org/rospy/Overview/Initialization%20and%20Shutdown
rospy/Overview/Initialization and Shutdown - ROS Wiki
Initialization There are two parts to initializing your Python code to work in ROS: Configuring your PYTHONPATH: if you use other ROS Packages in your code, you'll need to dynamically load their libraries onto your path so that you can import them. Initial
wiki.ros.org
'ROS > ROS 1 Noetic' 카테고리의 다른 글
ROS 1 Noetic - ROS 명령어 (0) | 2021.10.03 |
---|---|
ROS 1 Noetic - 패키지 생성 및 설정 (0) | 2021.10.01 |
ROS 1 Noetic - 설치 및 개발환경 구성 (Ubuntu) (0) | 2021.10.01 |
ROS - 로봇, 센서, 모터 패키지 (유튜브 ROS 강의 Chapter8. 로봇, 센서, 모터) (0) | 2021.09.26 |
ROS - roslaunch (유튜브 ROS 강의 Chapter7. ROS 기본 프로그래밍) (0) | 2021.09.26 |