10 #ifndef __PARSEBUFFER_H__ 11 #define __PARSEBUFFER_H__ 13 #include <shogun/lib/config.h> 15 #if defined(HAVE_CXX11) || defined(HAVE_PTHREAD) 21 #include <condition_variable> 34 enum E_IS_EXAMPLE_USED
// CParseBuffer<T>: a CSGObject-derived buffer of Example<T> slots that is
// indexed circularly (the read/write indices wrap modulo ring_size).
// Slot states seen in this file: the constructor marks slots E_EMPTY,
// write_example() marks a slot E_NOT_USED, finalize_example() marks it E_USED.
// NOTE(review): two threading variants (std::mutex/std::condition_variable and
// pthread) are declared side by side below; the preprocessor conditionals that
// select between them are not visible in this view - presumably HAVE_CXX11
// versus HAVE_PTHREAD, confirm against the full header.
77 template <
class T>
class CParseBuffer:
public CSGObject
// Constructor. `size` defaults to 1024; presumably the number of ring slots
// (the assignment to ring_size is not visible here - confirm).
85 CParseBuffer(int32_t size = 1024);
// Obtains a pointer to the ring slot at ex_write_index once that slot is no
// longer marked E_NOT_USED. Blocks on the slot's condition variable while the
// slot is still E_NOT_USED. The return statement is not visible in this view.
99 Example<T>* get_free_example()
// std variant: acquire the write mutex and the slot mutex together via
// std::lock (both unique_locks are created deferred first).
102 std::unique_lock<std::mutex> write_lk(*write_mutex, std::defer_lock);
103 std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_write_index], std::defer_lock);
104 std::lock(write_lk, current_ex_lock);
// Wait until the slot is not E_NOT_USED; finalize_example() signals this
// condition after marking a slot E_USED.
105 while (ex_used[ex_write_index] == E_NOT_USED)
106 ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
107 Example<T>* ex=&ex_ring[ex_write_index];
// pthread variant of the same sequence: lock write_lock, then the slot mutex,
// wait on the slot condition, take the slot address, unlock in reverse order.
109 pthread_mutex_lock(write_lock);
110 pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
111 while (ex_used[ex_write_index] == E_NOT_USED)
112 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
113 Example<T>* ex=&ex_ring[ex_write_index];
114 pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
115 pthread_mutex_unlock(write_lock);
// Copies label, fv pointer and length from `ex` into the slot at
// ex_write_index and marks that slot E_NOT_USED (see definition below).
129 int32_t write_example(Example<T>* ex);
// Returns the address of the slot at ex_read_index when that index is >= 0.
136 Example<T>* return_example_to_read();
// Under the read lock and the slot lock, fetches the slot at ex_read_index
// only if it is marked E_NOT_USED.
143 Example<T>* get_unused_example();
// Under the write lock, waits for the slot at the write index to stop being
// E_NOT_USED and then calls write_example(ex).
153 int32_t copy_example(Example<T>* ex);
// Marks the slot at ex_read_index as E_USED, optionally releases its fv, and
// signals the slot's condition variable.
162 void finalize_example(
bool free_after_release);
// Sets the flag that the destructor checks before deleting each slot's fv.
173 void set_free_vectors_on_destruct(
bool destroy)
175 free_vectors_on_destruct = destroy;
// Returns the current value of free_vectors_on_destruct.
182 bool get_free_vectors_on_destruct()
184 return free_vectors_on_destruct;
// Returns the object name string "ParseBuffer".
192 virtual const char* get_name()
const {
return "ParseBuffer"; }
// Advances ex_read_index by one, wrapping around at ring_size.
204 virtual void inc_read_index()
206 ex_read_index=(ex_read_index + 1) % ring_size;
// Advances ex_write_index by one, wrapping around at ring_size.
213 virtual void inc_write_index()
215 ex_write_index=(ex_write_index + 1) % ring_size;
// Per-slot state array; the constructor allocates ring_size entries.
226 E_IS_EXAMPLE_USED* ex_used;
// std variant: one mutex and one condition variable per slot (the constructor
// pushes one of each per ring index), plus a read mutex and a write mutex.
228 std::vector<std::shared_ptr<std::mutex> > ex_in_use_mutex;
231 std::vector<std::shared_ptr<std::condition_variable> > ex_in_use_cond;
233 std::shared_ptr<std::mutex> read_mutex;
235 std::shared_ptr<std::mutex> write_mutex;
// pthread variant: arrays of per-slot mutexes/conditions (ring_size entries
// each, allocated in the constructor) and single read/write locks.
237 pthread_mutex_t* ex_in_use_mutex;
240 pthread_cond_t* ex_in_use_cond;
242 pthread_mutex_t* read_lock;
244 pthread_mutex_t* write_lock;
// Ring index of the slot that write_example()/get_free_example() operate on.
248 int32_t ex_write_index;
// Ring index of the slot that the read-side methods operate on.
250 int32_t ex_read_index;
// When true, the destructor deletes every non-NULL fv in the ring.
253 bool free_vectors_on_destruct;
// Allocates a default-constructed T (via `new T()`) for every ring slot whose
// fv pointer is still NULL.
257 template <
class T>
void CParseBuffer<T>::init_vector()
// NOTE(review): the statement guarded by this check is not visible in this
// view - presumably an early return when the buffer does not own its vectors;
// confirm against the full source.
259 if (!free_vectors_on_destruct)
// Walk all ring_size slots and fill in only the ones without a vector.
261 for (int32_t i=0; i<ring_size; i++)
263 if(ex_ring[i].fv==NULL)
264 ex_ring[i].fv =
new T();
// Constructor: allocates the example ring and the per-slot state array,
// creates the synchronisation primitives, resets every slot, and enables
// free_vectors_on_destruct.
// NOTE(review): the std:: and pthread code paths are interleaved here; only
// some of the selecting preprocessor lines are visible. The assignment of
// ring_size from `size` and any initialisation of ex_read_index /
// ex_write_index are not visible in this view - confirm in the full source.
268 template <
class T> CParseBuffer<T>::CParseBuffer(int32_t size)
// ex_ring is zero-initialised (SG_CALLOC); ex_used is not (SG_MALLOC) and is
// filled in by the loop below.
271 ex_ring = SG_CALLOC(Example<T>, ring_size);
272 ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
// std variant: shared read/write mutexes.
274 read_mutex = std::make_shared<std::mutex>();
275 write_mutex = std::make_shared<std::mutex>();
// pthread variant: raw storage for per-slot mutexes/conditions and for the
// single read/write locks; they are initialised further down.
277 ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
278 ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
279 read_lock = SG_MALLOC(pthread_mutex_t, 1);
280 write_lock = SG_MALLOC(pthread_mutex_t, 1);
283 SG_SINFO(
"Initialized with ring size: %d.\n", ring_size)
// Reset every slot: state E_EMPTY, no vector, length 1, label FLT_MAX.
288 for (int32_t i=0; i<ring_size; i++)
290 ex_used[i] = E_EMPTY;
292 ex_ring[i].fv = NULL;
293 ex_ring[i].length = 1;
294 ex_ring[i].label = FLT_MAX;
// std variant: one mutex and one condition variable appended per slot.
297 ex_in_use_mutex.push_back(std::make_shared<std::mutex>());
298 ex_in_use_cond.push_back(std::make_shared<std::condition_variable>());
// pthread variant: initialise the per-slot condition and mutex with default
// attributes.
#elif defined(HAVE_PTHREAD) 300 pthread_cond_init(&ex_in_use_cond[i], NULL);
301 pthread_mutex_init(&ex_in_use_mutex[i], NULL);
// pthread-only build: initialise the single read and write locks.
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11) 305 pthread_mutex_init(read_lock, NULL);
306 pthread_mutex_init(write_lock, NULL);
// By default the destructor deletes the vectors stored in the ring.
309 free_vectors_on_destruct =
true;
// Destructor: deletes the per-slot vectors when free_vectors_on_destruct is
// set, destroys the per-slot pthread primitives in the pthread-only build,
// and releases the per-slot mutex/condition containers.
// NOTE(review): no release of ex_ring, ex_used, read_lock or write_lock is
// visible in this view - confirm they are freed in the full source.
312 template <
class T> CParseBuffer<T>::~CParseBuffer()
314 for (int32_t i=0; i<ring_size; i++)
// Only non-NULL vectors are deleted, and only when the buffer owns them.
316 if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
318 SG_DEBUG(
"%s::~%s(): destroying examples ring vector %d at %p\n",
319 get_name(), get_name(), i, ex_ring[i].fv);
320 delete ex_ring[i].fv;
// pthread-only build: tear down this slot's mutex and condition.
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11) 323 pthread_mutex_destroy(&ex_in_use_mutex[i]);
324 pthread_cond_destroy(&ex_in_use_cond[i]);
// std variant: dropping the shared_ptr elements from the vectors.
330 ex_in_use_mutex.clear();
331 ex_in_use_cond.clear();
// pthread variant: free the arrays allocated with SG_MALLOC in the constructor.
335 SG_FREE(ex_in_use_mutex);
336 SG_FREE(ex_in_use_cond);
// Stores `ex` into the ring slot at ex_write_index by copying its label, its
// fv pointer (the pointer itself, not the pointed-to data) and its length,
// then marks the slot E_NOT_USED. No locking is done in the visible lines;
// copy_example() calls this while holding the write lock and the slot lock.
// NOTE(review): the return statement and any index advance are not visible in
// this view.
344 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
346 ex_ring[ex_write_index].label = ex->label;
347 ex_ring[ex_write_index].fv = ex->fv;
348 ex_ring[ex_write_index].length = ex->length;
349 ex_used[ex_write_index] = E_NOT_USED;
// Returns a pointer to the ring slot at ex_read_index when that index is
// non-negative. No locking is done in the visible lines.
// NOTE(review): the result for a negative index is not visible in this view.
356 Example<T>* CParseBuffer<T>::return_example_to_read()
358 if (ex_read_index >= 0)
359 return &ex_ring[ex_read_index];
// Fetches the example at the current read index if its slot is marked
// E_NOT_USED. The read lock is taken first, then the lock of the slot being
// inspected. The std and pthread variants of each locking step appear one
// after the other; the declaration and return of `ex` are not visible in this
// view.
365 Example<T>* CParseBuffer<T>::get_unused_example()
// Acquire the read lock (std variant, then pthread variant).
368 std::lock_guard<std::mutex> read_lk(*read_mutex);
370 pthread_mutex_lock(read_lock);
// Snapshot the read index so the same slot is locked and later unlocked.
374 int32_t current_index = ex_read_index;
// Acquire the per-slot lock (std variant, then pthread variant).
378 std::lock_guard<std::mutex> current_ex_lk(*ex_in_use_mutex[current_index]);
380 pthread_mutex_lock(&ex_in_use_mutex[current_index]);
// Only a slot marked E_NOT_USED is handed out.
383 if (ex_used[current_index] == E_NOT_USED)
384 ex = return_example_to_read();
// pthread-only build: release the slot lock, then the read lock. The std
// lock_guard objects above release on scope exit.
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11) 389 pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
390 pthread_mutex_unlock(read_lock);
// Writes `ex` into the ring: takes the write lock and the lock of the slot at
// the write index, waits on that slot's condition variable while the slot is
// still marked E_NOT_USED, then calls write_example(ex) and keeps its result
// in `ret`. The declaration and return of `ret` are not visible in this view.
396 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
// Acquire the write lock (std variant, then pthread variant).
399 std::lock_guard<std::mutex> write_lk(*write_mutex);
401 pthread_mutex_lock(write_lock);
// Snapshot the write index; the unlock at the end uses this snapshot.
404 int32_t current_index = ex_write_index;
// Acquire the per-slot lock (std variant, then pthread variant).
407 std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[current_index]);
409 pthread_mutex_lock(&ex_in_use_mutex[current_index]);
// NOTE(review): the wait loop indexes by ex_write_index while the lock was
// taken via current_index; these are the same value as long as nothing
// changes ex_write_index in between - presumably guaranteed by holding the
// write lock, confirm.
411 while (ex_used[ex_write_index] == E_NOT_USED)
414 ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
416 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
420 ret = write_example(ex);
// pthread-only build: release the slot lock, then the write lock.
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11) 423 pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
424 pthread_mutex_unlock(write_lock);
// Marks the example at ex_read_index as consumed (E_USED), optionally releases
// its feature vector, and signals the slot's condition variable - the one that
// get_free_example() and copy_example() wait on.
// free_after_release: when true, the slot's fv is freed and set to NULL.
// NOTE(review): any advance of ex_read_index is not visible in this view.
431 void CParseBuffer<T>::finalize_example(
bool free_after_release)
// std variant: read lock, then the lock of the slot being finalized.
434 std::lock_guard<std::mutex> read_lk(*read_mutex);
435 std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_read_index]);
// pthread variant of the same two acquisitions.
437 pthread_mutex_lock(read_lock);
438 pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
440 ex_used[ex_read_index] = E_USED;
442 if (free_after_release)
444 SG_DEBUG(
"Freeing object in ring at index %d and address: %p.\n",
445 ex_read_index, ex_ring[ex_read_index].fv);
// NOTE(review): fv is released with SG_FREE here, whereas init_vector()
// allocates it with `new T()` and the destructor uses `delete` - confirm the
// allocation/deallocation pairing is intended.
447 SG_FREE(ex_ring[ex_read_index].fv);
// Clear the pointer so the destructor's non-NULL check skips this slot.
448 ex_ring[ex_read_index].fv=NULL;
// std variant: wake one waiter on this slot, then release the slot lock.
452 ex_in_use_cond[ex_read_index]->notify_one();
453 current_ex_lock.unlock();
// pthread variant: signal the slot condition, then release the slot lock.
455 pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
456 pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
// pthread-only build: release the read lock last.
#if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11) 461 pthread_mutex_unlock(read_lock);
466 #endif // defined(HAVE_CXX11) || defined(HAVE_PTHREAD) 467 #endif // __PARSEBUFFER_H__
All classes and functions are contained in the shogun namespace.