有一个网站 人物模型可以做各种动作,开发公司组织架构及岗位职责,深圳公司贷款,wordpress 订餐模板本文介绍ROS机器人操作系统#xff08;Robot Operating System#xff09;的实现原理#xff0c;从最底层分析ROS代码是如何实现的。 1、序列化
把通信的内容#xff08;也就是消息message#xff09;序列化是通信的基础#xff0c;所以我们先研究序列化。
尽管笔者从事…本文介绍ROS机器人操作系统Robot Operating System的实现原理从最底层分析ROS代码是如何实现的。 1、序列化
把通信的内容也就是消息message序列化是通信的基础所以我们先研究序列化。
尽管笔者从事机器人学习和研发很长时间了但是在研究ROS的过程中“序列化”这个词还是这辈子第一次听到。
所以可想而知很多人在看到“把一个消息序列化”这样的描述时是如何一脸懵逼。
但其实序列化是一个比较常见的概念你虽然不知道它但一定接触过它。
下面我们先介绍“序列化”的一些常识然后解释ROS里的序列化是怎么做的
1.1 什么是序列化
“序列化”Serialization 的意思是将一个对象转化为字节流。
这里说的对象可以理解为“面向对象”里的那个对象具体的就是存储在内存中的对象数据。
与之相反的过程是“反序列化”Deserialization 。
虽然挂着机器人的羊头但是后面的介绍全部是计算机知识跟机器人一丁点关系都没有序列化就是一个纯粹的计算机概念。
序列化的英文Serialize就有把一个东西变成一串连续的东西之意。
形象的描述数据对象是一团面序列化就是将面团拉成一根面条反序列化就将面条捏回面团。
另一个形象的类比是我们在对话或者打电话时一个人的思想转换成一维的语音然后在另一个人的头脑里重新变成结构化的思想这也是一种序列化。 面对序列化很多人心中可能会有很多疑问。
首先为什么要序列化或者更具体的说既然对象的信息本来就是以字节的形式储存在内存中那为什么要多此一举把一些字节数据转换成另一种形式的、一维的、连续的字节数据呢
如果我们的程序在内存中存储了一个数字比如25。那要怎么传递25这个数字给别的程序节点或者把这个数字永久存储起来呢
很简单直接传递25这个数字的字节表示即0X19当然最终会变成二进制表示11001以高低电平传输存储或者直接把这个数字的字节表示写进硬盘里即可。
所以对于本来就是连续的、一维的、一连串的数据例如字符串序列化并不需要做太多东西其本质是就是由内存向其它地方拷贝数据而已。
所以如果你在一个序列化库里看到memcpy函数不用觉得奇怪因为你知道序列化最底层不过就是在操作内存数据而已还有些库使用了流的ostream.rdbuf()-sputn函数。
可是实际程序操作的对象很少是这么简单的形式大多数时候我们面对的是包含不同数据类型int、double、string的复杂数据结构比如vector、list它们很可能在内存中是不连续存储的而是分散在各处。比如ROS的很多消息都包含向量。
数据中还有各种指针和引用。而且如果数据要在运行于不同架构的计算机之上的、由不同编程语言所编写的节点程序之间传递那问题就更复杂了它们的字节顺序endianness规定有可能不一样基本数据类型比如int的长度也不一样有的int是4个字节、有的是8个字节。
这些都不是通过简单地、原封不动地复制粘贴原始数据就能解决的。这时候就需要序列化和反序列化了。
所以在程序之间需要通信时ROS恰好就是这种情况或者希望保存程序的中间运算结果时序列化就登场了。
另外在某种程度上序列化还起到统一标准的作用。 我们把被序列化的东西叫object对象它可以是任意的数据结构或者对象结构体、数组、类的实例等等。
把序列化后得到的东西叫archive它既可以是人类可读的文本形式也可以是二进制形式。
前者比如JSON和XML这两个是网络应用里最常用的序列化格式通过记事本就能打开阅读
后者就是原始的二进制文件比如后缀名是bin的文件人类是没办法直接阅读一堆的0101或者0XC9D23E72的。
序列化算是一个比较常用的功能所以大多数编程语言比如C、Python、Java等都会附带用于序列化的库不需要你再去造轮子。
以C为例虽然标准STL库没有提供序列化功能但是第三方库Boost提供了[ 2 ]谷歌的protobuf也是一个序列化库还有Fast-CDR以及不太知名的CerealJava自带序列化函数python可以使用第三方的pickle模块实现。
总之序列化没有什么神秘的用户可以看看这些开源的序列化库代码或者自己写个小程序试试简单数据的序列化例如这个例子或者这个有助于更好地理解ROS中的实现。
1.2 ROS中的序列化实现
理解了序列化再回到ROS。我们发现ROS没有采用第三方的序列化工具而是选择自己实现代码在roscpp_core项目下的roscpp_serialization中见下图。这个功能涉及的代码量不是很多。
为什么ROS不使用现成的序列化工具或者库呢可能ROS诞生的时候2007年有些序列化库可能还不存在protobuf诞生于2008年更有可能是ROS的创造者认为当时没有合适的工具。
1.2.1 serialization.h
核心的函数都在serialization.h里简而言之里面使用了C语言标准库的memcpy函数把消息拷贝到流中。
下面来看一下具体的实现。
序列化功能的特点是要处理很多种数据类型针对每种具体的类型都要实现相应的序列化函数。
为了尽量减少代码量ROS使用了模板的概念所以代码里有一堆的template。
从后往前梳理先看Stream这个结构体吧。在C里结构体和类基本没什么区别结构体里也可以定义函数。
Stream翻译为流流是一个计算机中的抽象概念前面我们提到过字节流它是什么意思呢
在需要传输数据的时候我们可以把数据想象成传送带上连续排列的一个个被传送的物体它们就是一个流。
更形象的可以想象磁带或者图灵机里连续的纸带。在文件读写、使用串口、网络Socket通信等领域流经常被使用。例如我们常用的输入输出流
couthelllo; 由于使用很多流的概念也在演变。想了解更多可以看这里。
struct Stream{ // Returns a pointer to the current position of the stream inline uint8_t* getData() { return data_; } // Advances the stream, checking bounds, and returns a pointer to the position before it was advanced. // \throws StreamOverrunException if len would take this stream past the end of its buffer ROS_FORCE_INLINE uint8_t* advance(uint32_t len){ uint8_t* old_data data_; data_ len; if (data_ end_) { // Throwing directly here causes a significant speed hit due to the extra code generated for the throw statement throwStreamOverrun(); } return old_data; } // Returns the amount of space left in the stream inline uint32_t getLength() { return static_castuint32_t(end_ - data_); } protected: Stream(uint8_t* _data, uint32_t _count) : data_(_data), end_(_data _count) {}
private: uint8_t* data_; uint8_t* end_;};注释表明Stream是个基类输入输出流IStream和OStream都继承自它。
Stream的成员变量data_是个指针指向序列化的字节流开始的位置它的类型是uint8_t。
在Ubuntu系统中uint8_t的定义是typedef unsigned char uint8_t;
所以uint8_t就是一个字节可以用size_of()函数检验。data_指向的空间就是保存字节流的。
输出流类OStream用来序列化一个对象它引用了serialize函数如下。
struct OStream : public Stream{ static const StreamType stream_type stream_types::Output; OStream(uint8_t* data, uint32_t count) : Stream(data, count) {} /* Serialize an item to this output stream*/ templatetypename T ROS_FORCE_INLINE void next(const T t){ serialize(*this, t); } templatetypename T ROS_FORCE_INLINE OStream operator(const T t) { serialize(*this, t); return *this; }}; 输入流类IStream用来反序列化一个字节流它引用了deserialize函数如下。 struct ROSCPP_SERIALIZATION_DECL IStream : public Stream{ static const StreamType stream_type stream_types::Input; IStream(uint8_t* data, uint32_t count) : Stream(data, count) {} /* Deserialize an item from this input stream */ templatetypename T ROS_FORCE_INLINE void next(T t){ deserialize(*this, t); } templatetypename T ROS_FORCE_INLINE IStream operator(T t) { deserialize(*this, t); return *this; }}; 自然serialize函数和deserialize函数就是改变数据形式的地方它们的定义在比较靠前的地方。它们都接收两个模板都是内联函数然后里面没什么东西只是又调用了Serializer类的成员函数write和read。所以serialize和deserialize函数就是个二道贩子。
// Serialize an object. Stream here should normally be a ros::serialization::OStreamtemplatetypename T, typename Streaminline void serialize(Stream stream, const T t){ SerializerT::write(stream, t);}// Deserialize an object. Stream here should normally be a ros::serialization::IStreamtemplatetypename T, typename Streaminline void deserialize(Stream stream, T t){ SerializerT::read(stream, t);} 所以我们来分析Serializer类如下。我们发现write和read函数又调用了类型里的serialize函数和deserialize函数。
头别晕这里的serialize和deserialize函数跟上面的同名函数不是一回事。
注释中说“Specializing the Serializer class is the only thing you need to do to get the ROS serialization system to work with a type”要想让ROS的序列化功能适用于其它的某个类型你唯一需要做的就是特化这个Serializer类。
这就涉及到的另一个知识点——模板特化template specialization。
templatetypename T struct Serializer{ // Write an object to the stream. Normally the stream passed in here will be a ros::serialization::OStream templatetypename Stream inline static void write(Stream stream, typename boost::call_traitsT::param_type t){ t.serialize(stream.getData(), 0); } // Read an object from the stream. Normally the stream passed in here will be a ros::serialization::IStream templatetypename Stream inline static void read(Stream stream, typename boost::call_traitsT::reference t){ t.deserialize(stream.getData()); } // Determine the serialized length of an object. inline static uint32_t serializedLength(typename boost::call_traitsT::param_type t){ return t.serializationLength(); }}; 接着又定义了一个带参数的宏函数ROS_CREATE_SIMPLE_SERIALIZER(Type)然后把这个宏作用到了ROS中的10种基本数据类型分别是uint8_t, int8_t, uint16_t, int16_t, uint32_t, int32_t, uint64_t, int64_t, float, double。
说明这10种数据类型的处理方式都是类似的。看到这里大家应该明白了write和read函数都使用了memcpy函数进行数据的移动。
注意宏定义中的template语句这正是模板特化的标志关键词template后面跟一对尖括号。
关于模板特化可以看这里。
#define ROS_CREATE_SIMPLE_SERIALIZER(Type) \ template struct SerializerType \ { \ templatetypename Stream inline static void write(Stream stream, const Type v) \{ \ memcpy(stream.advance(sizeof(v)), v, sizeof(v) ); \ } \ templatetypename Stream inline static void read(Stream stream, Type v) \{ \ memcpy(v, stream.advance(sizeof(v)), sizeof(v) ); \ } \ inline static uint32_t serializedLength(const Type) \{ \ return sizeof(Type); \ } \};ROS_CREATE_SIMPLE_SERIALIZER(uint8_t)ROS_CREATE_SIMPLE_SERIALIZER(int8_t)ROS_CREATE_SIMPLE_SERIALIZER(uint16_t)ROS_CREATE_SIMPLE_SERIALIZER(int16_t)ROS_CREATE_SIMPLE_SERIALIZER(uint32_t)ROS_CREATE_SIMPLE_SERIALIZER(int32_t)ROS_CREATE_SIMPLE_SERIALIZER(uint64_t)ROS_CREATE_SIMPLE_SERIALIZER(int64_t)ROS_CREATE_SIMPLE_SERIALIZER(float)ROS_CREATE_SIMPLE_SERIALIZER(double)对于其它类型的数据例如bool、std::string、std::vector、ros::Time、ros::Duration、boost::array等等它们各自的处理方式有细微的不同所以不再用上面的宏函数而是用模板特化的方式每种单独定义这也是为什么serialization.h这个文件这么冗长。
对于int、double这种单个元素的数据直接用上面特化的Serializer类中的memcpy函数实现序列化。
对于vector、array这种多个元素的数据类型怎么办呢方法是分成几种情况对于固定长度简单类型的fixed-size simple types还是用各自特化的Serializer类中的memcpy函数实现没啥太大区别。
对于固定但是类型不简单的fixed-size non-simple types或者既不固定也不简单的non-fixed-size, non-simple types或者固定但是不简单的fixed-size, non-simple types用for循环遍历一个元素一个元素的单独处理。
那怎么判断一个数据是不是固定是不是简单呢这是在roscpp_traits文件夹中的message_traits.h完成的。
其中采用了萃取Type Traits这是相对高级一点的编程技巧了笔者也不太懂。
对序列化的介绍暂时就到这里了有一些细节还没讲等笔者看懂了再补。 2、消息订阅发布
2.1 ROS的本质
如果问ROS的本质是什么或者用一句话概括ROS的核心功能。那么笔者认为ROS就是个通信库让不同的程序节点能够相互对话。
很多文章和书籍在介绍ROS是什么的时候经常使用“ROS是一个通信框架”这种描述。
但是笔者认为这种描述并不是太合适。“框架”是个对初学者非常不友好的抽象词汇用一个更抽象难懂的概念去解释一个本来就不清楚的概念对初学者起不到任何帮助
而且笔者严重怀疑绝大多数作者能对机器人的本质或者软件框架能有什么太深的理解他们的见解不会比你我深刻多少。
既然提到本质那我们就深入到最基本的问题。
在接触无穷的细节之前我们不妨先做一个哲学层面的思考。
那就是为什么ROS要解决通信问题
机器人涉及的东西千千万万机械、电子、软件、人工智能无所不包为什么底层的设计是一套用来通信的程序而不是别的东西。
到目前为止我还没有看到有人讨论过这个问题。这要回到机器人或者智能的本质。
当我们在谈论机器人的时候最首要的问题不是硬件设计而是对信息的处理。一个机器人需要哪些信息信息从何而来如何传递又被谁使用这些才是最重要的问题。
人类飞不鸟游不过鱼跑不过马力不如牛为什么却自称万物之灵呢。
因为人有大脑而且人类大脑处理的信息更多更复杂。
抛开物质从信息的角度看人与动物、与机器人存在很多相似的地方。
机器人由许多功能模块组成它们之间需要协作才能形成一个有用的整体机器人与机器人之间也需要协作才能形成一个有用的系统要协作就离不开通信。
需要什么样的信息以及信息从何而来不是ROS首先关心的因为这取决于机器人的应用场景。
因此ROS首先要解决的是通信的问题即如何建立通信、用什么方式通信、通信的格式是什么等等一系列具体问题。
带着这些问题我们看看ROS是如何设计的。
2.2 客户端库
实现通信的代码在ros_comm包中如下。
其中clients文件夹一共有127个文件看来是最大的包了。
现在我们来到了ROS最核心的地带。 客户端这个名词出现的有些突然一个机器人操作系统里为什么需要客户端。
原因是节点与主节点master之间的关系是client/server这时每个节点都是一个客户端client而master自然就是服务器端server。
那客户端库client libraries是干什么的就是为实现节点之间通信的。
虽然整个文件夹中包含的文件众多但是我们如果按照一定的脉络来分析就不会眼花缭乱。
节点之间最主要的通信方式就是基于消息的。为了实现这个目的需要三个步骤如下。
弄明白这三个步骤就明白ROS的工作方式了。这三个步骤看起来是比较合乎逻辑的并不奇怪。
消息的发布者和订阅者即消息的接收方建立连接
发布者向话题发布消息订阅者在话题上接收消息将消息保存在回调函数队列中
调用回调函数队列中的回调函数处理消息。
2.2.1 一个节点的诞生
在建立连接之前首先要有节点。
节点就是一个独立的程序它运行起来后就是一个普通的进程与计算机中其它的进程并没有太大区别。
一个问题是ROS中为什么把一个独立的程序称为“节点”
这是因为ROS沿用了计算机网络中“节点”的概念。
在一个网络中例如互联网每一个上网的计算机就是一个节点。前面我们看到的客户端、服务器这样的称呼也是从计算机网络中借用的。
下面来看一下节点是如何诞生的。我们在第一次使用ROS时一般都会照着官方教程编写一个talker和一个listener节点以熟悉ROS的使用方法。
我们以talker为例它的部分代码如下。
#include ros/ros.hint main(int argc, char **argv){ /* You must call one of the versions of ros::init() before using any other part of the ROS system. */ ros::init(argc, argv, talker); ros::NodeHandle n;main函数里首先调用了init()函数初始化一个节点该函数的定义在init.cpp文件中。
当我们的程序运行到init()函数时一个节点就呱呱坠地了。
而且在出生的同时我们还顺道给他起好了名字也就是talker。
名字是随便起的但是起名是必须的。 我们进入init()函数里看看它做了什么代码如下看上去还是挺复杂的。它初始化了一个叫g_global_queue的数据它的类型是CallbackQueuePtr。
这是个相当重要的类叫“回调队列”后面还会见到它。init()函数还调用了network、master、this_node、file_log、param这几个命名空间里的init初始化函数各自实现一些变量的初始化这些变量都以g开头例如g_host、g_uri用来表明它们是全局变量。
其中network::init完成节点主机名、IP地址等的初始化master::init获取master的URI、主机号和端口号。
this_node::init定义节点的命名空间和节点的名字没错把我们给节点起的名字就存储在这里。file_log::init初始化日志文件的路径。
void init(const M_string remappings, const std::string name, uint32_t options){ if (!g_atexit_registered) { g_atexit_registered true; atexit(atexitCallback); } if (!g_global_queue) { g_global_queue.reset(new CallbackQueue); } if (!g_initialized) { g_init_options options; g_ok true; ROSCONSOLE_AUTOINIT; // Disable SIGPIPE#ifndef WIN32 signal(SIGPIPE, SIG_IGN);#else WSADATA wsaData; WSAStartup(MAKEWORD(2, 0), wsaData);#endif check_ipv6_environment(); network::init(remappings); master::init(remappings); // names:: namespace is initialized by this_node this_node::init(name, remappings, options); file_log::init(remappings); param::init(remappings); g_initialized true; }}完成初始化以后就进入下一步ros::NodeHandle n定义句柄。
我们再进入node_handle.cpp文件发现构造函数NodeHandle::NodeHandle调用了自己的construct函数。然后顺藤摸瓜找到construct函数它里面又调用了ros::start()函数。
没错我们又绕回到了init.cpp文件。
ros::start()函数主要实例化了几个重要的类如下。
完成实例化后马上又调用了各自的start()函数启动相应的动作。
这些都做完了以后就可以发布或订阅消息了。
一个节点的故事暂时就到这了。
TopicManager::instance()-start();ServiceManager::instance()-start();ConnectionManager::instance()-start();PollManager::instance()-start();XMLRPCManager::instance()-start();2.2.1 XMLRPC是什么
关于ROS节点建立连接的技术细节官方文档说的非常简单在这里ROS Technical Overview。没有基础的同学看这个介绍必然还是不懂。
在ROS中节点与节点之间的通信依靠节点管理器master牵线搭桥。
master像一个中介它介绍节点们互相认识。一旦节点们认识了以后master就完成自己的任务了它就不再掺和了。
这也是为什么你启动节点后再杀死master节点之间的通信依然保持正常的原因。
使用过电驴和迅雷而且研究过BitTorrent的同学对master的工作方式应该很熟悉master就相当于Tracker服务器它存储着其它节点的信息。
我们每次下载之前都会查询Tracker服务器找到有电影资源的节点然后就可以与它们建立连接并开始下载电影了。
那么master是怎么给节点牵线搭桥的呢ROS使用了一种叫XMLRPC的方式实现这个功能。
XMLRPC中的RPC的意思是远程过程调用Remote Procedure Call。
简单来说远程过程调用的意思就是一个计算机中的程序在我们这就是节点啦可以调用另一个计算机中的函数只要这两个计算机在一个网络中。
这是一种听上去很高大上的功能它能让节点去访问网络中另一台计算机上的程序资源。
XMLRPC中的XML我们在1.1节讲消息序列化时提到了它就是一种数据表示方式而已。
所以合起来XMLRPC的意思就是把由XML表示的数据发送给其它计算机上的程序运行。
运行后返回的结果仍然以XML格式返回回来然后我们通过解析它还原回纯粹的数据就能干别的事了。
想了解更多XMLRPC的细节可以看这个XML-RPC概述。
举个例子一个XMLRPC请求是下面这个样子的。因为XMLRPC是基于HTTP协议的所以下面的就是个标准的HTTP报文。
POST / HTTP/1.1User-Agent: XMLRPC 0.7Host: localhost:11311Content-Type: text/xmlContent-length: 78
?xml version1.0?methodCall methodNamecircleArea/methodName params param valuedouble2.41/double/value /param /params/methodCall如果你没学过HTTP协议看上面的语句可能会感到陌生。《图解HTTP》这本小书可以让你快速入门。
HTTP报文比较简单它分两部分前半部分是头部后半部分是主体。
头部和主体之间用空行分开这都是HTTP协议规定的标准。
上面主体部分的格式就是XML见的多了你就熟悉了。
所以XMLRPC传递的消息其实就是主体部分是XML格式的HTTP报文而已没什么神秘的。 对应客户端一个XMLRPC请求服务器端会执行它并返回一个响应它也是一个HTTP报文如下。
它的结构和请求一样不再解释了。所以XMLRPC跟我们上网浏览网页的过程其实差不多。
HTTP/1.1 200 OKDate: Sat, 06 Oct 2001 23:20:04 GMTServer: Apache.1.3.12 (Unix)Connection: closeContent-Type: text/xmlContent-Length: 124
?xml version1.0?methodResponse params param valuedouble18.24668429131/double/value /param /params/methodResponse 2.2.2 ROS中XMLRPC的实现
上面的例子解释了XMLRPC是什么下面我们看看ROS是如何实现XMLRPC的。
ROS使用的XMLRPC介绍在这里
http://wiki.ros.org/xmlrpcpp。这次ROS的创作者没有从零开始造轮子而是在一个已有的XMLRPC库的基础上改造的。
XMLRPC的C代码在下载后的ros_comm-noetic-devel\utilities\xmlrpcpp路径下。
还好整个工程不算太大。XMLRPC分成客户端和服务器端两大部分。
咱们先看客户端主要代码在XmlRpcClient.cpp文件里。
擒贼先擒王XmlRpcClient.cpp文件中最核心的函数就是execute用于执行远程调用代码如下。
// Execute the named procedure on the remote server.// Params should be an array of the arguments for the method.// Returns true if the request was sent and a result received (although the result might be a fault).bool XmlRpcClient::execute(const char* method, XmlRpcValue const params, XmlRpcValue result){ XmlRpcUtil::log(1, XmlRpcClient::execute: method %s (_connectionState %s)., method, connectionStateStr(_connectionState));// This is not a thread-safe operation, if you want to do multithreading, use separate // clients for each thread. If you want to protect yourself from multiple threads // accessing the same client, replace this code with a real mutex. if (_executing) return false;_executing true; ClearFlagOnExit cf(_executing);_sendAttempts 0; _isFault false;if ( ! setupConnection()) return false;if ( ! generateRequest(method, params)) return false;result.clear(); double msTime -1.0; // Process until exit is called _disp.work(msTime);if (_connectionState ! IDLE || ! parseResponse(result)) { _header ; return false; }// close() if server does not supports HTTP1.1 // otherwise, reusing the socket to write leads to a SIGPIPE because // the remote server could shut down the corresponding socket. if (_header.find(HTTP/1.1 200 OK, 0, 15) ! 0) { close(); }XmlRpcUtil::log(1, XmlRpcClient::execute: method %s completed., method); _header ; _response ; return true;}它首先调用setupConnection()函数与服务器端建立连接。
连接成功后调用generateRequest()函数生成发送请求报文。
XMLRPC请求报文的头部又交给generateHeader()函数做了代码如下。
// Prepend http headersstd::string XmlRpcClient::generateHeader(size_t length) const{ std::string header POST _uri HTTP/1.1\r\n User-Agent: ; header XMLRPC_VERSION; header \r\nHost: ; header _host;char buff[40]; std::snprintf(buff,40,:%d\r\n, _port);header buff; header Content-Type: text/xml\r\nContent-length: ;std::snprintf(buff,40,%zu\r\n\r\n, length); return header buff;} 主体部分则先将远程调用的方法和参数变成XML格式generateRequest()函数再将头部和主体组合成完整的报文如下
std::string header generateHeader(body.length());_request header body;
把报文发给服务器后就开始静静地等待。
一旦接收到服务器返回的报文后就调用parseResponse函数解析报文数据也就是把XML格式变成纯净的数据格式。
我们发现XMLRPC使用了socket功能实现客户端和服务器通信。
我们搜索socket这个单词发现它原始的意思是插座如下。这非常形象建立连接实现通信就像把插头插入插座。 虽说XMLRPC也是ROS的一部分但它毕竟只是一个基础功能我们会用即可暂时不去探究其实现细节
所以对它的分析到此为止。下面我们来看节点是如何调用XMLRPC的。
2.2.3 节点间通过XMLRPC建立连接
在一个节点刚启动的时候它并不知道其它节点的存在更不知道它们在交谈什么当然也就谈不上通信。
所以它要先与master对话查询其它节点的状态然后再与其它节点通信。
而节点与master对话使用的就是XMLRPC。
从这一点来看master叫节点管理器确实名副其实它是一个大管家给刚出生的节点提供服务。
下面我们以两个节点talker和listener为例介绍其通过XMLRPC建立通信连接的过程如下图所示。 talker注册
假设我们先启动talker。启动后它通过1234端口使用XMLRPC向master注册自己的信息包含所发布消息的话题名。master会将talker的注册信息加入注册列表中
2.listener注册
listener启动后同样通过XMLRPC向master注册自己的信息包含需要订阅的话题名
3.master进行匹配
master根据listener的订阅信息从注册列表中查找如果没有找到匹配的发布者则等待发布者的加入如果找到匹配的发布者信息则通过XMLRPC向listener发送talker的地址信息。
4.listener发送连接请求
listener接收到master发回的talker地址信息尝试通过XMLRPC向talker发送连接请求传输订阅的话题名、消息类型以及通信协议TCP或者UDP;
5.talker确认连接请求
talker接收到listener发送的连接请求后继续通过XMLRPC向listener确认连接信息其中包含自身的TCP地址信息;
6.listener尝试与talker建立连接
listener接收到确认信息后使用TCP尝试与talker建立网络连接。
7.talker向listener发布消息
成功建立连接后talker开始向listener发送话题消息数据master不再参与。
从上面的分析中可以发现前五个步骤使用的通信协议都是XMLRPC最后发布数据的过程才使用到TCP。
master只在节点建立连接的过程中起作用但是并不参与节点之间最终的数据传输。
节点在请求建立连接时会通过master.cpp文件中的execute()函数调用XMLRPC库中的函数。
我们举个例子加入talker节点要发布消息它会调用topic_manager.cpp中的TopicManager::advertise()函数在函数中会调用execute()函数该部分代码如下。 XmlRpcValue args, result, payload; args[0] this_node::getName(); args[1] ops.topic; args[2] ops.datatype; args[3] xmlrpc_manager_-getServerURI(); master::execute(registerPublisher, args, result, payload, true);其中registerPublisher就是一个远程过程调用的方法或者叫函数。节点通过这个远程过程调用向master注册表示自己要发布发消息了。
你可能会问registerPublisher方法在哪里被执行了呢我们来到ros_comm-noetic-devel\tools\rosmaster\src\rosmaster路径下打开master_api.py文件然后搜索registerPublisher这个方法就会找到对应的代码如下。
匆匆扫一眼就知道它在通知所有订阅这个消息的节点让它们做好接收消息的准备。
你可能注意到了这个被调用的XMLRPC是用python语言实现的。
也就是说XMLRPC通信时只要报文的格式是一致的不管C还是python语言都可以实现远程调用的功能。
def registerPublisher(self, caller_id, topic, topic_type, caller_api): try: self.ps_lock.acquire() self.reg_manager.register_publisher(topic, caller_id, caller_api) # dont let * type squash valid typing if topic_type ! rosgraph.names.ANYTYPE or not topic in self.topics_types: self.topics_types[topic] topic_type pub_uris self.publishers.get_apis(topic) sub_uris self.subscribers.get_apis(topic) self._notify_topic_subscribers(topic, pub_uris, sub_uris) mloginfo(PUB [%s] %s %s,topic, caller_id, caller_api) sub_uris self.subscribers.get_apis(topic) finally: self.ps_lock.release() return 1, Registered [%s] as publisher of [%s]%(caller_id, topic), sub_uris2.3 master是什么
当我们在命令行中输入roscore想启动ROS的节点管理器时背后到底发生了什么我们先用Ubuntu的which命令找找roscore这个命令在什么地方发现它位于/opt/ros/melodic/bin/roscore路径下如下图。再用file命令查看它的属性发现它是一个Python脚本。 2.3.1 roscore脚本
我们回到自己下载的源码ros_comm文件夹中找到roscore文件它在\ros_comm-melodic-devel\tools\roslaunch\scripts路径下。
虽然它是个Python脚本但是却没有.py后缀名。
用记事本打开它迎面第一句话是#!/usr/bin/env python说明这还是一个python 2版本的脚本。
我们发现这个roscore只是个空壳真正重要的只有最后一行指令如下
import roslaunchroslaunch.main([roscore, --core] sys.argv[1:])2.3.2 roslaunch模块
2.3.2.1 XMLRPC服务器如何启动
roscore调用了roslaunch.main我们继续追踪进到ros_comm-noetic-devel\tools\roslaunch\src\roslaunch文件夹中发现有个__init__.py文件说明这个文件夹是一个python包打开__init__.py文件找到def main(argvsys.argv)这就是roscore调用的函数roslaunch.main的实现如下这里只保留主要的代码不太重要的删掉了。
def main(argvsys.argv): options None logger None try: from . import rlutil parser _get_optparse() (options, args) parser.parse_args(argv[1:]) args rlutil.resolve_launch_arguments(args) write_pid_file(options.pid_fn, options.core, options.port) uuid rlutil.get_or_generate_uuid(options.run_id, options.wait_for_master) configure_logging(uuid) # #3088: dont check disk usage on remote machines if not options.child_name and not options.skip_log_check: rlutil.check_log_disk_usage()logger logging.getLogger(roslaunch) logger.info(roslaunch starting with args %s%str(argv)) logger.info(roslaunch env is %s%os.environ) if options.child_name: # 这里没执行到就不列出来了 else: logger.info(starting in server mode) # #1491 change terminal name if not options.disable_title: rlutil.change_terminal_name(args, options.core) # Read roslaunch string from stdin when - is passed as launch filename. roslaunch_strs [] # This is a roslaunch parent, spin up parent server and launch processes. # args are the roslaunch files to load from . import parent as roslaunch_parent # force a port binding spec if we are running a core if options.core: options.port options.port or DEFAULT_MASTER_PORT p roslaunch_parent.ROSLaunchParent(uuid, args, roslaunch_strsroslaunch_strs, is_coreoptions.core, portoptions.port, local_onlyoptions.local_only, verboseoptions.verbose, force_screenoptions.force_screen, force_logoptions.force_log, num_workersoptions.num_workers, timeoutoptions.timeout, master_logger_leveloptions.master_logger_level, show_summarynot options.no_summary, force_requiredoptions.force_required, sigint_timeoutoptions.sigint_timeout, sigterm_timeoutoptions.sigterm_timeout) p.start() p.spin()roslaunch.main开启了日志日志记录的信息可以帮我们了解main函数执行的顺序。
我们去Ubuntu的.ros/log/路径下打开roslaunch-ubuntu-52246.log日志文件内容如下。 通过阅读日志我们发现main函数首先检查日志文件夹磁盘占用情况如果有剩余空间就继续往下运行。
然后把运行roscore的终端的标题给改了。
再调用ROSLaunchParent类中的函数这大概就是main函数中最重要的地方了。
ROSLaunchParent类的定义是在同一路径下的parent.py文件中。为什么叫LaunchParent笔者也不清楚。
先不管它我们再看日志发现运行到了下面这个函数它打算启动XMLRPC服务器端。
所以调用的顺序是roslaunch\__init__.py文件中的main()函数调用parent.py\start()函数start()函数调用自己类中的_start_infrastructure()函数_start_infrastructure()函数调用自己类中的_start_server()函数_start_server()函数再调用server.py中的start函数。 def _start_server(self): self.logger.info(starting parent XML-RPC server) self.server roslaunch.server.ROSLaunchParentNode(self.config, self.pm) self.server.start()我们再进到server.py文件中找到ROSLaunchNode类里面的start函数又调用了父类XmlRpcNode中的start函数。
class ROSLaunchNode(xmlrpc.XmlRpcNode): Base XML-RPC server for roslaunch parent/child processes def start(self): logger.info(starting roslaunch XML-RPC server) super(ROSLaunchNode, self).start()我们来到ros_comm-noetic-devel\tools\rosgraph\src\rosgraph路径找到xmlrpc.py文件。找到class XmlRpcNode(object)类再进入start(self)函数发现它调用了自己类的run函数run函数又调用了自己类中的_run函数_run函数又调用了自己类中的_run_init()函数在这里才调用了真正起作用的ThreadingXMLRPCServer类。
因为master节点是用python实现的所以需要有python版的XMLRPC库。
幸运的是python有现成的XMLRPC库叫SimpleXMLRPCServer。SimpleXMLRPCServer已经内置到python中了无需安装。
所以ThreadingXMLRPCServer类直接继承了SimpleXMLRPCServer如下。
class ThreadingXMLRPCServer(socketserver.ThreadingMixIn, SimpleXMLRPCServer)
2.3.2.2 master如何启动
我们再来看看节点管理器master是如何被启动的再回到parent.py\start()函数如下。
我们发现它启动了XMLRPC服务器后接下来就调用了_init_runner()函数。 def start(self, auto_terminateTrue): self.logger.info(starting roslaunch parent run) # load config, start XMLRPC servers and process monitor try: self._start_infrastructure() except: self._stop_infrastructure() # Initialize the actual runner. self._init_runner() # Start the launch self.runner.launch()init_runner()函数实例化了ROSLaunchRunner类这个类的定义在launch.py里。 def _init_runner(self): self.runner roslaunch.launch.ROSLaunchRunner(self.run_id, self.config, server_uriself.server.uri, ...)实例化完成后parent.py\start()函数就调用了它的launch()函数。
我们打开launch.py文件找到launch()函数发现它又调用了自己类中的_setup()函数而_setup()函数又调用了_launch_master()函数。
看名字就能猜出来_launch_master()函数肯定是启动节点管理器master的它调用了create_master_process函数这个函数在nodeprocess.py里。
所以我们打开nodeprocess.pycreate_master_process函数使用了LocalProcess类。这个类继承自Process类。nodeprocess.py文件引用了python中用于创建新的进程的subprocess模块。
create_master_process函数实例化LocalProcess类用的是’rosmaster’即ros_comm-noetic-devel\tools\rosmaster中的包。
main.py文件中的rosmaster_main函数使用了master.py中的Master类。
Master类中又用到了master_api.py中的ROSMasterHandler类这个类包含所有的XMLRPC服务器接收的远程调用一共24个如下。
shutdown(self, caller_id, msg)getUri(self, caller_id)getPid(self, caller_id)deleteParam(self, caller_id, key)setParam(self, caller_id, key, value)getParam(self, caller_id, key)searchParam(self, caller_id, key)subscribeParam(self, caller_id, caller_api, key)unsubscribeParam(self, caller_id, caller_api, key)hasParam(self, caller_id, key)getParamNames(self, caller_id)param_update_task(self, caller_id, caller_api, param_key, param_value)_notify_topic_subscribers(self, topic, pub_uris, sub_uris)registerService(self, caller_id, service, service_api, caller_api)lookupService(self, caller_id, service)unregisterService(self, caller_id, service, service_api)registerSubscriber(self, caller_id, topic, topic_type, caller_api)unregisterSubscriber(self, caller_id, topic, caller_api)registerPublisher(self, caller_id, topic, topic_type, caller_api)unregisterPublisher(self, caller_id, topic, caller_api)lookupNode(self, caller_id, node_name)getPublishedTopics(self, caller_id, subgraph)getTopicTypes(self, caller_id) getSystemState(self, caller_id)2.3.2.1 检查log文件夹占用空间
roslaunch这个python包还负责检查保存log的文件夹有多大。在ros_comm-noetic-devel\tools\roslaunch\src\roslaunch\__init__.py文件中的main函数里有以下语句。
看名字就知道是干啥的了。
rlutil.check_log_disk_usage()
再打开同一路径下的rlutil.py发现它又调用了rosclean包中的get_disk_usage函数。
我们发现这个函数里直接写死了比较的上限disk_usage 1073741824当然这样不太好应该改为可配置的。
数字1073741824的单位是字节刚好就是1GB102 4 3 1024^31024 3byte。
我们要是想修改log文件夹报警的上限直接改这个值即可。
def check_log_disk_usage(): Check size of log directory. If high, print warning to user try: d rospkg.get_log_dir() roslaunch.core.printlog(Checking log directory for disk usage. This may take a while.\nPress Ctrl-C to interrupt) disk_usage rosclean.get_disk_usage(d) # warn if over a gig if disk_usage 1073741824: roslaunch.core.printerrlog(WARNING: disk usage in log directory [%s] is over 1GB.\nIts recommended that you use the rosclean command.%d) else: roslaunch.core.printlog(Done checking log file disk usage. Usage is 1GB.) except: pass我们刨根问底追查rosclean.get_disk_usage(d)是如何实现的。
这个rosclean包不在ros_comm里面需要单独下载。
打开后发现这个包还是跨平台的给出了Windows和Linux下的实现。
如果是Windows系统用os.path.getsize函数获取文件的大小通过os.walk函数遍历所有文件加起来就是文件夹的大小。
如果是Linux系统用Linux中的du -sb命令获取文件夹的大小。哎搞个机器人不仅要学习python还得熟悉Linux容易吗 主节点会获取用户设置的ROS_MASTER_URI变量中列出的URI地址和端口号默认为当前的本地IP和11311端口号。 3、时间
不只是机器人在任何一个系统里时间都是一个绕不开的重要话题。下面我们就从万物的起点——时间开始吧。
ROS中定义时间的程序都在roscpp_core项目下的rostime中见下图。
如果细分一下时间其实有两种一种叫“时刻”也就是某个时间点
一种叫“时段”或者时间间隔也就是两个时刻之间的部分。这两者的代码是分开实现的时刻是time时间间隔是duration。
在Ubuntu中把rostime文件夹中的文件打印出来发现确实有time和duration两类文件但是还多了个rate如下图所示。 我们还注意到里面有 CMakeLists.txt 和 package.xml 两个文件那说明rostime本身也是个ROS的package可以单独编译。
3.1 时刻time
先看下文件间的依赖关系。跟时刻有关的文件是两个time.h文件和一个time.cpp文件。
time.h给出了类的声明而impl\time.h给出了类运算符重载的实现time.cpp给出了其它函数的实现。
3.1.1 TimeBase基类
首先看time.h文件它定义了一个叫TimeBase的类。注释中说TimeBase是个基类定义了两个成员变量uint32_t sec, nsec还重载了 、− -−、 、 、 等运算符。
成员变量uint32_t sec, nsec其实就是时间的秒和纳秒两部分它们合起来构成一个完整的时刻。
至于为啥要分成两部分而不是用一个来定义可能是考虑到整数表示精度的问题。
因为32位整数最大表示的数字是2147483647。如果我们要用纳秒这个范围估计是不够的。
你可能会问机器人系统怎么会使用到纳秒这么高精度的时间分辨率毕竟控制器的定时器最高精度可能也只能到微秒
如果你做过自动驾驶项目有使用过GPS和激光雷达传感器的经验就会发现GPS的时钟精度就是纳秒级的它可以同步激光雷达每个激光点的时间戳。
还有为什么定义TimeBase这个基类原因大家很容易就能猜到。
因为在程序里时间本质上就是一个数字而已数字系统的序关系能比较大小和运算加减乘除也同样适用于时间这个东西。
当然这里只有加减没有乘除因为时间的乘除没有意义。
3.1.2 Time类
紧接着TimeBase类的是Time类它是TimeBase的子类。我们做机器人应用程序开发时用不到TimeBase基类但是Time类会经常使用。
3.1.3 now()函数
Time类比TimeBase类多了now()函数它是我们的老熟人了。在开发应用的时候我们直接用下面的代码就能得到当前的时间戳
ros::Time begin ros::Time::now(); //获取当前时间
now()函数的定义在rostime\src\time.cpp里因为它很常用很重要笔者就把代码粘贴在这里如下。
函数很简单可以看到如果定义了使用仿真时间g_use_sim_time为true那就使用仿真时间否则就使用墙上时间。 Time Time::now(){ if (!g_initialized) throw TimeNotInitializedException(); if (g_use_sim_time) { boost::mutex::scoped_lock lock(g_sim_time_mutex); Time t g_sim_time; return t; } Time t; ros_walltime(t.sec, t.nsec); return t; }在ROS里时间分成两类一种叫仿真时间一种叫墙上时间。
顾名思义墙上时间就是实际的客观世界的时间它一秒一秒地流逝谁都不能改变它让它快一点慢一点都不可能除非你有超能力。
仿真时间则是可以由你控制的让它快它就快。之所以多了一个仿真时间是因为有时我们在仿真机器人希望可以自己控制时间例如为了提高验证算法的效率让它按我们的期望速度推进。
在使用墙上时间的情况下now()函数调用了ros_walltime函数这个函数也在rostime\src\time.cpp里。
剥开层层洋葱皮最后发现这个ros_walltime函数才是真正调用操作系统时间函数的地方而且它还是个跨平台的实现Windows和Linux。
如果操作系统是Linux那它会使用clock_gettime函数在笔者使用的Ubuntu 18.04系统中这个函数在usr\include路径下。
但是万一缺少这个函数那么ROS会使用gettimeofday函数但是gettimeofday没有clock_gettime精确clock_gettime能提供纳秒的精确度。
如果操作系统是Windows那它会使用标准库STL的chrono库获取当前的时刻要用这个库只需要引用它的头文件所以在time.cpp中引用了#include。
具体使用的函数就是
std::chrono::system_clock::now().time_since_epoch()。
当然时间得是秒和纳秒的形式所以用了count方法
uint64_t now_ns std::chrono::duration_caststd::chrono::nanoseconds (std::chrono::system_clock::now().time_since_epoch()).count();3.1.4 WallTime类
后面又接着声明了WallTime类和SteadyTime类。
3.2 时间间隔duration
时间间隔duration的定义与实现跟time时刻差不多但是Duration类里的sleep()延时函数是在time.cpp里定义的其中使用了Linux操作系统的nanosleep系统调用。
这个系统调用虽然叫纳秒但实际能实现的精度也就是几十毫秒即便这样也比C语言提供的sleep函数的精度好多了。如果是Windows系统则调用STL chrono函数
std::this_thread::sleep_for(std::chrono::nanoseconds(static_castint64_t(sec * 1e9 nsec))); 3.3 频率rate 关于Rate类声明注释中是这么解释的“Class to help run loops at a desired frequency”也就是帮助程序按照期望的频率循环执行。 我们看看一个ROS节点是怎么使用Rate类的如下图所示的例子。 首先用Rate的构造函数实例化一个对象loop_rate。调用的构造函数如下。
可见构造函数使用输入完成了对三个参数的初始化。
然后在While循环里调用sleep()函数实现一段时间的延迟。
既然用到延迟所以就使用了前面的time类。
Rate::Rate(double frequency): start_(Time::now()), expected_cycle_time_(1.0 / frequency), actual_cycle_time_(0.0){ }3.4 总结
至此rostime就解释清楚了。可以看到这部分实现主要是依靠STL标准库函数比如chrono、BOOST库date_time、Linux操作系统的系统调用以及标准的C函数。这就是ROS的底层了。
说句题外话在百度Apollo自动驾驶项目中也受到了rostime的影响其cyber\time中的Time、Duration、Rate类的定义方式与rostime中的类似但是没有定义基类实现更加简单了。