1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """\
19 Python Gevent based port forwarding server (openssh -L option) for the
20 proxying of graphical X2Go elements.
21
22 """
23 __NAME__ = "x2gofwtunnel-pylib"
24
25
26 import copy
27
28
29 import gevent
30 from gevent import select, socket
31 from gevent.server import StreamServer
32
33
34 import log
35 from defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS
36
38 """\
39 L{X2GoFwServer} implements a gevent's StreamServer based Paramiko/SSH port
40 forwarding server.
41
42 An L{X2GoFwServer} class object is used to tunnel graphical trafic
43 through an external proxy command launched by a C{X2GoProxy*} backend.
44
45 """
46 - def __init__ (self, listener,
47 remote_host, remote_port,
48 ssh_transport, session_instance=None, session_name=None,
49 subsystem=None, logger=None, loglevel=log.loglevel_DEFAULT,):
50 """\
51 @param listener: listen on TCP/IP socket C{(<IP>, <Port>)}
52 @type listener: C{tuple}
53 @param remote_host: hostname or IP of remote host (in case of X2Go mostly 127.0.0.1)
54 @type remote_host: C{str}
55 @param remote_port: port of remote host
56 @type remote_port: C{int}
57 @param ssh_transport: a valid Paramiko/SSH transport object
58 @type ssh_transport: C{obj}
59 @param session_instance: the complete L{X2GoSession} instance of the X2Go session this port forwarding server belongs to.
60 Note: for new L{X2GoSession} instances the object has the session name not yet set(!!!)
61 @type session_instance: C{obj}
62 @param session_name: the session name of the X2Go session this port forwarding server belongs to
63 @type session_name: C{str}
64 @param logger: you can pass an L{X2GoLogger} object to the
65 L{X2GoFwServer} constructor
66 @type logger: C{obj}
67 @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
68 constructed with the given loglevel
69 @type loglevel: C{int}
70
71 """
72 if logger is None:
73 self.logger = log.X2GoLogger(loglevel=loglevel)
74 else:
75 self.logger = copy.deepcopy(logger)
76 self.logger.tag = __NAME__
77
78 self.chan = None
79 self.is_active = False
80 self.failed = False
81 self.keepalive = None
82 self.listener = listener
83 self.chain_host = remote_host
84 self.chain_port = remote_port
85 self.ssh_transport = ssh_transport
86 self.session_name = session_name
87 self.session_instance = session_instance
88 self.subsystem = subsystem
89
90 self.fw_socket = None
91
92 StreamServer.__init__(self, self.listener, self.x2go_forward_tunnel_handle)
93
95 self.keepalive = True
96 return StreamServer.start(self)
97
99 """\
100 Handle for SSH/Paramiko forwarding tunnel.
101
102 @param fw_socket: local end of the forwarding tunnel
103 @type fw_socket: C{obj}
104 @param address: unused/ignored
105 @type address: C{tuple}
106
107 """
108 self.fw_socket = fw_socket
109
110
111 self.fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
112
113 _success = False
114 _count = 0
115 _maxwait = 20
116
117 while not _success and _count < _maxwait and self.keepalive:
118
119 _count += 1
120 try:
121 self.chan = self.ssh_transport.open_channel('direct-tcpip',
122 (self.chain_host, self.chain_port),
123 self.fw_socket.getpeername())
124 chan_peername = self.chan.getpeername()
125 _success = True
126 except Exception, e:
127 if self.keepalive:
128 self.logger('incoming request to %s:%d failed on attempt %d of %d: %s' % (self.chain_host,
129 self.chain_port,
130 _count,
131 _maxwait,
132 repr(e)),
133 loglevel=log.loglevel_WARN)
134 gevent.sleep(.4)
135
136 if not _success:
137 if self.keepalive:
138 self.logger('incoming request to %s:%d failed after %d attempts' % (self.chain_host,
139 self.chain_port,
140 _count),
141 loglevel=log.loglevel_ERROR)
142 if self.session_instance:
143 self.session_instance.set_session_name(self.session_name)
144 self.session_instance.HOOK_forwarding_tunnel_setup_failed(chain_host=self.chain_host, chain_port=self.chain_port, subsystem=self.subsystem)
145 self.failed = True
146
147 else:
148 self.logger('connected! Tunnel open %r -> %r (on master connection %r -> %r)' % (
149 self.listener, (self.chain_host, self.chain_port),
150 self.fw_socket.getpeername(), chan_peername),
151 loglevel=log.loglevel_INFO)
152
153 self.is_active = True
154
155 try:
156 while self.keepalive:
157 r, w, x = select.select([self.fw_socket, self.chan], [], [])
158 if fw_socket in r:
159 data = fw_socket.recv(1024)
160 if len(data) == 0:
161 break
162 self.chan.send(data)
163 if self.chan in r:
164 data = self.chan.recv(1024)
165 if len(data) == 0:
166 break
167 fw_socket.send(data)
168 self.close_channel()
169 self.close_socket()
170 except socket.error:
171 pass
172
173 self.logger('Tunnel closed from %r' % (chan_peername,),
174 loglevel=log.loglevel_INFO)
175
177 """\
178 Close an open channel again.
179
180 """
181
182 if self.chan is not None:
183 try:
184 if _X2GOCLIENT_OS != 'Windows':
185 self.chan.close()
186 self.chan = None
187 except EOFError:
188 pass
189
191 """\
192 Close the forwarding tunnel's socket again.
193
194 """
195 _success = False
196 _count = 0
197 _maxwait = 20
198
199
200 while not _success and _count < _maxwait:
201 _count += 1
202 try:
203 self.close_channel()
204 if self.fw_socket is not None:
205 self.fw_socket.close()
206 _success = True
207 except socket.error:
208 gevent.sleep(.2)
209 self.logger('could not close fw_tunnel socket, try again (%s of %s)' % (_count, _maxwait), loglevel=log.loglevel_WARN)
210
211 if _count >= _maxwait:
212 self.logger('forwarding tunnel to [%s]:%d could not be closed properly' % (self.chain_host, self.chain_port), loglevel=log.loglevel_WARN)
213
215 """\
216 Stop the forwarding tunnel.
217
218 """
219 self.is_active = False
220 self.close_socket()
221 StreamServer.stop(self)
222
223
224 -def start_forward_tunnel(local_host='127.0.0.1', local_port=22022,
225 remote_host='127.0.0.1', remote_port=22,
226 ssh_transport=None,
227 session_instance=None,
228 session_name=None,
229 subsystem=None,
230 logger=None, ):
231 """\
232 Setup up a Paramiko/SSH port forwarding tunnel (like openssh -L option).
233
234 The tunnel is used to transport X2Go graphics data through a proxy application like nxproxy.
235
236 @param local_host: local starting point of the forwarding tunnel
237 @type local_host: C{int}
238 @param local_port: listen port of the local starting point
239 @type local_port: C{int}
240 @param remote_host: from the endpoint of the tunnel, connect to host C{<remote_host>}...
241 @type remote_host: C{str}
242 @param remote_port: ... on port C{<remote_port>}
243 @type remote_port: C{int}
244 @param ssh_transport: the Paramiko/SSH transport (i.e. the X2Go session's Paramiko/SSH transport object)
245 @type ssh_transport: C{obj}
246 @param session_instance: the L{X2GoSession} instance that initiates this tunnel
247 @type session_instance: C{obj}
248 @param session_name: the session name of the X2Go session this port forwarding server belongs to
249 @type session_name: C{str}
250 @param subsystem: a custom string with a component name that tries to evoke a new tunnel setup
251 @type subsystem: C{str}
252 @param logger: an X2GoLogger object
253 @type logger: C{obj}
254
255 @return: returns an L{X2GoFwServer} instance
256 @rtype: C{obj}
257
258 """
259 fw_server = X2GoFwServer(listener=(local_host, local_port),
260 remote_host=remote_host, remote_port=remote_port,
261 ssh_transport=ssh_transport,
262 session_instance=session_instance, session_name=session_name,
263 subsystem=subsystem,
264 logger=logger,
265 )
266 try:
267 fw_server.start()
268 except socket.error:
269 fw_server.failed = True
270 fw_server.is_active = False
271
272 return fw_server
273
275 """\
276 Tear down a given Paramiko/SSH port forwarding tunnel.
277
278 @param fw_server: an L{X2GoFwServer} instance as returned by the L{start_forward_tunnel()} function
279 @type fw_server: C{obj}
280
281 """
282 if fw_server is not None:
283 fw_server.keepalive = False
284 gevent.sleep(.5)
285 fw_server.stop()
286
287
288 if __name__ == '__main__':
289 pass
290