dataqueue.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       dataqueue.cc
00003 ///             FIFO queue of Data objects
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2009, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "dataqueue.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include <sys/time.h>
00026 #include <time.h>
00027 
00028 namespace Barry {
00029 
00030 //////////////////////////////////////////////////////////////////////////////
00031 // DataQueue class
00032 
00033 DataQueue::DataQueue()
00034 {
00035         pthread_mutex_init(&m_waitMutex, NULL);
00036         pthread_cond_init(&m_waitCond, NULL);
00037 
00038         pthread_mutex_init(&m_accessMutex, NULL);
00039 }
00040 
00041 DataQueue::~DataQueue()
00042 {
00043         scoped_lock lock(m_accessMutex);        // FIXME - is this sane?
00044 
00045         while( m_queue.size() ) {
00046                 delete m_queue.front();
00047                 m_queue.pop();
00048         }
00049 }
00050 
00051 //
00052 // push
00053 //
00054 /// Pushes data into the end of the queue.
00055 ///
00056 /// The queue owns this pointer as soon as the function is
00057 /// called.  In the case of an exception, it will be freed.
00058 /// Performs a thread broadcast once new data has been added.
00059 ///
00060 void DataQueue::push(Data *data)
00061 {
00062         try {
00063 
00064                 {
00065                         scoped_lock lock(m_accessMutex);
00066                         m_queue.push(data);
00067                 }
00068 
00069                 scoped_lock wait(m_waitMutex);
00070                 pthread_cond_broadcast(&m_waitCond);
00071 
00072         }
00073         catch(...) {
00074                 delete data;
00075                 throw;
00076         }
00077 }
00078 
00079 //
00080 // pop
00081 //
00082 /// Pops the next element off the front of the queue.
00083 ///
00084 /// Returns 0 if empty.
00085 /// The queue no longer owns this pointer upon return.
00086 ///
00087 Data* DataQueue::pop()
00088 {
00089         scoped_lock lock(m_accessMutex);
00090 
00091         if( m_queue.size() == 0 )
00092                 return 0;
00093 
00094         Data *ret = m_queue.front();
00095         m_queue.pop();
00096         return ret;
00097 }
00098 
00099 //
00100 // wait_pop
00101 //
00102 /// Pops the next element off the front of the queue, and
00103 /// waits until one exists if empty.  If still no data
00104 /// on timeout, returns null.
00105 /// (unlock the access mutex while waiting!)
00106 ///
00107 /// Timeout specified in milliseconds.  Default is wait forever.
00108 ///
00109 Data* DataQueue::wait_pop(int timeout)
00110 {
00111         Data *ret = 0;
00112 
00113         // check if something's there already
00114         {
00115                 scoped_lock access(m_accessMutex);
00116                 if( m_queue.size() ) {
00117                         ret = m_queue.front();
00118                         m_queue.pop();
00119                         return ret;
00120                 }
00121         }
00122 
00123         // nothing there, so wait...
00124 
00125         if( timeout == -1 ) {
00126                 // no timeout
00127                 int size = 0;
00128                 do {
00129                         {
00130                                 scoped_lock wait(m_waitMutex);
00131                                 pthread_cond_wait(&m_waitCond, &m_waitMutex);
00132                         }
00133 
00134                         // anything there?
00135                         scoped_lock access(m_accessMutex);
00136                         size = m_queue.size();
00137                         if( size != 0 ) {
00138                                 // already have the lock, return now
00139                                 ret = m_queue.front();
00140                                 m_queue.pop();
00141                                 return ret;
00142                         }
00143 
00144                 } while( size == 0 );
00145         }
00146         else {
00147                 // timeout in conditional wait
00148                 struct timeval now;
00149                 struct timespec to;
00150 
00151                 gettimeofday(&now, NULL);
00152                 to.tv_sec = now.tv_sec + timeout / 1000;
00153                 to.tv_nsec = (now.tv_usec + timeout % 1000 * 1000) * 1000;
00154 
00155                 scoped_lock wait(m_waitMutex);
00156                 pthread_cond_timedwait(&m_waitCond, &m_waitMutex, &to);
00157         }
00158 
00159         scoped_lock access(m_accessMutex);
00160         if( m_queue.size() == 0 )
00161                 return 0;
00162 
00163         ret = m_queue.front();
00164         m_queue.pop();
00165         return ret;
00166 }
00167 
00168 //
00169 // append_from
00170 //
00171 /// Pops all data from other and appends it to this.
00172 ///
00173 /// After calling this function, other will be empty, and
00174 /// this will contain all its data.
00175 ///
00176 /// In the case of an exception, any uncopied data will
00177 /// remain in other.
00178 ///
00179 /// This is a locking optimization, so all copying can happen
00180 /// inside one lock, instead of locking for each copy.
00181 ///
00182 void DataQueue::append_from(DataQueue &other)
00183 {
00184         scoped_lock us(m_accessMutex);
00185         scoped_lock them(other.m_accessMutex);
00186 
00187         while( other.m_queue.size() ) {
00188                 m_queue.push( other.m_queue.front() );
00189 
00190                 // only pop after the copy, since in the
00191                 // case of an exception we want to leave other intact
00192                 other.m_queue.pop();
00193         }
00194 }
00195 
00196 //
00197 // empty
00198 //
00199 /// Returns true if the queue is empty.
00200 ///
00201 bool DataQueue::empty() const
00202 {
00203         scoped_lock access(m_accessMutex);
00204         return m_queue.size() == 0;
00205 }
00206 
00207 //
00208 // size
00209 //
00210 /// Returns number of items in the queue.
00211 ///
00212 size_t DataQueue::size() const
00213 {
00214         scoped_lock access(m_accessMutex);
00215         return m_queue.size();
00216 }
00217 
00218 } // namespace Barry
00219 
00220 
00221 #ifdef __DQ_TEST_MODE__
00222 
00223 #include <iostream>
00224 
00225 using namespace std;
00226 using namespace Barry;
00227 
00228 void *WriteThread(void *userdata)
00229 {
00230         DataQueue *dq = (DataQueue*) userdata;
00231 
00232         dq->push( new Data );
00233         dq->push( new Data );
00234         sleep(5);
00235         dq->push( new Data );
00236 
00237         return 0;
00238 }
00239 
00240 void *ReadThread(void *userdata)
00241 {
00242         DataQueue *dq = (DataQueue*) userdata;
00243 
00244         sleep(1);
00245         if( Data *d = dq->pop() ) {
00246                 cout << "Received via pop: " << d << endl;
00247                 delete d;
00248         }
00249         else {
00250                 cout << "No data in the queue yet." << endl;
00251         }
00252 
00253         while( Data *d = dq->wait_pop(5010) ) {
00254                 cout << "Received: " << d << endl;
00255                 delete d;
00256         }
00257         return 0;
00258 }
00259 
00260 int main()
00261 {
00262         DataQueue from;
00263         from.push( new Data );
00264 
00265         DataQueue dq;
00266         dq.append_from(from);
00267 
00268         pthread_t t1, t2;
00269         pthread_create(&t1, NULL, &ReadThread, &dq);
00270         pthread_create(&t2, NULL, &WriteThread, &dq);
00271 
00272         pthread_join(t2, NULL);
00273         pthread_join(t1, NULL);
00274 }
00275 
00276 #endif
00277 

Generated on Mon Jan 12 10:51:12 2009 for Barry by  doxygen 1.5.7.1