1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef SHARED_QUEUE_H
6#define SHARED_QUEUE_H
7
8#include <pthread.h>
9#include <cassert>
10#include <deque>
11
12// This file provides a queue that uses a mutex and condition variable so that
13// one thread can put pointers into the queue and another thread can pull items
14// out of the queue.
15
16// Specifies whether we want to wait for the queue.
17enum QueueWaitingFlag {
18  kWait = 0,
19  kDontWait
20};
21
22// Indicates if we got an item, did not wait, or if the queue was cancelled.
23enum QueueGetResult {
24  kReturnedItem = 0,
25  kDidNotWait = 1,
26  kQueueWasCancelled
27};
28
29// A simple scoped mutex lock.
30// For most cases, pp::AutoLock in "ppapi/utility/threading/lock.h" can be
31// used; LockingQueue needs to use the pthread_mutex_t directly in
32// pthread_cond_wait so we reimplement a scoped lock here.
33class ScopedLock {
34 public:
35  explicit ScopedLock(pthread_mutex_t* mutex) : mutex_(mutex) {
36    const int kPthreadMutexSuccess = 0;
37    if (pthread_mutex_lock(mutex_) != kPthreadMutexSuccess) {
38      mutex_ = NULL;
39    }
40  }
41  ~ScopedLock() {
42    if (mutex_ != NULL) {
43      pthread_mutex_unlock(mutex_);
44    }
45  }
46
47 private:
48  pthread_mutex_t* mutex_;  // Weak reference, passed in to constructor.
49
50  // Disable copy and assign.
51  ScopedLock& operator=(const ScopedLock&);
52  ScopedLock(const ScopedLock&);
53};
54
55// LockingQueue contains a collection of <T>, such as a collection of
56// objects or pointers.  The Push() method is used to add items to the
57// queue in a thread-safe manner.  The GetItem() is used to retrieve
58// items from the queue in a thread-safe manner.
59template <class T> class LockingQueue {
60 public:
61  LockingQueue() : quit_(false) {
62    int result = pthread_mutex_init(&queue_mutex_, NULL);
63    assert(result == 0);
64    result = pthread_cond_init(&queue_condition_var_, NULL);
65    assert(result == 0);
66  }
67  ~LockingQueue() { pthread_mutex_destroy(&queue_mutex_); }
68
69  // The producer (who instantiates the queue) calls this to tell the
70  // consumer that the queue is no longer being used.
71  void CancelQueue() {
72    ScopedLock scoped_mutex(&queue_mutex_);
73    quit_ = true;
74    // Signal the condition var so that if a thread is waiting in
75    // GetItem the thread will wake up and see that the queue has
76    // been cancelled.
77    pthread_cond_signal(&queue_condition_var_);
78  }
79
80  // The consumer calls this to see if the queue has been cancelled by
81  // the producer.  If so, the thread should not call GetItem and may
82  // need to terminate -- i.e. in a case where the producer created
83  // the consumer thread.
84  bool IsCancelled() {
85    ScopedLock scoped_mutex(&queue_mutex_);
86    return quit_;
87  }
88
89  // Grabs the mutex and pushes a new item to the end of the queue if the
90  // queue is not full.  Signals the condition variable so that a thread
91  // that is waiting will wake up and grab the item.
92  void Push(const T& item) {
93    ScopedLock scoped_mutex(&queue_mutex_);
94    the_queue_.push_back(item);
95    pthread_cond_signal(&queue_condition_var_);
96  }
97
98  // Tries to pop the front element from the queue; returns an enum:
99  //  kReturnedItem if an item is returned in |item_ptr|,
100  //  kDidNotWait if |wait| was kDontWait and the queue was empty,
101  //  kQueueWasCancelled if the producer called CancelQueue().
102  // If |wait| is kWait, GetItem will wait to return until the queue
103  // contains an item (unless the queue is cancelled).
104  QueueGetResult GetItem(T* item_ptr, QueueWaitingFlag wait) {
105    ScopedLock scoped_mutex(&queue_mutex_);
106    // Use a while loop to get an item. If the user does not want to wait,
107    // we will exit from the loop anyway, unlocking the mutex.
108    // If the user does want to wait, we will wait for pthread_cond_wait,
109    // and the while loop will check is_empty_no_locking() one more
110    // time so that a spurious wake-up of pthread_cond_wait is handled.
111    // If |quit_| has been set, break out of the loop.
112    while (!quit_ && is_empty_no_locking()) {
113      // If user doesn't want to wait, return...
114      if (kDontWait == wait) {
115        return kDidNotWait;
116      }
117      // Wait for signal to occur.
118      pthread_cond_wait(&queue_condition_var_, &queue_mutex_);
119    }
120    // Check to see if quit_ woke us up
121    if (quit_) {
122      return kQueueWasCancelled;
123    }
124
125    // At this point, the queue was either not empty or, if it was empty,
126    // we called pthread_cond_wait (which released the mutex, waited for the
127    // signal to occur, and then atomically reacquired the mutex).
128    // Thus, if we are here, the queue cannot be empty because we either
129    // had the mutex and verified it was not empty, or we waited for the
130    // producer to put an item in and signal a single thread (us).
131    T& item = the_queue_.front();
132    *item_ptr = item;
133    the_queue_.pop_front();
134    return kReturnedItem;
135  }
136
137 private:
138  std::deque<T> the_queue_;
139  bool quit_;
140  pthread_mutex_t queue_mutex_;
141  pthread_cond_t queue_condition_var_;
142
143  // This is used by methods that already have the lock.
144  bool is_empty_no_locking() const { return the_queue_.empty(); }
145};
146
147#endif  // SHARED_QUEUE_H
148