Apache Singa
A General Distributed Deep Learning Library
safe_queue.h
1 /************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21 
22 #ifndef SINGA_UTILS_SAFE_QUEUE_H_
23 #define SINGA_UTILS_SAFE_QUEUE_H_
24 
25 #include <algorithm>
26 #include <queue>
27 #include <list>
28 #include <mutex>
29 #include <condition_variable>
30 #include <thread>
31 
35 template <typename T, class Container = std::queue<T>>
36 class SafeQueue {
37  public:
38  SafeQueue() = default;
39  ~SafeQueue() {
40  std::lock_guard<std::mutex> lock(mutex_);
41  }
42 
47  bool Push(const T& e) {
48  std::lock_guard<std::mutex> lock(mutex_);
49  queue_.push(e);
50  condition_.notify_one();
51  return true;
52  }
53 
58  void Pop(T& e) {
59  std::unique_lock<std::mutex> lock(mutex_);
60  condition_.wait(lock, [this]() { return !queue_.empty(); });
61  e = queue_.front();
62  queue_.pop();
63  }
68  bool Pop(T& item, std::uint64_t timeout) {
69  std::unique_lock<std::mutex> lock(mutex_);
70 
71  if (queue_.empty()) {
72  if (timeout == 0)
73  return false;
74 
75  if (condition_.wait_for(lock, std::chrono::microseconds(timeout))
76  == std::cv_status::timeout)
77  return false;
78  }
79 
80  item = queue_.front();
81  queue_.pop();
82  return true;
83  }
84 
89  bool TryPop(T& e) {
90  std::unique_lock<std::mutex> lock(mutex_);
91 
92  if (queue_.empty())
93  return false;
94 
95  e = queue_.front();
96  queue_.pop();
97  return true;
98  }
99 
100 
104  unsigned int Size() const {
105  std::lock_guard<std::mutex> lock(mutex_);
106  return queue_.size();
107  }
108 
109  private:
110  Container queue_;
111  mutable std::mutex mutex_;
112  std::condition_variable condition_;
113 };
114 
118 template<typename T>
120  public:
121  PriorityQueue() = default;
127  bool Push(const T& e, int priority) {
128  Element ele;
129  ele.data = e;
130  ele.priority = priority;
131  queue_.push(ele);
132  return true;
133  }
134 
139  void Pop(T& e) {
140  Element ele;
141  queue_.pop(ele);
142  e = ele.data;
143  }
150  bool Pop(T& e, std::uint64_t timeout) {
151  Element ele;
152  if (queue_.pop(ele, timeout)) {
153  e = ele.data;
154  return true;
155  } else {
156  return false;
157  }
158  }
159 
164  bool TryPop(T& e) {
165  Element ele;
166  if (queue_.TryPop(ele)) {
167  e = ele.data;
168  return true;
169  } else {
170  return false;
171  }
172  }
173 
177  unsigned int Size() const {
178  return queue_.Size();
179  }
180 
181  private:
182  struct Element {
183  T data;
184  int priority;
185  inline bool operator<(const Element &other) const {
186  return priority < other.priority;
187  }
188  };
189 
191 };
192 
193 #endif // SINGA_UTILS_SAFE_QUEUE_H_
bool TryPop(T &e)
Try to pop an element from the queue.
Definition: safe_queue.h:89
bool Push(const T &e)
Push an element into the queue.
Definition: safe_queue.h:47
bool Pop(T &e, std::uint64_t timeout)
Pop the item with the highest priority from the queue until one element is poped or timeout...
Definition: safe_queue.h:150
bool Push(const T &e, int priority)
Push an element into the queue with a given priority.
Definition: safe_queue.h:127
unsigned int Size() const
Definition: safe_queue.h:177
Thread safe priority queue.
Definition: safe_queue.h:119
unsigned int Size() const
Definition: safe_queue.h:104
void Pop(T &e)
Pop an element from the queue.
Definition: safe_queue.h:58
void Pop(T &e)
Pop an element from the queue with the highest priority.
Definition: safe_queue.h:139
Thread-safe queue.
Definition: safe_queue.h:36
bool Pop(T &item, std::uint64_t timeout)
Pop an item from the queue until one element is poped or timout.
Definition: safe_queue.h:68
bool TryPop(T &e)
Try to pop an element from the queue.
Definition: safe_queue.h:164