您现在的位置: 万盛学电脑网 >> 程序编程 >> 服务器教程 >> 正文

linux中编写并发队列类

作者:佚名    责任编辑:admin    更新时间:2022-06-22

 这篇文章主要介绍了linux中编写并发队列类,功能有:并发阻塞队列、有超时限制、有大小限制

设计并发队列   代码如下: #include <pthread.h> #include <list> using namespace std;   template <typename T> class Queue  {  public:      Queue( )      {          pthread_mutex_init(&_lock, NULL);      }      ~Queue( )      {          pthread_mutex_destroy(&_lock);     }      void push(const T& data);     T pop( );  private:      list<T> _list;      pthread_mutex_t _lock; };   template <typename T> void Queue<T>::push(const T& value )  {      pthread_mutex_lock(&_lock);     _list.push_back(value);     pthread_mutex_unlock(&_lock); }   template <typename T> T Queue<T>::pop( )  {      if (_list.empty( ))      {          throw "element not found";     }     pthread_mutex_lock(&_lock);      T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_lock);     return _temp; }       上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。   代码如下: template <typename T> class Queue  {  public:      Queue( )      {          pthread_mutex_init(&_rlock, NULL);          pthread_mutex_init(&_wlock, NULL);     }      ~Queue( )      {          pthread_mutex_destroy(&_rlock);         pthread_mutex_destroy(&_wlock);     }      void push(const T& data);     T pop( );  private:      list<T> _list;      pthread_mutex_t _rlock, _wlock; };     template <typename T> void Queue<T>::push(const T& value )  {      pthread_mutex_lock(&_wlock);     _list.push_back(value);     pthread_mutex_unlock(&_wlock); }   template <typename T> T Queue<T>::pop( )  {      if (_list.empty( ))      {          throw "element not found";     }     pthread_mutex_lock(&_rlock);     T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_rlock);     return _temp; }       设计并发阻塞队列   目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。    代码如下: template <typename T> class BlockingQueue  {  public:      BlockingQueue ( )      {          pthread_mutexattr_init(&_attr);          // set lock recursive         pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP);          pthread_mutex_init(&_lock,&_attr);         pthread_cond_init(&_cond, NULL);     }      ~BlockingQueue ( )      {          pthread_mutex_destroy(&_lock);         pthread_cond_destroy(&_cond);     }      void push(const T& data);     bool push(const T& data, const int seconds); //time-out push     T pop( );     T pop(const int seconds); // time-out pop   private:      list<T> _list;      pthread_mutex_t _lock;     pthread_mutexattr_t _attr;     pthread_cond_t _cond; };   template <typename T> T BlockingQueue<T>::pop( )  {      pthread_mutex_lock(&_lock);     while (_list.empty( ))      {          pthread_cond_wait(&_cond, &_lock) ;     }     T _temp = _list.front( );     _list.pop_front( );     pthread_mutex_unlock(&_lock);     return _temp; }   template <typename T> void BlockingQueue <T>::push(const T& value )  {      pthread_mutex_lock(&_lock);     const bool was_empty = _list.empty( );     _list.push_back(value);     pthread_mutex_unlock(&_lock);     if (was_empty)          pthread_cond_broadcast(&_cond); }       并发阻塞队列设计有两个要注意的方面:   1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。   2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。   设计有超时限制的并发阻塞队列   在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。     代码如下: template <typename T> bool BlockingQueue <T>::push(const T& data, const int seconds)  {     struct timespec ts1, ts2;     const bool was_empty = _list.empty( );     clock_gettime(CLOCK_REALTIME, &ts1);     pthread_mutex_lock(&_lock);     clock_gettime(CLOCK_REALTIME, &ts2);     if ((ts2.tv_sec – ts1.tv_sec) <seconds)      {         was_empty = _list.empty( );         _list.push_back(value);     }     pthread_mutex_unlock(&_lock);     if (was_empty)          pthread_cond_broadcast(&_cond); }   template <typename T> T BlockingQueue <T>::pop(const int seconds)  {      struct timespec ts1, ts2;      clock_gettime(CLOCK_REALTIME, &ts1);      pthread_mutex_lock(&_lock);     clock_gettime(CLOCK_REALTIME, &ts2);       // First Check: if time out when get the _lock      if ((ts1.tv_sec – ts2.tv_sec) < seconds)      {          ts2.tv_sec += seconds; // specify wake up time         while(_list.empty( ) && (result == 0))          {              result = pthread_cond_ti