1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """\
19 Python Gevent based port forwarding server (openssh -L option) for the
20 proxying of graphical X2Go elements.
21
22 """
23 __NAME__ = "x2gofwtunnel-pylib"
24
25
26 import copy
27
28
29 import gevent
30 from gevent import select, socket
31 from gevent.server import StreamServer
32
33
34 import log
35 from defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS
36
class X2GoFwServer(StreamServer):
    """\
    L{X2GoFwServer} implements a gevent's StreamServer based Paramiko/SSH port
    forwarding server.

    An L{X2GoFwServer} class object is used to tunnel graphical traffic
    through an external proxy command launched by a C{X2GoProxy*} backend.

    """
    def __init__(self, listener, remote_host, remote_port, ssh_transport, session_instance=None, session_name=None, logger=None, loglevel=log.loglevel_DEFAULT):
        """\
        @param listener: listen on TCP/IP socket C{(<IP>, <Port>)}
        @type listener: C{tuple}
        @param remote_host: hostname or IP of remote host (in case of X2Go mostly 127.0.0.1)
        @type remote_host: C{str}
        @param remote_port: port of remote host
        @type remote_port: C{int}
        @param ssh_transport: a valid Paramiko/SSH transport object
        @type ssh_transport: C{obj}
        @param session_instance: the complete L{X2GoSession} instance of the X2Go session this port forwarding server belongs to.
            Note: for new L{X2GoSession} instances the object has the session name not yet set(!!!)
        @type session_instance: C{obj}
        @param session_name: the session name of the X2Go session this port forwarding server belongs to
        @type session_name: C{str}
        @param logger: you can pass an L{X2GoLogger} object to the
            L{X2GoFwServer} constructor
        @type logger: C{obj}
        @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: C{int}

        """
        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            # work on a private copy so that re-tagging does not affect the caller's logger
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        self.chan = None
        self.is_active = False
        self.failed = False
        self.keepalive = False
        self.listener = listener
        self.chain_host = remote_host
        self.chain_port = remote_port
        self.ssh_transport = ssh_transport
        self.session_name = session_name
        self.session_instance = session_instance

        self.fw_socket = None

        StreamServer.__init__(self, self.listener, self.x2go_forward_tunnel_handle)

    def x2go_forward_tunnel_handle(self, fw_socket, address):
        """\
        Handle for SSH/Paramiko forwarding tunnel.

        Opens a C{direct-tcpip} channel on the SSH transport (retrying up to
        20 times) and then relays data between the local socket and that
        channel until either side closes or C{self.keepalive} is cleared.

        @param fw_socket: local end of the forwarding tunnel
        @type fw_socket: C{obj}
        @param address: unused/ignored
        @type address: C{tuple}

        """
        self.fw_socket = fw_socket

        # send small packets immediately (TCP_NODELAY)
        self.fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        _success = False
        _count = 0
        _maxwait = 20

        while not _success and _count < _maxwait:

            # give up early if the owning session has lost its connection
            if self.session_instance:
                if not self.session_instance.is_connected():
                    break

            _count += 1
            try:
                self.chan = self.ssh_transport.open_channel('direct-tcpip',
                                                            (self.chain_host, self.chain_port),
                                                            self.fw_socket.getpeername())
                chan_peername = self.chan.getpeername()
                _success = True
            except Exception as e:
                self.logger('incoming request to %s:%d failed on attempt %d of %d: %s' % (self.chain_host,
                                                                                          self.chain_port,
                                                                                          _count,
                                                                                          _maxwait,
                                                                                          repr(e)),
                            loglevel=log.loglevel_WARN)
                gevent.sleep(.4)

        if not _success:
            self.logger('incoming request to %s:%d failed after %d attempts' % (self.chain_host,
                                                                                self.chain_port,
                                                                                _count),
                        loglevel=log.loglevel_ERROR)
            if self.session_instance:
                self.session_instance.set_session_name(self.session_name)
                self.session_instance.HOOK_forwarding_tunnel_setup_failed(chain_host=self.chain_host, chain_port=self.chain_port)
            self.failed = True

        else:

            self.logger('connected! Tunnel open %r -> %r (on master connection %r -> %r)' % (
                        self.listener, (self.chain_host, self.chain_port),
                        self.fw_socket.getpeername(), chan_peername),
                        loglevel=log.loglevel_INFO)

            # the channel is open, so the tunnel is considered active from here on
            self.is_active = True

            self.keepalive = True
            try:
                while self.keepalive:
                    r, w, x = select.select([self.fw_socket, self.chan], [], [])
                    if fw_socket in r:
                        data = fw_socket.recv(1024)
                        if not data:
                            break
                        # sendall(): a plain send() may transmit only part of the buffer
                        self.chan.sendall(data)
                    if self.chan in r:
                        data = self.chan.recv(1024)
                        if not data:
                            break
                        fw_socket.sendall(data)
            except socket.error:
                pass
            finally:
                # release channel and socket even if relaying was aborted by an error
                self.close_channel()
                self.close_socket()

            self.is_active = False
            self.logger('Tunnel closed from %r' % (chan_peername,),
                        loglevel=log.loglevel_INFO)

    def close_channel(self):
        """\
        Close an open channel again.

        The channel's C{close()} call is skipped when the client OS is
        Windows; the channel reference is dropped in either case.

        """
        if self.chan is not None:
            try:
                if _X2GOCLIENT_OS != 'Windows':
                    self.chan.close()
                self.chan = None
            except EOFError:
                pass

    def close_socket(self):
        """\
        Close the forwarding tunnel's socket again.

        Closing is retried up to 20 times (0.2s apart) if a socket error
        occurs; a warning is logged if every attempt failed.

        """
        _success = False
        _count = 0
        _maxwait = 20

        # try to close the socket
        while not _success and _count < _maxwait:
            _count += 1
            try:
                self.close_channel()
                if self.fw_socket is not None:
                    self.fw_socket.close()
                _success = True
            except socket.error:
                gevent.sleep(.2)
                self.logger('could not close fw_tunnel socket, try again (%s of %s)' % (_count, _maxwait), loglevel=log.loglevel_WARN)

        # only complain if no attempt succeeded (the last attempt may well have worked)
        if not _success:
            self.logger('forwarding tunnel to [%s]:%d could not be closed properly' % (self.chain_host, self.chain_port), loglevel=log.loglevel_WARN)

    def stop(self):
        """\
        Stop the forwarding tunnel.

        """
        self.close_socket()
        StreamServer.stop(self)
220
221
def start_forward_tunnel(local_host='127.0.0.1', local_port=22022,
                         remote_host='127.0.0.1', remote_port=22,
                         ssh_transport=None,
                         session_instance=None,
                         session_name=None,
                         logger=None):
    """\
    Set up a Paramiko/SSH port forwarding tunnel (like openssh -L option).

    The tunnel is used to transport X2Go graphics data through a proxy application like nxproxy.

    If starting the server raises a socket error, the returned instance has
    C{failed} set to C{True} and C{is_active} set to C{False}.

    @param local_host: local starting point of the forwarding tunnel
    @type local_host: C{str}
    @param local_port: listen port of the local starting point
    @type local_port: C{int}
    @param remote_host: from the endpoint of the tunnel, connect to host C{<remote_host>}...
    @type remote_host: C{str}
    @param remote_port: ... on port C{<remote_port>}
    @type remote_port: C{int}
    @param ssh_transport: the Paramiko/SSH transport (i.e. the X2Go session's Paramiko/SSH transport object)
    @type ssh_transport: C{obj}
    @param session_instance: the L{X2GoSession} instance that initiates this tunnel
    @type session_instance: C{obj}
    @param session_name: the session name of the X2Go session this port forwarding server belongs to
    @type session_name: C{str}
    @param logger: an X2GoLogger object
    @type logger: C{obj}

    @return: returns an L{X2GoFwServer} instance
    @rtype: C{obj}

    """
    server_options = {
        'listener': (local_host, local_port),
        'remote_host': remote_host,
        'remote_port': remote_port,
        'ssh_transport': ssh_transport,
        'session_instance': session_instance,
        'session_name': session_name,
        'logger': logger,
    }
    tunnel = X2GoFwServer(**server_options)

    try:
        tunnel.start()
    except socket.error:
        # the listener could not be started; flag the tunnel as unusable
        tunnel.is_active = False
        tunnel.failed = True

    return tunnel
267
def stop_forward_tunnel(fw_server):
    """\
    Tear down a given Paramiko/SSH port forwarding tunnel.

    Does nothing when C{fw_server} is C{None}. Otherwise the server's
    C{keepalive} flag is cleared, execution pauses for half a second and
    the server is stopped.

    @param fw_server: an L{X2GoFwServer} instance as returned by the L{start_forward_tunnel()} function
    @type fw_server: C{obj}

    """
    if fw_server is None:
        return

    fw_server.keepalive = False
    gevent.sleep(.5)
    fw_server.stop()
280
281
282 if __name__ == '__main__':
283 pass
284