dedecms建手机网站流程,网站跳出率多少正常,怎么注册微信公众号,网页设计与制作考试试题及答案使用共享内存(内存映射)实现发布订阅模式
多进程实现PubSub发布订阅模式#xff0c;从而实现进程间的通信。通信方式可以是TCP/UDP#xff0c;管道Pipe/消息队列#xff0c;共享内存shared memory等等。其中TCP/UDP的方式是可以用作局域网以及跨平台的通信#xff0c;Pipe…使用共享内存(内存映射)实现发布订阅模式
多进程实现PubSub发布订阅模式从而实现进程间的通信。通信方式可以是TCP/UDP管道Pipe/消息队列共享内存shared memory等等。其中TCP/UDP的方式是可以用作局域网以及跨平台的通信Pipe/消息队列是进程间基于系统实现比较基础的通信这两者有大量优秀的第三方库支持如ZeroMQ只要加入我们自定义数据的转换方式即可方便实现而共享内存是实现进程间通信最快的方式但因为共享内存的设计并不是用来做类似PubSub这种模式的实现的并且共享内存实质上就是一段进程间共享的内存空间使用自由度是极高的所以也很少有第三方库来实现共享内存方式的进程间通信。因此本文的重点是如何使用共享内存shared memory来实现高效的PubSub发布订阅模式。
需求
消息通过事先分配好的共享内存空间来传递需要有一定的机制来管理消息的发送写和接收读需要实现发布订阅模式也就是一个发布者一写多个订阅者多读考虑到平台的原因最后采用了文件映射内存的这种方式在各种系统中都有比较通用的实现
逻辑分析
显然只要创建了一个文件并且设置好需要的大小即可以使用mmap映射到进程的内存空间并且在退出时可以用munmap将映射释放掉。但是空间真正的释放是要把文件删掉的因此我们需要一个计数器来记录使用这块共享内存的进程数类似共享指针shared_ptr的实现在计数为零时把文件删掉。在修改这个计数的时候还需要一把进程间读写锁对于只有单个订阅者数据之后包含一个标志位发布者写完后置为true订阅者读完之后置为false可能再加上一个信号灯的控制来避免频繁读写对于多个订阅者数据中的这个标志位变成一个计数发布者写完之后将计数器置为订阅者的数量订阅者读完之后将计数器减1再加上一个进程条件变量的控制来避免频繁读写。这两种方案都有一定的弊端最大的问题在于订阅者还需要修改共享内存的内容这样就发挥不出读写锁支持多读的优势了。我们需要一个更好的机制。一个简单的实现是数据中带有一个单调递增的标签订阅者读到数据后本地保存一下这个标签的值如果下次读到的这个值不比保存的值大就认为读到了旧数据忽略之。这个标签比较好的实现是用当前的系统时间而不是计数因为发布者可能会重启清零就算重启后可以从已经写入的数据中读取但后面为了实现无锁队列会让这个事情变得麻烦。这样还有一个问题是依然会频繁地去读取这个标签。因此需要加入进程条件变量的控制来减少这种频繁。接下来是2实现消息发送写和接收读的管理。因为我们已经有了一把读写锁很自然地想到可以用它来管理读写啊。事实上并不是这样因为发布者写完数据之后可能会有一段时间不会占有写锁这时候就要一种机制来限制订阅者不会重复来读这个数据。对于这个实现已有的方案有对于每一个订阅者都开辟一块共享内存可以按一对一的方式同时复制多份数据使用生产消费模式使用循环队列来实现读写分离。第1种方案是解决了读写锁争抢的问题但是增加了内存复制的开销反而没有第2种方案好。但是我们要稍微修改一下传统的生产消费模式的实现只用一个指针来指向最新的数据。之所以这样做是因为内存是事先分配好的我们把它改造成环形的内存缓冲区很难保证数据读取的序列性再者就是循环的尾指针应该由订阅者自己来维护因为每个订阅者处理的速度是不一样的。如此一来所有数据的修改完全是由发布者来做的也就是说对于订阅者来说这是个无锁队列
代码实现
#include iostream
#include cstring
#include vector
#include functional
#include memory
#include sys/mman.h
#include atomic
#include thread
#include sys/stat.h
#include unistd.h
#include sys/types.h
#include fcntl.hstruct ShmData{bool written_;long timestamp_;size_t size_;char data_[1];ShmData():written_(false){}void Write(const char *data,const size_t len){written_ false;memcpy(data_,data,len);size_ len;timestamp_ GetTimestamp();written_ true;}bool Read(std::vectorchar* data,long* time nullptr){if (!written_){return false;}if (time){*time timestamp_;}data-resize(size_);memcpy(data-data(),data_,size_);return true;}static long GetTimestamp(){struct timespec ts;clock_gettime(CLOCK_REALTIME,ts);return ts.tv_sec * 1000000 ts.tv_nsec / 1000;}
};struct ShmQueue{size_t size_;int count_;int head_;char data_[1];ShmQueue(const size_t size,const int count):size_(sizeof(ShmData) size),count_(count),head_(0){new(data_)ShmData;}void Write(const char* data,const size_t len){const int next (head_ 1) % count_;(reinterpret_castShmData *(data_ next * size_))-Write(data,len);head_ next;}bool Read(std::vectorchar*data,long* time){return (reinterpret_castShmData *(data_ head_ * size_))-Read(data,time);}
};struct ShmSlice{int attached_;pthread_rwlock_t rwlock_;pthread_mutex_t mutex_;pthread_cond_t cond_;char data_[1];ShmSlice(const size_t size,const int count,const bool init false){if (init){//init rwlockpthread_rwlockattr_t rwattr;pthread_rwlockattr_init(rwattr);pthread_rwlockattr_setpshared(rwattr,PTHREAD_PROCESS_SHARED);pthread_rwlock_init(rwlock_,rwattr);//init mutexpthread_mutexattr_t mattr;pthread_mutexattr_init(mattr);pthread_mutexattr_setpshared(mattr,PTHREAD_PROCESS_SHARED);pthread_mutex_init(mutex_,mattr);//init condition variablepthread_condattr_t cattr;pthread_condattr_init(cattr);pthread_condattr_setpshared(cattr,PTHREAD_PROCESS_SHARED);pthread_cond_init(cond_,cattr);//init shm queuenew(data_)ShmQueue(size,count);}LockWrite();if (init){attached_ 1;} else{attached_;}UnLockWrite();}~ShmSlice(){LockWrite();UnLockWrite();if (0 attached_){pthread_cond_destroy(cond_);pthread_mutex_destroy(mutex_);pthread_rwlock_destroy(rwlock_);}}int count(){LockRead();const int count attached_;UnlockRead();return count;}void Write(const char* data,const size_t len){LockWrite();(reinterpret_castShmQueue*(data_))-Write(data,len);UnLockWrite();}bool Read(std::vectorchar *data,long* time){return (reinterpret_castShmQueue *(data_))-Read(data,time);}void LockWrite(){pthread_rwlock_wrlock(rwlock_);}void UnLockWrite(){pthread_rwlock_unlock(rwlock_);}void LockRead(){pthread_rwlock_rdlock(rwlock_);}void UnlockRead(){pthread_rwlock_unlock(rwlock_);}void LockMutex(){while (EOWNERDEAD pthread_mutex_lock(mutex_)){UnlockMutex();}}void UnlockMutex(){pthread_mutex_unlock(mutex_);}void NotifyOne(){pthread_cond_signal(cond_);}void NotifyAll(){pthread_cond_broadcast(cond_);}void wait(){LockMutex();pthread_cond_wait(cond_,mutex_);UnlockMutex();}bool WaitFor(struct timespec *ts,const std::functionbool()cond){if (cond cond()){return true;}LockMutex();pthread_cond_timedwait(cond_,mutex_,ts);UnlockMutex();bool ret;if (cond){ret cond();} else{struct timespec now;clock_gettime(CLOCK_REALTIME,now);ret now.tv_sec ts-tv_sec || (now.tv_sec ts-tv_sec now.tv_nsec ts-tv_nsec);}return ret;}
};class ShmManger{
public:ShmManger(std::string file_name,const int size): name_(std::move(file_name)),size_(sizeof(ShmSlice) sizeof(ShmQueue) 3 * (sizeof(ShmData) size)){bool init false;//open file descriptorint fd open(name_.c_str(),O_RDWR | O_CREAT | O_EXCL,0600);if(fd 0){fd open(name_.c_str(),O_RDWR,0600);}else{//set file sizestruct stat fs;fstat(fd,fs);if (fs.st_size 1){ftruncate(fd,size_);}init true;}//mmapvoid *shmaddr mmap(NULL,size_,PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);new (shmaddr) ShmSlice(size,3,init);auto deleter [](ShmSlice *ptr){ptr-~ShmSlice();};slice_ std::shared_ptrShmSlice(reinterpret_castShmSlice *(shmaddr),deleter);close(fd);}~ShmManger(){running_ false;slice_-NotifyAll();if (read_thread_.joinable()){read_thread_.join();}const int count slice_-count();auto ptr slice_.get();slice_.reset();if(count 1){//unmapmunmap(ptr,size_);} else{//remove fileremove(name_.c_str());}}void Publish(const std::vectorchar data){slice_-Write(data.data(),data.size());slice_-NotifyAll();}void Subscribe(std::functionvoid (const std::vectorchar)callback){callback std::move(callback);running_ true;read_thread_ std::thread(ShmManger::ReadThread,this);}private:void ReadThread(){long read_time 0;while (running_){std::vectorchar data;long time;struct timespec ts;clock_gettime(CLOCK_REALTIME,ts);ts.tv_sec 5;if (!slice_-WaitFor(ts,[]{return slice_-Read(data,time) time read_time;})){continue;}read_time time;//deal with datacallback_(data);}}std::string name_;int size_;std::shared_ptrShmSliceslice_;std::functionvoid(const std::vectorchar)callback_;std::atomic_bool running_;std::thread read_thread_;
};
int main() {std::cout Hello, World! std::endl;return 0;
}参考链接
共享内存一写多读无锁实现共享内存消息队列【转载】同步和互斥的POSIX支持(互斥锁条件变量自旋锁)Linux线程-互斥锁pthread_mutex_tC语言open()函数打开文件函数C语言mmap()函数建立内存映射