1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gobject
25 import gst
26
27
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.web import server
32 from twisted.cred import credentials
33
34 from flumotion.component import feedcomponent
35 from flumotion.common import bundle, common, gstreamer, errors, pygobject
36 from flumotion.common import messages, netutils, log, interfaces
37
38 from flumotion.twisted import fdserver
39 from flumotion.twisted.compat import implements
40 from flumotion.component.misc.porter import porterclient
41
42
43 from flumotion.component.component import moods
44 from flumotion.common.pygobject import gsignal
45
46 from flumotion.component.consumers.httpstreamer import resources
47 from flumotion.component.base import http
48
49 from flumotion.common.messages import N_
50 T_ = messages.gettexter('flumotion')
51
52 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
53
54
57 self.sink = sink
58
59 self.no_clients = 0
60 self.clients_added_count = 0
61 self.clients_removed_count = 0
62 self.start_time = time.time()
63
64 self.peak_client_number = 0
65 self.peak_epoch = self.start_time
66 self.load_deltas = [0, 0]
67 self._load_deltas_period = 10
68 self._load_deltas_ongoing = [time.time(), 0, 0]
69
70
71 self.average_client_number = 0
72 self.average_time = self.start_time
73
74 self.hostname = "localhost"
75 self.port = 0
76 self.mountPoint = "/"
77
79
80 now = time.time()
81
82 dt1 = self.average_time - self.start_time
83 dc1 = self.average_client_number
84 dt2 = now - self.average_time
85 dc2 = self.no_clients
86 self.average_time = now
87 if dt1 == 0:
88
89 self.average_client_number = 0
90 else:
91 dt = dt1 + dt2
92 before = (dc1 * dt1) / dt
93 after = dc2 * dt2 / dt
94 self.average_client_number = before + after
95
97 self._updateAverage()
98
99 self.no_clients += 1
100 self.clients_added_count +=1
101
102
103 if self.no_clients >= self.peak_client_number:
104 self.peak_epoch = time.time()
105 self.peak_client_number = self.no_clients
106
108 self._updateAverage()
109 self.no_clients -= 1
110 self.clients_removed_count +=1
111
113 """
114 Periodically, update our statistics on load deltas, and update the
115 UIState with new values for total bytes, bitrate, etc.
116 """
117
118 oldtime, oldadd, oldremove = self._load_deltas_ongoing
119 add, remove = self.clients_added_count, self.clients_removed_count
120 now = time.time()
121 diff = float(now - oldtime)
122
123 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
124 self._load_deltas_ongoing = [now, add, remove]
125
126 self.update_ui_state()
127
128 self._updateCallLaterId = reactor.callLater(10, self._updateStats)
129
131 return self.sink.get_property('bytes-served')
132
134 return self.sink.get_property('bytes-to-serve')
135
137 return time.time() - self.start_time
138
140 return self.no_clients
141
143 return self.peak_client_number
144
146 return self.peak_epoch
147
149 return self.average_client_number
150
152 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
153
155 return self.load_deltas
156
158 c = self
159
160 bytes_sent = c.getBytesSent()
161 bytes_received = c.getBytesReceived()
162 uptime = c.getUptime()
163
164 set('stream-mime', c.get_mime())
165 set('stream-url', c.getUrl())
166 set('stream-uptime', common.formatTime(uptime))
167 bitspeed = bytes_received * 8 / uptime
168 set('stream-bitrate', common.formatStorage(bitspeed) + 'bit/s')
169 set('stream-totalbytes', common.formatStorage(bytes_received) + 'Byte')
170 set('stream-bitrate-raw', bitspeed)
171 set('stream-totalbytes-raw', bytes_received)
172
173 set('clients-current', str(c.getClients()))
174 set('clients-max', str(c.getMaxClients()))
175 set('clients-peak', str(c.getPeakClients()))
176 set('clients-peak-time', c.getPeakEpoch())
177 set('clients-average', str(int(c.getAverageClients())))
178
179 bitspeed = bytes_sent * 8 / uptime
180 set('consumption-bitrate', common.formatStorage(bitspeed) + 'bit/s')
181 set('consumption-totalbytes', common.formatStorage(bytes_sent) + 'Byte')
182 set('consumption-bitrate-raw', bitspeed)
183 set('consumption-totalbytes-raw', bytes_sent)
184
185 -class HTTPMedium(feedcomponent.FeedComponentMedium):
191
193 """
194 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
195 """
196 d = self.callRemote('authenticate', bouncerName, keycard)
197 d.addErrback(log.warningFailure)
198 return d
199
201 """
202 @rtype: L{twisted.internet.defer.Deferred}
203 """
204 return self.callRemote('removeKeycardId', bouncerName, keycardId)
205
206
209
212
215
218
221
224
225
227 implements(interfaces.IStreamingComponent)
228
229 checkOffset = True
230
231
232 logCategory = 'cons-http'
233
234 pipe_template = 'multifdsink name=sink ' + \
235 'sync=false ' + \
236 'recover-policy=3'
237 gsignal('client-removed', object, int, int, object)
238
239 componentMediumClass = HTTPMedium
240
242 reactor.debug = True
243 self.debug("HTTP streamer initialising")
244
245 self.caps = None
246 self.resource = None
247 self.mountPoint = None
248 self.burst_on_connect = False
249
250 self.description = None
251
252 self.type = None
253
254
255 self._pbclient = None
256 self._porterUsername = None
257 self._porterPassword = None
258 self._porterPath = None
259
260
261
262 self.port = None
263
264 self.iface = None
265
266 self._updateCallLaterId = None
267
268 self._pending_removals = {}
269
270 for i in ('stream-mime', 'stream-uptime', 'stream-bitrate',
271 'stream-totalbytes', 'clients-current', 'clients-max',
272 'clients-peak', 'clients-peak-time', 'clients-average',
273 'consumption-bitrate', 'consumption-totalbytes',
274 'stream-bitrate-raw', 'stream-totalbytes-raw',
275 'consumption-bitrate-raw', 'consumption-totalbytes-raw',
276 'stream-url'):
277 self.uiState.addKey(i, None)
278
281
284
286 props = self.config['properties']
287
288
289 self.fixRenamedProperties(props, [
290 ('issuer', 'issuer-class'),
291 ('mount_point', 'mount-point'),
292 ('porter_socket_path', 'porter-socket-path'),
293 ('porter_username', 'porter-username'),
294 ('porter_password', 'porter-password'),
295 ('user_limit', 'client-limit'),
296 ('bandwidth_limit', 'bandwidth-limit'),
297 ('burst_on_connect', 'burst-on-connect'),
298 ('burst_size', 'burst-size'),
299 ])
300
301 if props.get('type', 'master') == 'slave':
302 for k in 'socket-path', 'username', 'password':
303 if not 'porter-' + k in props:
304 msg = "slave mode, missing required property 'porter-%s'" % k
305 return defer.fail(errors.ConfigError(msg))
306
307 if 'burst-size' in props and 'burst-time' in props:
308 msg = 'both burst-size and burst-time set, cannot satisfy'
309 return defer.fail(errors.ConfigError(msg))
310
311
312 version = gstreamer.get_plugin_version('tcp')
313 if version < (0, 10, 9, 1):
314 m = messages.Error(T_(N_(
315 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
316 ".".join(map(str, version)), 'multifdsink'))
317 m.add(T_(N_("Please upgrade '%s' to version %s."),
318 'gst-plugins-base', '0.10.10'))
319 self.addMessage(m)
320 self.setMood(moods.sad)
321
322 return defer.fail(
323 errors.ComponentSetupHandledError(
324 "multifdsink version not newer than 0.10.9.1"))
325
327 try:
328 sink.get_property('units-max')
329 return True
330 except TypeError:
331 return False
332
334 if self.burst_on_connect:
335 if self.burst_time and self.time_bursting_supported(sink):
336 self.debug("Configuring burst mode for %f second burst",
337 self.burst_time)
338
339
340 sink.set_property('sync-method', 4)
341 sink.set_property('burst-unit', 2)
342 sink.set_property('burst-value',
343 long(self.burst_time * gst.SECOND))
344
345
346
347
348 sink.set_property('time-min',
349 long((self.burst_time + 5) * gst.SECOND))
350
351 sink.set_property('unit-type', 2)
352 sink.set_property('units-soft-max',
353 long((self.burst_time + 8) * gst.SECOND))
354 sink.set_property('units-max',
355 long((self.burst_time + 10) * gst.SECOND))
356 elif self.burst_size:
357 self.debug("Configuring burst mode for %d kB burst",
358 self.burst_size)
359
360
361
362
363
364 sink.set_property('sync-method', 'burst-keyframe')
365 sink.set_property('burst-unit', 'bytes')
366 sink.set_property('burst-value', self.burst_size * 1024)
367
368
369
370
371 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
372
373
374
375
376
377
378
379 sink.set_property('buffers-soft-max',
380 (self.burst_size + 1024) / 4)
381 sink.set_property('buffers-max',
382 (self.burst_size + 2048) / 4)
383
384 else:
385
386 self.debug("simple burst-on-connect, setting sync-method 2")
387 sink.set_property('sync-method', 2)
388
389 sink.set_property('buffers-soft-max', 250)
390 sink.set_property('buffers-max', 500)
391 else:
392 self.debug("no burst-on-connect, setting sync-method 0")
393 sink.set_property('sync-method', 0)
394
395 sink.set_property('buffers-soft-max', 250)
396 sink.set_property('buffers-max', 500)
397
477
479 return '<MultifdSinkStreamer (%s)>' % self.name
480
482 return self.resource.maxclients
483
485 if self.caps:
486 return self.caps.get_structure(0).get_name()
487
489 mime = self.get_mime()
490 if mime == 'multipart/x-mixed-replace':
491 mime += ";boundary=ThisRandomString"
492 return mime
493
495 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
496
498 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
499 if self.plugs[socket]:
500 plug = self.plugs[socket][-1]
501 return plug.getStreamData()
502 else:
503 return {
504 'protocol': 'HTTP',
505 'description': self.description,
506 'url' : self.getUrl()
507 }
508
510 """
511 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
512 current_clients, current_load) of our current bandwidth and user values.
513 The deltas are estimates of how much bitrate is added, removed
514 due to client connections, disconnections, per second.
515 """
516
517
518 deltaadded, deltaremoved = self.getLoadDeltas()
519
520 bytes_received = self.getBytesReceived()
521 uptime = self.getUptime()
522 bitrate = bytes_received * 8 / uptime
523
524 bytes_sent = self.getBytesSent()
525 clients_connected = self.getClients()
526 current_load = bitrate * clients_connected
527
528 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
529 clients_connected, current_load)
530
534
538
540 def set(k, v):
541 if self.uiState.get(k) != v:
542 self.uiState.set(k, v)
543
544
545 self.updateState(set)
546
551
553 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
554 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
555 self.warning('[fd %5d] Client removed because of write error' % fd)
556
557 self.emit('client-removed', sink, fd, reason, stats)
558 Stats.clientRemoved(self)
559 self.update_ui_state()
560
561
562
578
579
580
581
582
583
585 stats = sink.emit('get-stats', fd)
586 self._pending_removals[fd] = (stats, reason)
587
588
594
595
596
608
610 """
611 Provide a new set of porter login information, for when we're in slave
612 mode and the porter changes.
613 If we're currently connected, this won't disconnect - it'll just change
614 the information so that next time we try and connect we'll use the
615 new ones
616 """
617 if self.type == 'slave':
618 self._porterUsername = username
619 self._porterPassword = password
620
621 creds = credentials.UsernamePassword(self._porterUsername,
622 self._porterPassword)
623 self._pbclient.startLogin(creds, self.medium)
624
625
626 if path != self._porterPath:
627 self.debug("Changing porter login to use \"%s\"", path)
628 self._porterPath = path
629 self._pbclient.stopTrying()
630
631 self._pbclient.resetDelay()
632 reactor.connectWith(
633 fdserver.FDConnector, self._porterPath,
634 self._pbclient, 10, checkPID=False)
635 else:
636 raise errors.WrongStateError(
637 "Can't specify porter details in master mode")
638
640 root = resources.HTTPRoot()
641
642 mount = self.mountPoint[1:]
643 root.putChild(mount, self.resource)
644 if self.type == 'slave':
645
646
647
648
649
650
651
652
653
654
655
656
657
658 d1 = feedcomponent.ParseLaunchComponent.do_start(self,
659 *args, **kwargs)
660
661 d2 = defer.Deferred()
662 mountpoints = [self.mountPoint]
663 self._pbclient = porterclient.HTTPPorterClientFactory(
664 server.Site(resource=root), mountpoints, d2)
665
666 creds = credentials.UsernamePassword(self._porterUsername,
667 self._porterPassword)
668 self._pbclient.startLogin(creds, self.medium)
669
670 self.debug("Starting porter login at \"%s\"", self._porterPath)
671
672 reactor.connectWith(
673 fdserver.FDConnector, self._porterPath,
674 self._pbclient, 10, checkPID=False)
675
676 return defer.DeferredList([d1, d2])
677 else:
678
679 try:
680 self.debug('Listening on %d' % self.port)
681 iface = self.iface or ""
682 reactor.listenTCP(self.port, server.Site(resource=root),
683 interface=iface)
684 return feedcomponent.ParseLaunchComponent.do_start(self, *args,
685 **kwargs)
686 except error.CannotListenError:
687 t = 'Port %d is not available.' % self.port
688 self.warning(t)
689 m = messages.Error(T_(N_(
690 "Network error: TCP port %d is not available."), self.port))
691 self.addMessage(m)
692 self.setMood(moods.sad)
693 return defer.fail(errors.ComponentStartHandledError(t))
694
695 pygobject.type_register(MultifdSinkStreamer)
696