00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <config.h>
00026
00027 #include <qmutex.h>
00028 #include <qtimer.h>
00029
00030 #include "ksocketdevice.h"
00031 #include "ksocketaddress.h"
00032 #include "ksocketbuffer_p.h"
00033 #include "kbufferedsocket.h"
00034
00035 using namespace KNetwork;
00036 using namespace KNetwork::Internal;
00037
00038 class KNetwork::KBufferedSocketPrivate
00039 {
00040 public:
00041 mutable KSocketBuffer *input, *output;
00042
00043 KBufferedSocketPrivate()
00044 {
00045 input = 0L;
00046 output = 0L;
00047 }
00048 };
00049
00050 KBufferedSocket::KBufferedSocket(const QString& host, const QString& service,
00051 QObject *parent, const char *name)
00052 : KStreamSocket(host, service, parent, name),
00053 d(new KBufferedSocketPrivate)
00054 {
00055 setInputBuffering(true);
00056 setOutputBuffering(true);
00057 }
00058
00059 KBufferedSocket::~KBufferedSocket()
00060 {
00061 closeNow();
00062 delete d->input;
00063 delete d->output;
00064 delete d;
00065 }
00066
00067 void KBufferedSocket::setSocketDevice(KSocketDevice* device)
00068 {
00069 KStreamSocket::setSocketDevice(device);
00070 device->setBlocking(false);
00071 }
00072
00073 bool KBufferedSocket::setSocketOptions(int opts)
00074 {
00075 if (opts == Blocking)
00076 return false;
00077
00078 opts &= ~Blocking;
00079 return KStreamSocket::setSocketOptions(opts);
00080 }
00081
00082 void KBufferedSocket::close()
00083 {
00084 if (!d->output || d->output->isEmpty())
00085 closeNow();
00086 else
00087 {
00088 setState(Closing);
00089 QSocketNotifier *n = socketDevice()->readNotifier();
00090 if (n)
00091 n->setEnabled(false);
00092 emit stateChanged(Closing);
00093 }
00094 }
00095
00096 Q_LONG KBufferedSocket::bytesAvailable() const
00097 {
00098 if (!d->input)
00099 return KStreamSocket::bytesAvailable();
00100
00101 return d->input->length();
00102 }
00103
00104 Q_LONG KBufferedSocket::waitForMore(int msecs, bool *timeout)
00105 {
00106 Q_LONG retval = KStreamSocket::waitForMore(msecs, timeout);
00107 if (d->input)
00108 {
00109 resetError();
00110 slotReadActivity();
00111 return bytesAvailable();
00112 }
00113 return retval;
00114 }
00115
00116 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen)
00117 {
00118 if (d->input)
00119 {
00120 if (d->input->isEmpty())
00121 {
00122 setError(IO_ReadError, WouldBlock);
00123 emit gotError(WouldBlock);
00124 return -1;
00125 }
00126 resetError();
00127 return d->input->consumeBuffer(data, maxlen);
00128 }
00129 return KStreamSocket::readBlock(data, maxlen);
00130 }
00131
00132 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen, KSocketAddress& from)
00133 {
00134 from = peerAddress();
00135 return readBlock(data, maxlen);
00136 }
00137
00138 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen)
00139 {
00140 if (d->input)
00141 {
00142 if (d->input->isEmpty())
00143 {
00144 setError(IO_ReadError, WouldBlock);
00145 emit gotError(WouldBlock);
00146 return -1;
00147 }
00148 resetError();
00149 return d->input->consumeBuffer(data, maxlen, false);
00150 }
00151 return KStreamSocket::peekBlock(data, maxlen);
00152 }
00153
00154 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen, KSocketAddress& from)
00155 {
00156 from = peerAddress();
00157 return peekBlock(data, maxlen);
00158 }
00159
00160 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG len)
00161 {
00162 if (state() != Connected)
00163 {
00164
00165 setError(IO_WriteError, NotConnected);
00166 return -1;
00167 }
00168
00169 if (d->output)
00170 {
00171 if (d->output->isFull())
00172 {
00173 setError(IO_WriteError, WouldBlock);
00174 emit gotError(WouldBlock);
00175 return -1;
00176 }
00177 resetError();
00178
00179
00180 QSocketNotifier *n = socketDevice()->writeNotifier();
00181 if (n)
00182 n->setEnabled(true);
00183
00184 return d->output->feedBuffer(data, len);
00185 }
00186
00187 return KStreamSocket::writeBlock(data, len);
00188 }
00189
00190 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG maxlen,
00191 const KSocketAddress&)
00192 {
00193
00194 return writeBlock(data, maxlen);
00195 }
00196
00197 void KBufferedSocket::enableRead(bool enable)
00198 {
00199 KStreamSocket::enableRead(enable);
00200 if (!enable && d->input)
00201 {
00202
00203 QSocketNotifier *n = socketDevice()->readNotifier();
00204 if (n)
00205 n->setEnabled(true);
00206 }
00207
00208 if (enable && state() != Connected && d->input && !d->input->isEmpty())
00209
00210
00211 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00212 }
00213
00214 void KBufferedSocket::enableWrite(bool enable)
00215 {
00216 KStreamSocket::enableWrite(enable);
00217 if (!enable && d->output && !d->output->isEmpty())
00218 {
00219
00220 QSocketNotifier *n = socketDevice()->writeNotifier();
00221 if (n)
00222 n->setEnabled(true);
00223 }
00224 }
00225
00226 void KBufferedSocket::stateChanging(SocketState newState)
00227 {
00228 if (newState == Connecting || newState == Connected)
00229 {
00230
00231
00232 if (d->input)
00233 d->input->clear();
00234 if (d->output)
00235 d->output->clear();
00236
00237
00238 enableRead(emitsReadyRead());
00239 enableWrite(emitsReadyWrite());
00240 }
00241 KStreamSocket::stateChanging(newState);
00242 }
00243
00244 void KBufferedSocket::setInputBuffering(bool enable)
00245 {
00246 QMutexLocker locker(mutex());
00247 if (!enable)
00248 {
00249 delete d->input;
00250 d->input = 0L;
00251 }
00252 else if (d->input == 0L)
00253 {
00254 d->input = new KSocketBuffer;
00255 }
00256 }
00257
00258 KIOBufferBase* KBufferedSocket::inputBuffer()
00259 {
00260 return d->input;
00261 }
00262
00263 void KBufferedSocket::setOutputBuffering(bool enable)
00264 {
00265 QMutexLocker locker(mutex());
00266 if (!enable)
00267 {
00268 delete d->output;
00269 d->output = 0L;
00270 }
00271 else if (d->output == 0L)
00272 {
00273 d->output = new KSocketBuffer;
00274 }
00275 }
00276
00277 KIOBufferBase* KBufferedSocket::outputBuffer()
00278 {
00279 return d->output;
00280 }
00281
00282 Q_ULONG KBufferedSocket::bytesToWrite() const
00283 {
00284 if (!d->output)
00285 return 0;
00286
00287 return d->output->length();
00288 }
00289
00290 void KBufferedSocket::closeNow()
00291 {
00292 KStreamSocket::close();
00293 if (d->output)
00294 d->output->clear();
00295 }
00296
00297 bool KBufferedSocket::canReadLine() const
00298 {
00299 if (!d->input)
00300 return false;
00301
00302 return d->input->canReadLine();
00303 }
00304
00305 QCString KBufferedSocket::readLine()
00306 {
00307 return d->input->readLine();
00308 }
00309
00310 void KBufferedSocket::waitForConnect()
00311 {
00312 if (state() != Connecting)
00313 return;
00314
00315 KStreamSocket::setSocketOptions(socketOptions() | Blocking);
00316 connectionEvent();
00317 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking);
00318 }
00319
00320 void KBufferedSocket::slotReadActivity()
00321 {
00322 if (d->input && state() == Connected)
00323 {
00324 mutex()->lock();
00325 Q_LONG len = d->input->receiveFrom(socketDevice());
00326
00327 if (len == -1)
00328 {
00329 if (socketDevice()->error() != WouldBlock)
00330 {
00331
00332 copyError();
00333 mutex()->unlock();
00334 emit gotError(error());
00335 closeNow();
00336 return;
00337 }
00338 }
00339 else if (len == 0)
00340 {
00341
00342 setError(IO_ReadError, RemotelyDisconnected);
00343 mutex()->unlock();
00344 emit gotError(error());
00345 closeNow();
00346 return;
00347 }
00348
00349
00350 mutex()->unlock();
00351 }
00352
00353 if (state() == Connected)
00354 KStreamSocket::slotReadActivity();
00355 else if (emitsReadyRead())
00356 {
00357 if (d->input && !d->input->isEmpty())
00358 {
00359
00360
00361 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00362 emit readyRead();
00363 }
00364 }
00365 }
00366
00367 void KBufferedSocket::slotWriteActivity()
00368 {
00369 if (d->output && !d->output->isEmpty() &&
00370 (state() == Connected || state() == Closing))
00371 {
00372 mutex()->lock();
00373 Q_LONG len = d->output->sendTo(socketDevice());
00374
00375 if (len == -1)
00376 {
00377 if (socketDevice()->error() != WouldBlock)
00378 {
00379
00380 copyError();
00381 mutex()->unlock();
00382 emit gotError(error());
00383 closeNow();
00384 return;
00385 }
00386 }
00387 else if (len == 0)
00388 {
00389
00390 setError(IO_ReadError, RemotelyDisconnected);
00391 mutex()->unlock();
00392 emit gotError(error());
00393 closeNow();
00394 return;
00395 }
00396
00397 if (d->output->isEmpty())
00398
00399
00400 socketDevice()->writeNotifier()->setEnabled(false);
00401
00402 mutex()->unlock();
00403 emit bytesWritten(len);
00404 }
00405
00406 if (state() != Closing)
00407 KStreamSocket::slotWriteActivity();
00408 else if (d->output && d->output->isEmpty() && state() == Closing)
00409 {
00410 KStreamSocket::close();
00411 }
00412 }
00413
00414 #include "kbufferedsocket.moc"