1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from urllib2 import urlparse
23
24 from twisted.internet import protocol, reactor, address, error, defer
25
26 from twisted.spread import pb
27 from twisted.cred import portal
28
29 from flumotion.common import medium, log, messages
30 from flumotion.twisted import credentials, fdserver, checkers
31 from flumotion.twisted import reflect
32
33 from flumotion.component import component
34 from flumotion.component.component import moods
35
36 import socket, string, os, random
37
38 from flumotion.common.messages import N_
39 T_ = messages.gettexter('flumotion')
40
42 """
43 An Avatar in the porter representing a streamer
44 """
45 - def __init__(self, avatarId, porter, mind):
52
54 return self.mind != None
55
58
62
66
70
74
76 """
77 A Realm within the Porter that creates Avatars for streamers logging into
78 the porter.
79 """
80 __implements__ = portal.IRealm
81
83 """
84 @param porter: The porter that avatars created from here should use.
85 @type porter: L{Porter}
86 """
87 self.porter = porter
88
97
99
101 """
102 Return the location, login username/password, and port and
103 interface for the porter as a tuple (path, username, password,
104 port, interface). The port returned is the porter's iptables port.
105 """
106 return (self.comp._socketPath, self.comp._username,
107 self.comp._password, self.comp._iptablesPort,
108 self.comp._interface)
109
110 -class Porter(component.BaseComponent, log.Loggable):
111 """
112 The porter optionally sits in front of a set of streamer components.
113 The porter is what actually deals with incoming connections on a TCP socket.
114 It decides which streamer to direct the connection to, then passes the FD
115 (along with some amount of already-read data) to the appropriate streamer.
116 """
117
118 componentMediumClass = PorterMedium
119
121
122
123 self._mappings = {}
124 self._prefixes = {}
125
126 self._socketlistener = None
127
128 self._socketPath = None
129 self._username = None
130 self._password = None
131 self._port = None
132 self._iptablesPort = None
133 self._porterProtocol = None
134
135 self._interface = ''
136
138 """
139 Register a path as being served by a streamer represented by this
140 avatar. Will remove any previous registration at this path.
141
142 @param path: The path to register
143 @type path: str
144 @param avatar: The avatar representing the streamer to direct this path
145 to
146 @type avatar: L{PorterAvatar}
147 """
148 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
149 if self._mappings.has_key(path):
150 self.warning("Replacing existing mapping for path \"%s\"" % path)
151
152 self._mappings[path] = avatar
153
155 """
156 Attempt to deregister the given path. A deregistration will only be
157 accepted if the mapping is to the avatar passed.
158
159 @param path: The path to deregister
160 @type path: str
161 @param avatar: The avatar representing the streamer being deregistered
162 @type avatar: L{PorterAvatar}
163 """
164 if self._mappings.has_key(path):
165 if self._mappings[path] == avatar:
166 self.debug("Removing porter mapping for \"%s\"" % path)
167 del self._mappings[path]
168 else:
169 self.warning("Mapping not removed: refers to a different avatar")
170 else:
171 self.warning("Mapping not removed: no mapping found")
172
174 """
175 Register a destination for all requests directed to anything beginning
176 with a specified prefix. Where there are multiple matching prefixes, the
177 longest is selected.
178
179 @param avatar: The avatar being registered
180 @type avatar: L{PorterAvatar}
181 """
182
183 self.debug("Setting prefix \"%s\" for porter", prefix)
184 if prefix in self._prefixes:
185 self.warning("Overwriting prefix")
186
187 self._prefixes[prefix] = avatar
188
190 """
191 Attempt to deregister the destination registered for all requests
192 beginning with the given prefix. This will only succeed if the
193 prefix is currently mapped to this avatar.
194
195 @param avatar: The avatar being deregistered
196 @type avatar: L{PorterAvatar}
197 """
198 if prefix not in self._prefixes:
199 self.warning("Mapping not removed: no mapping found")
200 return
201
202 if self._prefixes[prefix] == avatar:
203 self.debug("Removing prefix destination from porter")
204 del self._prefixes[prefix]
205 else:
206 self.warning("Not removing prefix destination: expected avatar not found")
207
209 found = None
210
211 for prefix in self._prefixes.keys():
212 self.debug("Checking: %r, %r" % (prefix, path))
213 if (path.startswith(prefix) and (not found or len(found) < len(prefix))):
214 found = prefix
215 if found:
216 return self._prefixes[found]
217 else:
218 return None
219
221 """
222 Find a destination Avatar for this path: an exact path mapping is preferred, falling back to the longest matching prefix.
223 @returns: The Avatar for this mapping, or None.
224 """
225
226 if self._mappings.has_key(path):
227 return self._mappings[path]
228 else:
229 return self.findPrefixMatch(path)
230
231
233 """
234 Generate a socket pathname in an appropriate location
235 """
236
237
238 import tempfile
239 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.')
240 os.close(fd)
241
242 return name
243
245 """
246 Generate a random string of numchars US-ASCII letters (uses the 'random' module, so it is not cryptographically secure)
247 """
248 str = ""
249 chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
250 for _ in range(numchars):
251 str += chars[random.randint(0, len(chars)-1)]
252
253 return str
254
256 props = self.config['properties']
257
258 self.fixRenamedProperties(props,
259 [('socket_path', 'socket-path')])
260
261
262
263
264
265 if props.has_key('socket-path'):
266
267 self._socketPath = props['socket-path']
268 self._username = props['username']
269 self._password = props['password']
270 else:
271
272
273 self._username = self.generateRandomString(12)
274 self._password = self.generateRandomString(12)
275 self._socketPath = self.generateSocketPath()
276
277 self._port = int(props['port'])
278 self._iptablesPort = int(props.get('iptables-port', self._port))
279 self._porterProtocol = props.get('protocol',
280 'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
281 self._interface = props.get('interface', '')
282
284 if self._socketlistener:
285
286
287
288
289 self._socketlistener.stopListening()
290 self._socketlistener = None
291
292 try:
293 os.unlink(self._socketPath)
294 except:
295 pass
296
297 return component.BaseComponent.do_stop(self)
298
300
301
302 realm = PorterRealm(self)
303 checker = checkers.FlexibleCredentialsChecker()
304 checker.addUser(self._username, self._password)
305
306 p = portal.Portal(realm, [checker])
307 serverfactory = pb.PBServerFactory(p)
308
309 try:
310
311
312
313 try:
314 os.unlink(self._socketPath)
315 except:
316 pass
317
318 self._socketlistener = reactor.listenWith(
319 fdserver.FDPort, self._socketPath, serverfactory)
320 self.debug("Now listening on socketPath %s" % self._socketPath)
321 except error.CannotListenError, e:
322 self.warning("Failed to create socket %s" % self._socketPath)
323 m = messages.Error(T_(N_(
324 "Network error: socket path %s is not available."),
325 self._socketPath))
326 self.addMessage(m)
327 self.setMood(moods.sad)
328 return defer.fail(e)
329
330
331
332 try:
333 proto = reflect.namedAny(self._porterProtocol)
334 self.debug("Created proto %r" % proto)
335 except:
336 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
337 self._porterProtocol)
338 proto = HTTPPorterProtocol
339
340
341
342 factory = PorterProtocolFactory(self, proto)
343 try:
344 reactor.listenWith(
345 fdserver.PassableServerPort, self._port, factory,
346 interface=self._interface)
347 self.debug("Now listening on port %d" % self._port)
348 except error.CannotListenError, e:
349 self.warning("Failed to listen on port %d" % self._port)
350 m = messages.Error(T_(N_(
351 "Network error: TCP port %d is not available."), self._port))
352 self.addMessage(m)
353 self.setMood(moods.sad)
354 return defer.fail(e)
355
356 return component.BaseComponent.do_start(self, *args, **kwargs)
357
360 self._porter = porter
361 self.protocol = protocol
362
364 p = self.protocol(self._porter)
365 p.factory = self
366 return p
367
369 """
370 The base porter is capable of accepting HTTP-like protocols (including
371 RTSP) - it reads the first line of a request, and makes the decision
372 solely on that.
373
374 We can't guarantee that we read precisely one line, so the buffer we
375 accumulate may be larger than the part we actually parse.
376
377 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
378 @cvar delimiters: a list of valid line delimiters I check for
379 """
380
381 MAX_SIZE = 4096
382
383
384
385
386
387 delimiters = ['\r\n', '\n', '\r']
388
390 self._buffer = ''
391 self._porter = porter
392
394 self._buffer = self._buffer + data
395 self.log("Got data, buffer now \"%s\"" % self._buffer)
396
397
398 for delim in self.delimiters:
399 try:
400 line, remaining = self._buffer.split(delim, 1)
401 break
402 except ValueError:
403 self.log("No line break found yet")
404 pass
405 else:
406
407 self.log("No valid delimiter found")
408 if len(self._buffer) > self.MAX_SIZE:
409 self.log("Dropping connection!")
410 return self.transport.loseConnection()
411 else:
412
413 return
414
415
416
417 identifier = self.parseLine(line)
418
419 if not identifier:
420 self.log("Couldn't find identifier in first line")
421 return self.transport.loseConnection()
422
423
424
425 destinationAvatar = self._porter.findDestination(identifier)
426
427 if not destinationAvatar or not destinationAvatar.isAttached():
428 if destinationAvatar:
429 self.log("There was an avatar, but it logged out?")
430 self.log("No destination avatar found for \"%s\"" % identifier)
431 self.writeNotFoundResponse()
432 return self.transport.loseConnection()
433
434
435
436
437
438
439
440
441
442 self.debug("Attempting to send FD: %d" % self.transport.fileno())
443 destinationAvatar.mind.broker.transport.sendFileDescriptor(
444 self.transport.fileno(), self._buffer)
445
446
447
448
449
450 self.transport.keepSocketAlive = True
451 self.transport.loseConnection()
452
454 """
455 Parse the initial line of the request. Return a string usable for
456 uniquely identifying the stream being requested, or None if the request
457 is unreadable.
458
459 Subclasses should override this.
460 """
461 raise NotImplementedError
462
464 """
465 Write a response indicating that the requested resource was not found
466 in this protocol.
467
468 Subclasses should override this to use the correct protocol.
469 """
470 raise NotImplementedError
471
473 scheme = 'http'
474 protos = ["HTTP/1.0", "HTTP/1.1"]
475
494
496 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
497
499 scheme = 'rtsp'
500 protos = ["RTSP/1.0"]
501
503 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
504