1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import os
22 import time
23 import string
24
25 from flumotion.component import component
26 from flumotion.common import log, messages, errors, netutils, interfaces
27 from flumotion.component.component import moods
28 from flumotion.component.misc.porter import porterclient
29 from flumotion.component.base import http as httpbase
30 from twisted.web import resource, static, server, http
31 from twisted.web import error as weberror
32 from twisted.internet import defer, reactor, error
33 from flumotion.twisted import fdserver
34 from flumotion.twisted.compat import implements
35 from twisted.cred import credentials
36
37 from flumotion.component.misc.httpfile import file
38
39 from flumotion.common.messages import N_
40 T_ = messages.gettexter('flumotion')
41
43
def __init__(self, channel, queued):
    """
    Set up per-request transfer bookkeeping and announce the request.

    NOTE(review): the ``def`` line and the enclosing class statement are
    not visible in this view; the signature is taken from the arguments
    forwarded to C{server.Request.__init__} below.

    @param channel: forwarded to C{server.Request.__init__}; its
                    C{factory.component} is the component notified about
                    this request.
    @param queued:  forwarded unchanged to C{server.Request.__init__}.
    """
    server.Request.__init__(self, channel, queued)

    self._component = channel.factory.component
    self._completed = False
    self._transfer = None

    # Both timestamps start out at the same instant.
    started = time.time()
    self._bytes_written = 0
    self._start_time = started
    self._lastTimeWritten = started

    # Announce the request only after all bookkeeping attributes are set.
    self._component.requestStarted(self)
56
62
69
73
# NOTE(review): the ``def`` line of this method is not visible in this view;
# only its body is reproduced here.
# Report the end of the request to the component, at most once.
if not self._completed:
    elapsed = time.time() - self._start_time
    self._component.requestFinished(self, self._bytes_written, elapsed)
    # Flag completion only after the component has been notified.
    self._completed = True
79
80 -class Site(server.Site):
87
94
96 """
97 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
98 """
99 return self.callRemote('authenticate', bouncerName, keycard)
100
102 """
103 @rtype: L{twisted.internet.defer.Deferred}
104 """
105 return self.callRemote('removeKeycardId', bouncerName, keycardId)
106
109
112
115
118
119 -class HTTPFileStreamer(component.BaseComponent, httpbase.HTTPAuthentication,
120 log.Loggable):
121 implements(interfaces.IStreamingComponent)
122
123 componentMediumClass = HTTPFileMedium
124
125 REQUEST_TIMEOUT = 30
126
127
131
# NOTE(review): the ``def`` line of this initialiser is not visible in this
# view; only its body is reproduced here.

# Serving configuration, filled in later from the component properties.
self.mountPoint = None
self.type = None
self.port = None
self.hostname = None
self.loggers = []
self.logfilter = None

# Must be a plain string: it is handed out as the 'description' entry of
# the stream data.  A trailing comma here used to make it a 1-tuple.
self.description = 'On-Demand Flumotion Stream'

# Request bookkeeping.
self._singleFile = False
self._connected_clients = []
self._total_bytes_written = 0

self._pbclient = None

# Counters published through the UI state.
self.uiState.addKey("connected-clients", 0)
self.uiState.addKey("bytes-transferred", 0)
151
154
# NOTE(review): the ``def`` line of this method is not visible in this view,
# and the original indentation was lost; the nesting below is reconstructed
# from the logic -- confirm against the real file.
# Reads the component properties and caches the derived settings on self.
props = self.config['properties']

# The mount point is always stored with a leading slash.
mountPoint = props.get('mount-point', '')
if not mountPoint.startswith('/'):
    mountPoint = '/' + mountPoint
self.mountPoint = mountPoint

# Fall back to a guessed hostname when none (or an empty one) is configured.
self.hostname = props.get('hostname', None)
if not self.hostname:
    self.hostname = netutils.guess_public_hostname()

self.filePath = props.get('path')
self.type = props.get('type', 'master')
self.port = props.get('port', 8801)
if self.type == 'slave':
    # Plain indexing: these keys are required in slave mode.
    self._porterPath = props['porter-socket-path']
    self._porterUsername = props['porter-username']
    self._porterPassword = props['porter-password']
# presumably applies to both modes (indentation lost) -- TODO confirm
self.loggers = self.plugs['flumotion.component.plugs.loggers.Logger']

if 'bouncer' in props:
    self.setBouncerName(props['bouncer'])
if 'issuer-class' in props:
    self.setIssuerClass(props['issuer-class'])
if 'ip-filter' in props:
    # ``http`` in this file is twisted.web.http, which provides no
    # LogFilter; the Flumotion HTTP base module is imported as ``httpbase``.
    # NOTE(review): confirm LogFilter lives in flumotion.component.base.http.
    logFilter = httpbase.LogFilter()
    for ipRange in props['ip-filter']:
        logFilter.addIPFilter(ipRange)
    self.logfilter = logFilter
186
192
"""
Provide a new set of porter login information, for when we're in slave
mode and the porter changes.

If we're currently connected, this won't disconnect - it'll just change
the information so that next time we try and connect we'll use the
new ones.

@raise errors.WrongStateError: when the component is not in slave mode.
"""
# NOTE(review): the ``def`` line is not visible in this view; the body
# expects ``path``, ``username`` and ``password`` to be in scope.  The
# nesting is reconstructed from the logic -- confirm against the real file.
if self.type != 'slave':
    raise errors.WrongStateError(
        "Can't specify porter details in master mode")

self._porterUsername = username
self._porterPassword = password

creds = credentials.UsernamePassword(self._porterUsername,
                                     self._porterPassword)
self._pbclient.startLogin(creds, self.medium)

pathChanged = path != self._porterPath
if pathChanged:
    # A different socket path: stop retrying through the old one, reset
    # the retry delay and connect through the new path.
    self._porterPath = path
    self._pbclient.stopTrying()

    self._pbclient.resetDelay()
    reactor.connectWith(
        fdserver.FDConnector, self._porterPath,
        self._pbclient, 10, checkPID=False)
221
223
224 root = resource.Resource()
225
226 mount = self.mountPoint[1:]
227
228 children = string.split(mount, '/')
229 current_resource = root
230 for child in children[:-1]:
231 res = resource.Resource()
232 current_resource.putChild(child, res)
233 current_resource = res
234 fileResource = file.File(self.filePath, self)
235 self.debug("Putting File resource at %r", children[-1:][0])
236 current_resource.putChild(children[-1:][0], fileResource)
237
238 reactor.callLater(self.REQUEST_TIMEOUT, self._timeoutRequests)
239
240 d = defer.Deferred()
241 if self.type == 'slave':
242
243 if self._singleFile:
244 self._pbclient = porterclient.HTTPPorterClientFactory(
245 Site(root, self), [self.mountPoint], d)
246 else:
247 self._pbclient = porterclient.HTTPPorterClientFactory(
248 Site(root, self), [], d,
249 prefixes=[self.mountPoint])
250 creds = credentials.UsernamePassword(self._porterUsername,
251 self._porterPassword)
252 self._pbclient.startLogin(creds, self.medium)
253 self.debug("Starting porter login!")
254
255 reactor.connectWith(fdserver.FDConnector, self._porterPath,
256 self._pbclient, 10, checkPID=False)
257 else:
258
259 try:
260 self.debug('Listening on %s' % self.port)
261 iface = ""
262 reactor.listenTCP(self.port, Site(root, self),
263 interface=iface)
264 except error.CannotListenError:
265 t = 'Port %d is not available.' % self.port
266 self.warning(t)
267 m = messages.Error(T_(N_(
268 "Network error: TCP port %d is not available."), self.port))
269 self.addMessage(m)
270 self.setMood(moods.sad)
271 return defer.fail(errors.ComponentStartHandledError(t))
272
273 d.callback(None)
274
275 def setComponentHappy(result):
276 self.setMood(moods.happy)
277 return result
278 d.addCallback(setComponentHappy)
279 return d
280
282 props = self.config['properties']
283 self.fixRenamedProperties(props, [
284 ('issuer', 'issuer-class'),
285 ('porter_socket_path', 'porter-socket-path'),
286 ('porter_username', 'porter-username'),
287 ('porter_password', 'porter-password'),
288 ('mount_point', 'mount-point')
289 ])
290
291 if props.get('type', 'master') == 'slave':
292 for k in 'socket-path', 'username', 'password':
293 if not 'porter-' + k in props:
294 msg = 'slave mode, missing required property porter-%s' % k
295 return defer.fail(errors.ConfigError(msg))
296 else:
297 if not 'port' in props:
298 msg = "master mode, missing required property 'port'"
299 return defer.fail(errors.ConfigError(msg))
300
301 if props.get('mount-point', None) is not None:
302 if props['mount-point'] == '/':
303 return defer.fail(errors.ConfigError(
304 "A mount-point of / is not supported in this release"))
305
306 path = props.get('path', None)
307 if path is None:
308 msg = "missing required property 'path'"
309 return defer.fail(errors.ConfigError(msg))
310 if os.path.isfile(path):
311 self._singleFile = True
312 elif os.path.isdir(path):
313 self._singleFile = False
314 else:
315 msg = "the file or directory specified in 'path': %s does " \
316 "not exist or is neither a file nor directory" % path
317 return defer.fail(errors.ConfigError(msg))
318
332
def requestStarted(self, request):
    """
    Record a newly started request and publish the new client count.

    NOTE(review): the ``def`` line is not visible in this view; the
    signature is taken from the ``requestStarted(self)`` call made by the
    request object and from the ``request`` name used in the body.

    @param request: the request that has just started.
    """
    self._connected_clients.append(request)
    # Publish the number of clients, not the list of request objects, so
    # the value has the same type as the one set when a request finishes.
    self.uiState.set("connected-clients", len(self._connected_clients))
336
def requestFinished(self, request, bytesWritten, timeConnected):
    """
    Log a finished request and update the client and byte counters.

    NOTE(review): the ``def`` line is not visible in this view; the
    signature is taken from the ``requestFinished(self, bytes, seconds)``
    call made by the request object and from the names used in the body.

    @param request:       the finished request; it must currently be in
                          the list of connected clients.
    @param bytesWritten:  value logged as 'bytes-sent' and added to the
                          running byte total.
    @param timeConnected: value logged as 'time-connected'.
    """
    headers = request.getAllHeaders()

    clientIP = request.getClientIP()
    # Clients matched by the configured filter are not logged.
    filtered = self.logfilter and self.logfilter.isInRange(clientIP)
    if not filtered:
        args = {'ip': clientIP,
                'time': time.gmtime(),
                'method': request.method,
                'uri': request.uri,
                'username': '-',
                'get-parameters': request.args,
                'clientproto': request.clientproto,
                'response': request.code,
                'bytes-sent': bytesWritten,
                'referer': headers.get('referer', None),
                'user-agent': headers.get('user-agent', None),
                'time-connected': timeConnected}

        for logger in self.loggers:
            logger.event('http_session_completed', args)

    self._connected_clients.remove(request)

    self.uiState.set("connected-clients", len(self._connected_clients))

    self._total_bytes_written += bytesWritten
    self.uiState.set("bytes-transferred", self._total_bytes_written)
364
def getUrl(self):
    """
    Build the HTTP URL from the hostname, port and mount point.

    NOTE(review): the ``def`` line is not visible in this view; the
    signature is taken from the argument-less ``self.getUrl()`` call made
    when the stream data is assembled.

    @rtype: str
    """
    return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
367
369 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
370 if self.plugs[socket]:
371 plug = self.plugs[socket][-1]
372 return plug.getStreamData()
373 else:
374 return {
375 'protocol': 'HTTP',
376 'description': self.description,
377 'url' : self.getUrl()
378 }
379
381 """
382 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
383 current_clients, current_load) of our current bandwidth and user values.
384 The deltas and current_load are NOT currently implemented here, we set
385 them as zero.
386 """
387 bytesTransferred = self._total_bytes_written
388 for request in self._connected_clients:
389 if request._transfer:
390 bytesTransferred += request._transfer.bytesSent
391
392 return (0, 0, bytesTransferred, len(self._connected_clients), 0)
393
394
397
400
402
# NOTE(review): the ``def`` line of this method is not visible in this view;
# only its body is reproduced here.
# Expiring clients is not supported here: only a warning is logged.
notice = ("Expiring clients is not implemented for static "
          "fileserving")
self.warning(notice)
405
"""
Ask every configured logger to rotate.

Close the logfile, then reopen using the previous logfilename.
"""
# NOTE(review): the ``def`` line of this method is not visible in this view;
# only its body is reproduced here.
for currentLogger in self.loggers:
    description = 'rotating logger %r' % currentLogger
    self.debug(description)
    currentLogger.rotate()
413