1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import socket
24 import time
25 import errno
26 import string
27 import resource
28
29 import gst
30
31 try:
32 from twisted.web import http
33 except ImportError:
34 from twisted.protocols import http
35
36 from twisted.web import server, resource as web_resource
37 from twisted.internet import reactor, defer
38 from twisted.python import reflect
39
40 from flumotion.configure import configure
41 from flumotion.common import errors
42
43 from flumotion.common import common, log, keycards
44
45 from flumotion.component.base import http as httpbase
46
47 __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']
48
49 HTTP_NAME = 'FlumotionHTTPServer'
50 HTTP_VERSION = configure.version
51
52 ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
53 <html>
54 <head>
55 <title>%(code)d %(error)s</title>
56 </head>
57 <body>
58 <h2>%(code)d %(error)s</h2>
59 </body>
60 </html>
61 """
62
63 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
64
65
68
69 __reserve_fds__ = 50
70
71 logCategory = 'httpstreamer'
72
73
74
75
76 isLeaf = True
77
96
98
99
100 if fd in self._requests:
101 request = self._requests[fd]
102 self._removeClient(request, fd, stats)
103 else:
104 self.warning('[fd %5d] not found in _requests' % fd)
105
107 self.putChild(path, self)
108
110 self.logfilter = logfilter
111
113 """
114 Close the logfile, then reopen using the previous logfilename
115 """
116 for logger in self.loggers:
117 self.debug('rotating logger %r' % logger)
118 logger.rotate()
119
def logWrite(self, fd, ip, request, stats):
    """
    Report a finished HTTP session to every logger in self.loggers.

    One 'http_session_completed' event is emitted per logger, carrying a
    dictionary that describes the request and what was sent to the client.

    @param fd:      file descriptor of the client; not used in the event
    @param ip:      client IP address, stored under 'ip'
    @param request: the request that ended; its method, uri, args,
                    clientproto, code and headers are read
    @param stats:   per-client statistics, or a false value when there
                    are none; element 0 is reported as bytes sent and
                    element 3, divided by gst.SECOND, as the connection
                    time
    """
    request_headers = request.getAllHeaders()

    # -1 stands for "unknown" whenever no statistics were handed in.
    sent = -1
    connected = -1
    if stats:
        sent = stats[0]
        connected = int(stats[3] / gst.SECOND)

    event_args = dict([
        ('ip', ip),
        ('time', time.gmtime()),
        ('method', request.method),
        ('uri', request.uri),
        ('username', '-'),
        ('get-parameters', request.args),
        ('clientproto', request.clientproto),
        ('response', request.code),
        ('bytes-sent', sent),
        ('referer', request_headers.get('referer', None)),
        ('user-agent', request_headers.get('user-agent', None)),
        ('time-connected', connected),
    ])

    for sink in self.loggers:
        sink.event('http_session_completed', event_args)
146
148 self.info('setting maxclients to %d' % limit)
149 self.maxclients = self.getMaxAllowedClients(limit)
150
151 self.info('set maxclients to %d' % self.maxclients)
152
153
154 """
155 Write out the HTTP headers for the incoming HTTP request.
156
157 @rtype: boolean
158 @returns: whether or not the file descriptor can be used further.
159 """
161 fd = request.transport.fileno()
162 fdi = request.fdIncoming
163
164
165 if fd == -1:
166 self.info('[fd %5d] Client gone before writing header' % fdi)
167
168 return False
169 if fd != request.fdIncoming:
170 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
171
172 return False
173
174 headers = []
175
176 def setHeader(field, name):
177 headers.append('%s: %s\r\n' % (field, name))
178
179
180 content = self.streamer.get_content_type()
181 setHeader('Server', HTTP_SERVER)
182 setHeader('Date', http.datetimeToString())
183 setHeader('Cache-Control', 'no-cache')
184 setHeader('Cache-Control', 'private')
185 setHeader('Content-type', content)
186
187
188
189
190
191
192
193
194
195
196
197
198
199 try:
200
201
202
203
204 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
205
206 request.startedWriting = True
207 return True
208 except OSError, (no, s):
209 if no == errno.EBADF:
210 self.info('[fd %5d] client gone before writing header' % fd)
211 elif no == errno.ECONNRESET:
212 self.info('[fd %5d] client reset connection writing header' % fd)
213 else:
214 self.info('[fd %5d] unhandled write error when writing header: %s' % (fd, s))
215
216 del request
217 return False
218
220 if self.streamer.caps == None:
221 self.debug('We have no caps yet')
222 return False
223
224 return True
225
227 """
228 maximum number of allowed clients based on soft limit for number of
229 open file descriptors and fd reservation. Increases soft limit to
230 hard limit if possible.
231 """
232 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
233 import sys
234 version = sys.version_info
235
236 if maxclients != -1:
237 neededfds = maxclients + self.__reserve_fds__
238
239
240 if version[:3] == (2,4,3) and not hasattr(socket,"has_2_4_3_patch"):
241 hardmax = 1024
242
243 if neededfds > softmax:
244 lim = min(neededfds, hardmax)
245 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
246 return lim - self.__reserve_fds__
247 else:
248 return maxclients
249 else:
250 return softmax - self.__reserve_fds__
251
253 return len(self._requests) >= self.maxclients and self.maxclients >= 0
254
256 """
257 Add a request, so it can be used for statistics.
258
259 @param request: the request
260 @type request: twisted.protocol.http.Request
261 """
262
263 fd = request.transport.fileno()
264 self._requests[fd] = request
265
267 """
268 Returns whether we want to log a request from this IP; allows us to
269 filter requests from automated monitoring systems.
270 """
271 if self.logfilter:
272 return not self.logfilter.isInRange(ip)
273 else:
274 return True
275
277 """
278 Removes a request and add logging.
279 Note that it does not disconnect the client; it is called in reaction
280 to a client disconnecting.
281 It also removes the keycard if one was created.
282
283 @param request: the request
284 @type request: L{twisted.protocols.http.Request}
285 @param fd: the file descriptor for the client being removed
286 @type fd: L{int}
287 @param stats: the statistics for the removed client
288 @type stats: GValueArray
289 """
290
291 ip = request.getClientIP()
292 if self._logRequestFromIP(ip):
293 self.logWrite(fd, ip, request, stats)
294 self.info('[fd %5d] Client from %s disconnected' % (fd, ip))
295
296
297
298
299 self.cleanupAuth(fd)
300
301 self.debug('[fd %5d] closing transport %r' % (fd, request.transport))
302
303
304
305 del self._requests[fd]
306 request.transport.loseConnection()
307
308 self.debug('[fd %5d] closed transport %r' % (fd, request.transport))
309
312
324
325
326
327
350
353
356
358 self.debug('Not sending data, it\'s not ready')
359 return server.NOT_DONE_YET
360
373
375
376 fdi = request.fdIncoming
377 if not self._writeHeaders(request):
378 self.debug("[fd %5d] not adding as a client" % fdi)
379 return
380 self._addClient(request)
381
382
383
384
385
386
387
388
389 fd = fdi
390 self.debug("taking away [fd %5d] from Twisted" % fd)
391 reactor.removeReader(request.transport)
392
393
394
395
396 import fcntl
397 try:
398 fcntl.fcntl(fd, fcntl.F_GETFL)
399 except IOError, e:
400 if e.errno == errno.EBADF:
401 self.warning("[fd %5d] is not actually open, ignoring" % fd)
402 else:
403 self.warning("[fd %5d] error during check: %s (%d)" % (
404 fd, e.strerror, e.errno))
405 return
406
407
408 self.streamer.add_client(fd)
409 ip = request.getClientIP()
410
411 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
412
413 render_GET = _render
414 render_HEAD = _render
415
416 -class HTTPRoot(web_resource.Resource, log.Loggable):
430