消息队列的实现

小明 2025-05-02 12:22:12 7
8.8 消息队列

队列���一种先进先出的结构,消息队列是进程(线程)常用的一种方法,实现消息队列常用的方法:

()

(1)阻塞队列 (2)无锁队列 (3)环形队列

值得注意的是:在pop 和push 一定要使用while循环,避免虚假唤醒

()
8.8.1 阻塞队列

(1)pthread

实现阻塞队列使用生产者-消费者模式,使用步骤:

#include 
#include 
//第一步,添加成员变量
std::queue queue_;
int max_size_;
pthread_mutex_t mutex_;
pthread_cond_t condition_var_;
//第二步,实现push函数
void Push(const T& item) {
    pthread_mutex_lock(&mutex_);
    while (queue_.size() >= max_size_) {
        pthread_cond_wait(&condition_var_, &mutex_);
    }
    queue_.push(item);
    pthread_cond_signal(&condition_var_);
    pthread_mutex_unlock(&mutex_);
}
//第三步,实现pop函数
 T Pop() 
 {
     pthread_mutex_lock(&mutex_);
     while (queue_.empty()) {
         pthread_cond_wait(&condition_var_, &mutex_);
     }
     T item = queue_.front();
     queue_.pop();
     pthread_cond_signal(&condition_var_);
     pthread_mutex_unlock(&mutex_);
     return item;
  }

示例代码:

#include 
#include 
template 
class BlockingQueue {
public:
    BlockingQueue() : max_size_(100) {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&condition_var_, nullptr);
    }
    ~BlockingQueue() {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&condition_var_);
    }
    void Push(const T& item) {
        pthread_mutex_lock(&mutex_);
        while (queue_.size() >= max_size_) {
            pthread_cond_wait(&condition_var_, &mutex_);
        }
        queue_.push(item);
        pthread_cond_signal(&condition_var_);
        pthread_mutex_unlock(&mutex_);
    }
    T Pop() {
        pthread_mutex_lock(&mutex_);
        while (queue_.empty()) {
            pthread_cond_wait(&condition_var_, &mutex_);
        }
        T item = queue_.front();
        queue_.pop();
        pthread_cond_signal(&condition_var_);
        pthread_mutex_unlock(&mutex_);
        return item;
    }
private:
    std::queue queue_;
    int max_size_;
    pthread_mutex_t mutex_;
    pthread_cond_t condition_var_;
};

测试代码:

#include 
#include 
#include 
#include 
BlockingQueue queue;
void* ProducerThread(void* arg) {
    int thread_id = *static_cast(arg);
    for (int i = 1; i 
The End
微信