茶叶公司网站模板,政务网站建设云计算中心,wordpress获取部分分类,推广型网站开发公司一. 总体结构 二. 主要类的功能
2.1 TransportDescriptor和TransportInterface
FastDDS中整个Transport类的设计遵循的是设计模式中的建造者模式#xff0c;其中#xff0c;TransportDescriptor就是建造者#xff0c;而TransportInterface则是建造出来的产品。
Tra…一. 总体结构 二. 主要类的功能
2.1 TransportDescriptor和TransportInterface
FastDDS中整个Transport类的设计遵循的是设计模式中的建造者模式其中TransportDescriptor就是建造者而TransportInterface则是建造出来的产品。
TransportDescriptor是抽象类申明了后续实现类必须实现的最终要的一个函数也是建造者需要完成的主要工作就是建造出实际的Transport对象
struct RTPS_DllAPI TransportDescriptorInterface
{ .../*** Factory method pattern. It will create and return a TransportInterface* corresponding to this descriptor. This provides an interface to the NetworkFactory* to create the transports without the need to know about their type*/virtual TransportInterface* create_transport() const 0;...
}; TransportInterface定义了Transort的接口所有的Transport都必须实现该接口根据传输方式的不同有TCPTransportInterface UDPTransportInterface和SharedMemTransport而根据TCP/UDP版本的不同前两者分别有V4和V6两个派生类。 TransportInterface接口类的初衷是为了在FastRTPS中隔离传输层的实现用户需要实现基于特定传输层协议(TCP, UDP, SharedMem)的通道和对应地址之间的代码逻辑。TransportInterface接口类中申明了实现类必须实现的一些接口initIsInputChannelOpenIsLocatorSupported is_locator_allowed RemoteToMainLocaltransform_remote_locator等其中最重要的两个接口是OpenOutputChannel和OpenInputChannel这两个接口函数前者会创建SenderResource而后者创建ReceiverResource。
2.2 TCPTransportInterface和UDPTransportInterface
2.2.1 TCPTransportInterface
TCPTransportInterface定义并且实现了基于TCP传输层的通信操作主要接口都在TCPTransportInterface中另外还有一个辅助类TCPAcceptor用于实现TCP独有的连接操作接收客户端的连接。
TCPTransoprtInterface中的perform_listen_operation函数实现基于TCP传输层的数据接收操作send函数实现基于TCP传输层的数据发送操作在初始化TCPTransportInterface的时候如果发现TCPTransportDescriptor中定义了listen_port那么会额外创建TCPAcceptBasic对象:
TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor)
{...// 如果TCPv4TransportDescriptor配置了listening_ports监听端口则创建TCPAcceptorBasicfor (uint16_t port : configuration_.listening_ports){Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
} TCPAcceptBasic可以理解为对于listen socket的封装主要执行listen操作返回建立连接了的client socket给到TCPTransoprtInterface:
void TCPAcceptorBasic::accept(TCPTransportInterface* parent)
{...acceptor_.async_accept(socket_,[parent, locator, this](const std::error_code error){if (!error){auto socket std::make_sharedtcp::socket(std::move(socket_)); // 返回建立连接的客户端socketparent-SocketAccepted(socket, locator, error); //TCPTransoprtInterface根据该socket创建ChannelResource}});
}2.2.2 UDPTransportInterface
UDPTRansportInterface的接口结构比TCPTransportInterface要简单很多并且接口函数也会少许多因为没有TCP的连接操作此外UDPTransportInterface中没有实现perform_listen_operation接口UDP的perform_listen_operation接口在UDPChannelResource中实现send函数实现基于UDP传输层的数据发送操作。
2.2.3 OpenInputChannel和OpenOutputChannel
在2.1 TransportInterface中说过其最重要的接口就是OpenInputChannel和OpenOutputChannel下面以UDPTransportInterface来看下这两个接口实现了什么功能。
首先这两个接口的名字中都带有Channel但是不要被名字误导认为这两个接口都会和下面的ChannelResource打交道从代码看只有InputChannel才会创建ChannelResource而OutputChannel创建的是SenderResource。
OpenInputChannel
bool UDPv4Transport::OpenInputChannel(const Locator locator,TransportReceiverInterface* receiver,uint32_t maxMsgSize) {...if (!IsInputChannelOpen(locator)){success OpenAndBindInputSockets(locator, receiver, IPLocator::isMulticast(locator), maxMsgSize);}... // 如果是组播地址下面还会创建额外的ChannelResource同时将网卡加入组播地址
}bool UDPTransportInterface::OpenAndBindInputSockets(...) {...std::vectorstd::string vInterfaces get_binding_interfaces_list(); // 获取白名单网卡列表for (std::string sInterface : vInterfaces){ // 在每张网卡上创建InputChannelResource一般来说用户会通过白名单限制用于DDS通信的网卡不太会出现在多个网卡上创建的情况UDPChannelResource* p_channel_resource;p_channel_resource CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver);mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(p_channel_resource);}...
}UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(...) {// 创建接收数据用的socketeProsimaUDPSocket unicastSocket OpenAndBindInputSocket(sInterface,IPLocator::getPhysicalPort(locator), is_multicast);// 创建ChannelResourceChannelResource包装传入的socket并且对外提供接收数据的统一接口UDPChannelResource* p_channel_resource new UDPChannelResource(this, unicastSocket, maxMsgSize, locator,sInterface, receiver);return p_channel_resource;
}OpenOutputChannel
bool UDPTransportInterface::OpenOutputChannel(SendResourceList sender_resource_list, // 函数中创建的SenderResource放在这个list中返回给上层const Locator locator)
{...if (is_interface_whitelist_empty()) { ... } // 没有配置网卡白名单会在0.0.0.0地址上创建socket和SenderResourceelse {// 获取网卡列表(需要剔除已经在locator参数上创建SenderResource的网卡)get_unknown_network_interfaces(sender_resource_list, locNames, true);for (const auto infoIP : locNames){ // 创建用于发送数据的socketeProsimaUDPSocket unicastSocket OpenAndBindUnicastOutputSocket(generate_endpoint(infoIP.name, port), port);SetSocketOutboundInterface(unicastSocket, infoIP.name); // 设置多播数据的发送接口...sender_resource_list.emplace_back( // 创建SenderResource用于包装上面创建的发送socket的操作接口static_castSenderResource*(new UDPSenderResource(*this, unicastSocket, false, true)));}}...
}2.3 ChannelResource, TCPChannelResource, UDPChannelResource
ChannelResource包装了用于接口数据的socket并且对外提供相对统一的操作接口针对不同的传输类型还是有所区别的 ChannelResource是接口类主要提供了用于接收数据使用的线程成员thread_以及保存接收到的消息的message_buffer_成员后续可以看到transportinterface的perform_listen_operation函数就是在这个thread_成员上运行的。
class ChannelResource {...inline void thread(std::thread pThread){// 初始化接收数据的线程thread_...}...
};UDPChannelResource::UDPChannelResource(...): ChannelResource(maxMsgSize)...
{ //指定thread_线程运行perform_listen_operation函数thread(std::thread(UDPChannelResource::perform_listen_operation, this, locator));
} TCPChannelResource和UDPChannelResource两者都包含了用于接收数据使用的socket对象除此以外两者的接口差距还是挺大的TCP和UDP的通信流程本来就有区别TCPChannelResource的接口比较多实现也很复杂并且不属于DDS协议中约定的默认通信方式因此不需要太关注TCP的视线。
这里主要来看UDPChannelResource的接口UDPChannelResource的接口很简单就是perform_listen_operation
void UDPChannelResource::perform_listen_operation(Locator input_locator) {// set thread name for debugpthread_setname_np(pthread_self(), UDPChannel); // thread_的线程名称为UDPChannelLocator remote_locator;while (alive()){// Blocking receive.auto msg message_buffer();if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) // Receive函数调用socket的recv_from接收数据阻塞式{continue;}// 将收到的消息交个MessageReceiver去处理通过message_receiver接口主导到UDPChannelResource中...}
}2.4 SenderResource, UDPSenderResource
SenderResource从名字看就是用于发送数据的SenderResource是接口类其中最重要的一个接口就是sendsend内部调用了send_lambda_这个std::function类型的成员send_lambda_成员有SenderResource的派生类来实现 对于UDPSenderResource来说send_lambda_函数对象依赖UDPTransportInterface的接口来完成最终的数据发送
send_lambda_ [this, transport](...) - bool{ // 调用UDPTransportInterface::send接口 并且将socket对象传入return transport.send(data, dataSize, socket_, destination_locators_begin,destination_locators_end, only_multicast_purpose_, whitelisted_,max_blocking_time_point);};[!TIP] 读到这段代码的时候不太理解为什么不在UDPSenderResource的send_lambda_中直接实现send的逻辑代码反而要依赖UDPTransportInterface来实现。个人理解是不是因为SenderResource这个类主要功能还是体现在Resource上说明其只是一个用来保管发送socket的资源类而已个人猜想。 RTPSParticipantImpl是SenderResource的容器保存了其创建的所有SenderResource在发送数据的时候会调用这些SenderResource的接口
bool sendSync(...) {...for (auto send_resource : send_resource_list_) // send_resource_list_中保存了该Participant创建的所有SenderResource{...send_resource-send(msg-buffer, msg-length, locators_begin, locators_end, max_blocking_time_point);}...
} 第三章节中的3.2发送数据中会说明SenderResource的send接口是怎么被调用到的调用栈是如何的。
2.5 ReceiverResource, MessageReciever
ReceiverResource实现TransportReceiverInterface接口TransportReceiverInterface这个接口的用途从名字看就是用于接收Transport传输层收到数据的接口其实现了TransoprtReceiverInterface接口中的OnDataReceived函数接口OnDataReceived函数接口在UDPChannelResource的数据接收接口perform_listen_operation函数中被调用
void UDPChannelResource::perform_listen_operation(Locator input_locator) {...// Processes the data through the CDR Message interface.if (message_receiver() ! nullptr){message_receiver()-OnDataReceived(msg.buffer, msg.length, input_locator, remote_locator);}...
} ReceiverResource的OnDataReceived接口实现非常简单就是将Transoprt收到的CDRMessage交给MessageReceiverMessageReceiver会对该CDRMessage进行进一步细化的处理看过DDS协议文档的应该知道一个完整的DDS Message包括Header和Submessages两部分其中Submessages中包含不止一种子消息HeartBeatGAPTimestampData DataFrag等等。 而MessageReceiver的作用就是解析ReceiverResource传递过来的CDRMessage解析其中的Header以及各个子消息可以看到MessageReceiver中定义了各种proess函数用来处理不同的子消息 这些proc函数中最重要的一个函数是process_data_message_without_security假设没有开启TLS功能开启的话就是with_security该函数将Data子消息中的payload交给RTPSReader进行处理
void MessageReceiver::process_data_message_without_security(const EntityId_t reader_id,CacheChange_t change)
{auto process_message [change](RTPSReader* reader){reader-processDataMsg(change);};findAllReaders(reader_id, process_message); // 找到对应EntityID的RTPSReader调用其processDataMsg处理Data子消息
}2.6 LocatorSelector LocatorSelectorSender和LocatorSelectorEntry
LocatorSelectorSender: Class used by writers to inform a RTPSMessageGroup object which remote participants will be addressees of next RTPS submessages. (Writer使用LocatorSelectSender告知RTPSMessageGroup下一包要发送的RTPS消息要发给哪些远端Particiant)
LocatorSelector: A class used for the efficient selection of locators when sending data to multiple entities. (当发送数据给不同的远端Reader时该类可以协助选择合适的Locator地址进行发送)
LocatorSelectorEntry: This class holds the locators of a remote endpoint along with data required for the locator selection algorithm. (该类报错了某个远端Endpiont的地址信息地址信息回被用在locator选择上)
LocatorSelector内部对于每一个匹配的远端RTPSReader都维护了其Locator信息在LocatorSelectorEntry中。然后LocatorSelector是LocatorSelectorSender的内部成员而LocatorSelectorSender是每个StatelessWriter/StatefulWriter的内部成员当Writer匹配到远端Reader的时候会将远端Reader的Locator信息以LocatorSelectorEntry的方式更新到LocatorSelectorSender的LocatorSelector成员中
bool StatelessWriter::matched_reader_add(...) {...filter_remote_locators(*new_reader-general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_.locator_selector.add_entry(new_reader-general_locator_selector_entry());...
} 而在RTPSWriter发送RTPSMessage的时候则可以根据locator_selector_.locator_selector找到目前还存活的匹配的远端RTPSRerader的LocatorSelectorEntry取出其中的Locator作为消息的发送目的地
DeliveryRetCode StatelessWriter::deliver_sample_nts(...) {...locator_selector.locator_selector.reset(true); //重新选择目前还存活的RTPSReader的LocatorSelectorEntry作为目标地址集合size_t num_locators locator_selector.locator_selector.selected_size() fixed_locators_.size();if (0 num_locators) {...if (group.add_data(*cache_change, is_inline_qos_expected_)) { // 向RTPSMessageGroup添加Data_Submessage...}}
}bool LocatorSelectorSender::send(CDRMessage_t* message,std::chrono::steady_clock::time_point max_blocking_time_point) const
{return writer_.send_nts(message, *this, max_blocking_time_point);
}bool RTPSWriter::send_nts(...) {RTPSParticipantImpl* participant getRTPSParticipant();// 向LocatorSelectorSender.LocatorSelector中每一个有效的RTPSReader的Locator发送消息return locator_selector.locator_selector.selected_size() 0 ||participant-sendSync(message, m_guid, locator_selector.locator_selector.begin(),locator_selector.locator_selector.end(), max_blocking_time_point);
}三. 主要流程
3.1 初始化
初始化流程中RTPSParticipantImpl和各种Endpoint会创建各个SenderResource/ReceiverResource这里主要梳理一下会创建哪些Resource在什么时候创建的 对于RecevierResource创建的socket其端口一般是有固定的计算规则的根据domainidparticipantid以及是地址是否为组播地址可以算出固定的端口。
对于SenderResource创建的socket则是随机绑定未使用的端口而且因为每个RTPSWriter知道匹配的Reader信息保存在ReaderProxy/ReaderLocator中,因此socket绑定的IP都是本地网卡的IP然后发送的时候使用sendto发送到Reader的地址和端口上去就行了。
3.2 发送数据
从RTPS层看发送数据时我们一般向RTPSWriter索取一个CacheChange然后将要发送的数据填充到CacheChange的payload中最后将这个CacheChange加入到WriterHistory中。 从我们将数据填充到CacheChange到通过SenderResource关联的socket发送到对端的大致流程如下
3.3 接收数据
接收数据的工作从UDPChannelResource的thread_线程开始的前面说到过UDPChannelResource的thread_线程被用来运行perform_listen_operation函数该函数中调用关联的socket执行receive_from操作从绑定的Locator上读取数据读取到的数据通过MessageReceiverRTPSParticipantImpl最终到达RTPSReader手上