00001 #ifndef SOCKETPROXY_H
00002 #define SOCKETPROXY_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "qpid/sys/Socket.h"
00025 #include "qpid/sys/Runnable.h"
00026 #include "qpid/sys/Thread.h"
00027 #include "qpid/sys/Mutex.h"
00028 #include "qpid/client/Connection.h"
00029 #include "qpid/log/Statement.h"
00030
00031 #include <algorithm>
00032
00037 class SocketProxy : private qpid::sys::Runnable
00038 {
00039 public:
00043 SocketProxy(int connectPort, const std::string host="localhost")
00044 : closed(false), port(listener.listen())
00045 {
00046 int r=::pipe(closePipe);
00047 if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
00048 client.connect(host, connectPort);
00049 thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
00050 }
00051
00052 ~SocketProxy() { close(); }
00053
00055 void close() {
00056 {
00057 qpid::sys::Mutex::ScopedLock l(lock);
00058 if (closed) return;
00059 closed=true;
00060 }
00061 write(closePipe[1], this, 1);
00062 thread.join();
00063 client.close();
00064 ::close(closePipe[0]);
00065 ::close(closePipe[1]);
00066 }
00067
00068 bool isClosed() const {
00069 qpid::sys::Mutex::ScopedLock l(lock);
00070 return closed;
00071 }
00072
00073 uint16_t getPort() const { return port; }
00074
00075 private:
00076 static void throwErrno(const std::string& msg) {
00077 throw qpid::Exception(msg+":"+qpid::strError(errno));
00078 }
00079 static void throwIf(bool condition, const std::string& msg) {
00080 if (condition) throw qpid::Exception(msg);
00081 }
00082
00083 struct FdSet : fd_set {
00084 FdSet() : maxFd(0) { clear(); }
00085 void clear() { FD_ZERO(this); }
00086 void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
00087 bool isSet(int fd) const { return FD_ISSET(fd, this); }
00088 bool operator[](int fd) const { return isSet(fd); }
00089
00090 int maxFd;
00091 };
00092
00093 enum { RD=1, WR=2, ER=4 };
00094
00095 struct Selector {
00096 FdSet rd, wr, er;
00097
00098 void set(int fd, int sets) {
00099 if (sets & RD) rd.set(fd);
00100 if (sets & WR) wr.set(fd);
00101 if (sets & ER) er.set(fd);
00102 }
00103
00104 int select() {
00105 for (;;) {
00106 int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
00107 int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
00108 if (r == -1 && errno == EINTR) continue;
00109 if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
00110 return r;
00111 }
00112 }
00113 };
00114
00115 void run() {
00116 std::auto_ptr<qpid::sys::Socket> server;
00117 try {
00118
00119 Selector accept;
00120 accept.set(listener.toFd(), RD|ER);
00121 accept.set(closePipe[0], RD|ER);
00122 accept.select();
00123 throwIf(accept.rd[closePipe[0]], "Closed by close()");
00124 throwIf(!accept.rd[listener.toFd()],"Accept failed");
00125 server.reset(listener.accept(0, 0));
00126
00127
00128 char buffer[1024];
00129 for (;;) {
00130 Selector select;
00131 select.set(server->toFd(), RD|ER);
00132 select.set(client.toFd(), RD|ER);
00133 select.set(closePipe[0], RD|ER);
00134 select.select();
00135 throwIf(select.rd[closePipe[0]], "Closed by close()");
00136
00137 bool gotData=false;
00138 if (select.rd[server->toFd()] || select.er[server->toFd()]) {
00139 client.write(buffer, server->read(buffer, sizeof(buffer)));
00140 gotData=true;
00141 }
00142 if (select.rd[client.toFd()] || select.er[client.toFd()]) {
00143 server->write(buffer, client.read(buffer, sizeof(buffer)));
00144 gotData=true;
00145 }
00146 throwIf(!gotData, "No data from select()");
00147 }
00148 }
00149 catch (const std::exception& e) {
00150 QPID_LOG(debug, "SocketProxy::run exiting: " << e.what());
00151 }
00152 if (server.get()) server->close();
00153 close();
00154 }
00155
00156 mutable qpid::sys::Mutex lock;
00157 bool closed;
00158 qpid::sys::Socket client, listener;
00159 uint16_t port;
00160 int closePipe[2];
00161 qpid::sys::Thread thread;
00162 };
00163
00164 #endif