1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import os
19 import random
20 import socket
21 import string
22 import time
23 from urllib2 import urlparse
24
25 from twisted.cred import portal
26 from twisted.internet import protocol, reactor, error, defer
27 from twisted.spread import pb
28 from zope.interface import implements
29
30 from flumotion.common import medium, log, messages, errors
31 from flumotion.common.i18n import N_, gettexter
32 from flumotion.component import component
33 from flumotion.component.component import moods
34 from flumotion.twisted import fdserver, checkers
35 from flumotion.twisted import reflect
36
37 __version__ = "$Rev$"
38 T_ = gettexter()
39
40
42 """
43 An Avatar in the porter representing a streamer
44 """
45
46 - def __init__(self, avatarId, porter, mind):
53
55 return self.mind != None
56
58 self.debug("porter client %s logging out", self.avatarId)
59 self.mind = None
60
64
68
72
76
78 return self.porter._iptablesPort
79
80
82 """
83 A Realm within the Porter that creates Avatars for streamers logging into
84 the porter.
85 """
86 implements(portal.IRealm)
87
89 """
90 @param porter: The porter that avatars created from here should use.
91 @type porter: L{Porter}
92 """
93 self.porter = porter
94
103
104
106
108 """
109 Return the location, login username/password, and listening port
110 and interface for the porter as a tuple (path, username,
111 password, port, interface, external-interface).
112 """
113 return (self.comp._socketPath, self.comp._username,
114 self.comp._password, self.comp._iptablesPort,
115 self.comp._interface, self.comp._external_interface)
116
117
118 -class Porter(component.BaseComponent, log.Loggable):
119 """
120 The porter optionally sits in front of a set of streamer components.
121 The porter is what actually deals with incoming connections on a socket.
122 It decides which streamer to direct the connection to, then passes the FD
123 (along with some amount of already-read data) to the appropriate streamer.
124 """
125
126 componentMediumClass = PorterMedium
127
129
130
131 self._mappings = {}
132 self._prefixes = {}
133
134 self._socketlistener = None
135
136 self._socketPath = None
137 self._username = None
138 self._password = None
139 self._port = None
140 self._iptablesPort = None
141 self._porterProtocol = None
142
143 self._interface = ''
144 self._external_interface = ''
145
147 """
148 Register a path as being served by a streamer represented by this
149 avatar. Will remove any previous registration at this path.
150
151 @param path: The path to register
152 @type path: str
153 @param avatar: The avatar representing the streamer to direct this path
154 to
155 @type avatar: L{PorterAvatar}
156 """
157 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
158 if path in self._mappings:
159 self.warning("Replacing existing mapping for path \"%s\"" % path)
160
161 self._mappings[path] = avatar
162
164 """
165 Attempt to deregister the given path. A deregistration will only be
166 accepted if the mapping is to the avatar passed.
167
168 @param path: The path to deregister
169 @type path: str
170 @param avatar: The avatar representing the streamer being deregistered
171 @type avatar: L{PorterAvatar}
172 """
173 if path in self._mappings:
174 if self._mappings[path] == avatar:
175 self.debug("Removing porter mapping for \"%s\"" % path)
176 del self._mappings[path]
177 else:
178 self.warning(
179 "Mapping not removed: refers to a different avatar")
180 else:
181 self.warning("Mapping not removed: no mapping found")
182
184 """
185 Register a destination for all requests directed to anything beginning
186 with a specified prefix. Where there are multiple matching prefixes,
187 the longest is selected.
188
189 @param avatar: The avatar being registered
190 @type avatar: L{PorterAvatar}
191 """
192
193 self.debug("Setting prefix \"%s\" for porter", prefix)
194 if prefix in self._prefixes:
195 self.warning("Overwriting prefix")
196
197 self._prefixes[prefix] = avatar
198
200 """
201 Attempt to deregister a default destination for all requests not
202 directed to a specifically-mapped path. This will only succeed if the
203 default is currently equal to this avatar.
204
205 @param avatar: The avatar being deregistered
206 @type avatar: L{PorterAvatar}
207 """
208 if prefix not in self._prefixes:
209 self.warning("Mapping not removed: no mapping found")
210 return
211
212 if self._prefixes[prefix] == avatar:
213 self.debug("Removing prefix destination from porter")
214 del self._prefixes[prefix]
215 else:
216 self.warning(
217 "Not removing prefix destination: expected avatar not found")
218
220 found = None
221
222 for prefix in self._prefixes.keys():
223 self.log("Checking: %r, %r" % (prefix, path))
224 if (path.startswith(prefix) and
225 (not found or len(found) < len(prefix))):
226 found = prefix
227 if found:
228 return self._prefixes[found]
229 else:
230 return None
231
233 """
234 Find a destination Avatar for this path.
235 @returns: The Avatar for this mapping, or None.
236 """
237
238 if path in self._mappings:
239 return self._mappings[path]
240 else:
241 return self.findPrefixMatch(path)
242
244 """
245 Generate a socket pathname in an appropriate location
246 """
247
248
249 import tempfile
250 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.')
251 os.close(fd)
252
253 return name
254
256 """
257 Generate a random US-ASCII string of length numchars
258 """
259 return ''.join(random.choice(string.ascii_letters)
260 for x in range(numchars))
261
263 props = self.config['properties']
264
265 self.fixRenamedProperties(props,
266 [('socket_path', 'socket-path')])
267
268
269
270
271
272 if 'socket-path' in props:
273
274 self._socketPath = props['socket-path']
275 self._username = props['username']
276 self._password = props['password']
277 else:
278
279
280 self._username = self.generateRandomString(12)
281 self._password = self.generateRandomString(12)
282 self._socketPath = self.generateSocketPath()
283
284 self._requirePassword = props.get('require-password', True)
285 self._socketMode = props.get('socket-mode', 0666)
286 self._port = int(props['port'])
287 self._iptablesPort = int(props.get('iptables-port', self._port))
288 self._porterProtocol = props.get('protocol',
289 'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
290 self._interface = props.get('interface', '')
291
292
293 self._external_interface = props.get('external-interface',
294 self._interface)
295
297 d = None
298 if self._socketlistener:
299
300
301 d = self._socketlistener.stopListening()
302 self._socketlistener = None
303 return d
304
306
307 self.have_properties()
308 realm = PorterRealm(self)
309 checker = checkers.FlexibleCredentialsChecker()
310 checker.addUser(self._username, self._password)
311 if not self._requirePassword:
312 checker.allowPasswordless(True)
313
314 p = portal.Portal(realm, [checker])
315 serverfactory = pb.PBServerFactory(p)
316
317 try:
318
319
320
321 try:
322 os.unlink(self._socketPath)
323 except OSError:
324 pass
325
326
327
328
329
330
331 self._socketlistener = fdserver.FDPort(self._socketPath,
332 serverfactory, reactor=reactor, mode=self._socketMode)
333 self._socketlistener.startListening()
334
335 self.info("Now listening on socketPath %s", self._socketPath)
336 except error.CannotListenError:
337 self.warning("Failed to create socket %s" % self._socketPath)
338 m = messages.Error(T_(N_(
339 "Network error: socket path %s is not available."),
340 self._socketPath))
341 self.addMessage(m)
342 self.setMood(moods.sad)
343 return defer.fail(errors.ComponentSetupHandledError())
344
345
346
347 try:
348 proto = reflect.namedAny(self._porterProtocol)
349 self.debug("Created proto %r" % proto)
350 except (ImportError, AttributeError):
351 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
352 self._porterProtocol)
353 proto = HTTPPorterProtocol
354
355
356
357 factory = PorterProtocolFactory(self, proto)
358 try:
359 p = fdserver.PassableServerPort(self._port, factory,
360 interface=self._interface, reactor=reactor)
361 p.startListening()
362 self.info("Now listening on interface %r on port %d",
363 self._interface, self._port)
364 except error.CannotListenError:
365 self.warning("Failed to listen on interface %r on port %d",
366 self._interface, self._port)
367 m = messages.Error(T_(N_(
368 "Network error: TCP port %d is not available."), self._port))
369 self.addMessage(m)
370 self.setMood(moods.sad)
371 return defer.fail(errors.ComponentSetupHandledError())
372
373
384
385
387 """
388 The base porter is capable of accepting HTTP-like protocols (including
389 RTSP) - it reads the first line of a request, and makes the decision
390 solely on that.
391
392 We can't guarantee that we read precisely a line, so the buffer we
393 accumulate will actually be larger than what we actually parse.
394
395 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
396 @cvar delimiters: a list of valid line delimiters I check for
397 """
398
399 logCategory = 'porterprotocol'
400
401
402 MAX_SIZE = 4096
403
404
405
406 PORTER_CLIENT_TIMEOUT = 30
407
408
409
410
411
412 delimiters = ['\r\n', '\n', '\r']
413
421
430
436
438 if self._timeoutDC:
439 self._timeoutDC.cancel()
440 self._timeoutDC = None
441
443 self._buffer = self._buffer + data
444 self.log("Got data, buffer now \"%s\"" % self._buffer)
445
446
447 for delim in self.delimiters:
448 try:
449 line, remaining = self._buffer.split(delim, 1)
450 break
451 except ValueError:
452
453 pass
454 else:
455
456 self.log("No valid delimiter found")
457 if len(self._buffer) > self.MAX_SIZE:
458
459
460 self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
461 "buffer exceeded",
462 self.transport.fileno(), time.time(),
463 self.requestId)
464
465 return self.transport.loseConnection()
466 else:
467
468
469 return
470
471
472
473 parsed = self.parseLine(line)
474 if not parsed:
475 self.log("Couldn't parse the first line")
476 return self.transport.loseConnection()
477
478 identifier = self.extractIdentifier(parsed)
479 if not identifier:
480 self.log("Couldn't find identifier in first line")
481 return self.transport.loseConnection()
482
483 if self.requestId:
484 self.log("Injecting request-id %r", self.requestId)
485 parsed = self.injectRequestId(parsed, self.requestId)
486
487
488
489
490 self._buffer = delim.join((self.unparseLine(parsed), remaining))
491
492
493 self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
494 self.transport.fileno(), time.time(), self.requestId,
495 identifier)
496
497
498
499 destinationAvatar = self._porter.findDestination(identifier)
500
501 if not destinationAvatar or not destinationAvatar.isAttached():
502 if destinationAvatar:
503 self.debug("There was an avatar, but it logged out?")
504
505
506 self.debug(
507 "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
508 self.transport.fileno(), time.time(), self.requestId)
509
510 self.writeNotFoundResponse()
511 return self.transport.loseConnection()
512
513
514
515
516
517
518
519
520 self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
521 self.transport.fileno(), time.time(), self.requestId,
522 destinationAvatar.avatarId)
523
524
525
526 try:
527 destinationAvatar.mind.broker.transport.sendFileDescriptor(
528 self.transport.fileno(), self._buffer)
529 except OSError, e:
530 self.warning("[fd %5d] failed to send FD: %s",
531 self.transport.fileno(), log.getExceptionMessage(e))
532 self.writeServiceUnavailableResponse()
533 return self.transport.loseConnection()
534
535
536 self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
537 self.transport.fileno(), time.time(), self.requestId,
538 destinationAvatar.avatarId)
539
540
541
542
543
544 self.transport.keepSocketAlive = True
545 self.transport.loseConnection()
546
548 """
549 Parse the initial line of the request. Return an object that can be
550 used to uniquely identify the stream being requested by passing it to
551 extractIdentifier, or None if the request is unreadable.
552
553 Subclasses should override this.
554 """
555 raise NotImplementedError
556
558 """
559 Recreate the initial request line from the parsed representation. The
560 recreated line does not need to be exactly identical, but both
561 parsedLine(unparseLine(line)) and line should contain the same
562 information (i.e. unparseLine should not lose information).
563
564 UnparseLine has to return a valid line from the porter protocol's
565 scheme point of view (for instance, HTTP).
566
567 Subclasses should override this.
568 """
569 raise NotImplementedError
570
572 """
573 Extract a string that uniquely identifies the requested stream from the
574 parsed representation of the first request line.
575
576 Subclasses should override this, depending on how they implemented
577 parseLine.
578 """
579 raise NotImplementedError
580
582 """
583 Return a string that will uniquely identify the request.
584
585 Subclasses should override this if they want to use request-ids and
586 also implement injectRequestId.
587 """
588 raise NotImplementedError
589
591 """
592 Take the parsed representation of the first request line and a string
593 token, return a parsed representation of the request line with the
594 request-id possibly mixed into it.
595
596 Subclasses should override this if they generate request-ids.
597 """
598
599 return parsed
600
602 """
603 Write a response indicating that the requested resource was not found
604 in this protocol.
605
606 Subclasses should override this to use the correct protocol.
607 """
608 raise NotImplementedError
609
611 """
612 Write a response indicating that the requested resource was
613 temporarily uavailable in this protocol.
614
615 Subclasses should override this to use the correct protocol.
616 """
617 raise NotImplementedError
618
619
621 scheme = 'http'
622 protos = ["HTTP/1.0", "HTTP/1.1"]
623 requestIdParameter = 'FLUREQID'
624 requestIdBitsNo = 256
625
627 try:
628 (method, location, proto) = map(string.strip, line.split(' ', 2))
629
630 if proto not in self.protos:
631 return None
632
633
634 parsed_url = urlparse.urlparse(location)
635
636 return method, parsed_url, proto
637
638 except ValueError:
639 return None
640
642 method, parsed_url, proto = parsed
643 return ' '.join((method, urlparse.urlunparse(parsed_url), proto))
644
650
652 method, parsed_url, proto = parsed
653
654 sep = ''
655 if parsed_url[4] != '':
656 sep = '&'
657 query_string = ''.join((parsed_url[4],
658 sep, self.requestIdParameter, '=',
659 requestId))
660 parsed_url = (parsed_url[:4] +
661 (query_string, )
662 + parsed_url[5:])
663 return method, parsed_url, proto
664
666 method, parsed_url, proto = parsed
667
668 return parsed_url[2]
669
671 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
672
674 self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n"
675 "Service temporarily unavailable")
676
677
679 scheme = 'rtsp'
680 protos = ["RTSP/1.0"]
681
683 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
684
686 self.transport.write("RTSP/1.0 503 Service Unavailable\r\n\r\n"
687 "Service temporarily unavailable")
688