条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。
     消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!
     消息队列在服务器开发过程中主要用于什么对象呢?
     1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!
     2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。
     3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!
给出源代码:
BlockingQueue.h文件
/* 
 * BlockingQueue.h 
 * 
 *  Created on: Apr 19, 2013 
 *      Author: archy_yu 
 */
#ifndef BLOCKINGQUEUE_H_ 
#define BLOCKINGQUEUE_H_ 
#include <queue> 
#include <pthread.h> 
typedef void* CommonItem; 
class BlockingQueue 
{ 
public: 
    BlockingQueue(); 
    virtual ~BlockingQueue(); 
    int peek(CommonItem &item); 
    int append(CommonItem item); 
private: 
    pthread_mutex_t _mutex; 
    pthread_cond_t _cond; 
    std::queue<CommonItem> _read_queue; 
    std::queue<CommonItem> _write_queue; 
}; 
  
#endif /* BLOCKINGQUEUE_H_ */
BlockingQueue.cpp 文件代码
/* 
 * BlockingQueue.cpp 
 * 
 *  Created on: Apr 19, 2013 
 *      Author: archy_yu 
 */
#include "BlockingQueue.h" 
BlockingQueue::BlockingQueue() 
{ 
    pthread_mutex_init(&this->_mutex,NULL); 
    pthread_cond_init(&this->_cond,NULL); 
} 
BlockingQueue::~BlockingQueue() 
{ 
    pthread_mutex_destroy(&this->_mutex); 
    pthread_cond_destroy(&this->_cond); 
} 
int BlockingQueue::peek(CommonItem &item) 
{ 
    if( !this->_read_queue.empty() ) 
    { 
        item = this->_read_queue.front(); 
        this->_read_queue.pop(); 
    } 
    else
    { 
        pthread_mutex_lock(&this->_mutex); 
        while(this->_write_queue.empty()) 
        { 
            pthread_cond_wait(&this->_cond,&this->_mutex); 
        } 
        while(!this->_write_queue.empty()) 
        { 
            this->_read_queue.push(this->_write_queue.front()); 
            this->_write_queue.pop(); 
        } 
        pthread_mutex_unlock(&this->_mutex); 
    } 
  
    return 0; 
} 
int BlockingQueue::append(CommonItem item) 
{ 
    pthread_mutex_lock(&this->_mutex); 
    this->_write_queue.push(item); 
    pthread_cond_signal(&this->_cond); 
    pthread_mutex_unlock(&this->_mutex); 
    return 0; 
}
测试代码:
BlockingQueue _queue; 
void* process(void* arg) 
{ 
    int i=0; 
    while(true) 
    { 
        int *j = new int(); 
        *j = i; 
        _queue.append((void *)j); 
        i ++; 
    } 
    return NULL; 
} 
int main(int argc,char** argv) 
{ 
    pthread_t pid; 
    pthread_create(&pid,0,process,0); 
    long long int start = get_os_system_time(); 
    int i = 0; 
    while(true) 
    { 
        int* j = NULL; 
        _queue.peek((void* &)j); 
        i ++; 
        if(j != NULL && (*j) == 100000) 
        { 
            long long int end = get_os_system_time(); 
            printf("consume %d\n",end - start); 
            break; 
        } 
    } 
    return 0; 
}