Trees | Indices | Help |
---|
|
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

from flumotion.common import log
from flumotion.extern.fdpass import fdpass

from twisted.internet import unix, main, address, tcp
from twisted.spread import pb

import errno
import os
import socket
import struct

# Heavily based on
# http://twistedmatrix.com/trac/browser/sandbox/exarkun/copyover/server.py
# and client.py
# Thanks for the inspiration!

# Since we're doing this over a stream socket, our file descriptor messages
# aren't guaranteed to be received alone; they could arrive along with some
# unrelated data.
# So, we prefix the message with a 16 byte magic signature, and a length,
# and if we receive file descriptors decode based on this.
43 MAGIC_SIGNATURE = "\xfd\xfc\x8e\x7f\x07\x47\xb9\xea" \ 44 "\xa1\x75\xee\xd8\xdc\x36\xc8\xa3" 45 5052 transport = FDServer53 5557 if not self.connected: 58 return 59 try: 60 (fds, message) = fdpass.readfds(self.fileno(), 64 * 1024) 61 except socket.error, se: 62 if se.args[0] == errno.EWOULDBLOCK: 63 return 64 else: 65 return main.CONNECTION_LOST 66 else: 67 if not message: 68 return main.CONNECTION_DONE 69 70 if len(fds) > 0: 71 # Look for our magic cookie in (possibly) the midst of other 72 # data. Pass surrounding chunks, if any, onto dataReceived(), 73 # which (undocumentedly) must return None unless a failure 74 # occurred. 75 # Pass the actual FDs and their message to 76 # fileDescriptorsReceived() 77 offset = message.find(MAGIC_SIGNATURE) 78 if offset < 0: 79 # Old servers did not send this; be hopeful that this 80 # doesn't have bits of other protocol (i.e. PB) mixed up 81 # in it. 82 return self.protocol.fileDescriptorsReceived(fds, message) 83 elif offset > 0: 84 ret = self.protocol.dataReceived(message[0:offset]) 85 if ret: 86 return ret 87 88 msglen = struct.unpack("@I", message[offset+16:offset+20])[0] 89 offset += 20 90 ret = self.protocol.fileDescriptorsReceived(fds, 91 message[offset:offset+msglen]) 92 if ret: 93 return ret 94 95 if offset+msglen < len(message): 96 return self.protocol.dataReceived(message[offset+msglen:]) 97 return ret 98 else: 99 # self.debug("No FDs, passing to dataReceived") 100 return self.protocol.dataReceived(message)101 105107 """ 108 A pb.Broker subclass that handles FDs being passed to it (with associated 109 data) over the same connection as the normal PB data stream. 110 When an FD is seen, it creates new protocol objects for them from the 111 childFactory attribute. 
112 """ 113 # FIXME: looks like we can only use our own subclasses that take 114 # three __init__ args152116 """ 117 @param connectionClass: a subclass of L{twisted.internet.tcp.Connection} 118 """ 119 pb.Broker.__init__(self, **kwargs) 120 121 self.childFactory = childFactory 122 self._connectionClass = connectionClass123 124 # This is the complex bit. If our underlying transport receives a file 125 # descriptor, this gets called - along with the data we got with the FD. 126 # We create an appropriate protocol object, and attach it to the reactor.128 if len(fds) == 1: 129 fd = fds[0] 130 131 # Note that we hardcode IPv4 here! 132 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 133 134 self.debug("Received FD %d->%d" % (fd, sock.fileno())) 135 136 # Undocumentedly (other than a comment in 137 # Python/Modules/socketmodule.c), socket.fromfd() calls dup() on 138 # the passed FD before it actually wraps it in a socket object. 139 # So, we need to close the FD that we originally had... 
140 os.close(fd) 141 142 peeraddr = sock.getpeername() 143 144 # Based on bits in tcp.Port.doRead() 145 protocol = self.childFactory.buildProtocol( 146 address._ServerFactoryIPv4Address('TCP', 147 peeraddr[0], peeraddr[1])) 148 149 self._connectionClass(sock, protocol, message) 150 else: 151 self.warning("Unexpected: FD-passing message with len(fds) != 1")154 """ 155 A subclass of tcp.Server that permits passing the FDs used to other 156 processes (by just calling close(2) rather than shutdown(2) on them) 157 """ 158175160 tcp.Server.__init__(self, sock, protocol, client, server, sessionno) 161 self.keepSocketAlive = False162164 # We override this (from tcp._SocketCloser) so that we can close sockets 165 # properly in the normal case, but once we've passed our socket on via 166 # the FD-channel, we just close() it (not calling shutdown() which will 167 # close the TCP channel without closing the FD itself) 168 if self.keepSocketAlive: 169 try: 170 self.socket.close() 171 except socket.error: 172 pass 173 else: 174 tcp.Server._closeSocket(self)177 transport = PassableServerConnection178
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Fri Apr 11 07:40:37 2008 | http://epydoc.sourceforge.net |