1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.component import component as basecomponent
31 from flumotion.common import common, errors, pygobject, messages
32 from flumotion.common import gstreamer, componentui
33 from flumotion.worker import feed
34
35 from flumotion.common.planet import moods
36 from flumotion.common.pygobject import gsignal
37
38 from flumotion.common.messages import N_
39 T_ = messages.gettexter('flumotion')
40
42 """
43 This class groups feeder-related information as used by a Feed Component.
44
45 @ivar feedId: id of the feed this is a feeder for
46 @ivar uiState: the serializable UI State for this feeder
47 """
56
58 """
59 The given client has connected on the given file descriptor, and is
60 being added to multifdsink. This is called solely from the reactor
61 thread.
62
63 @param clientId: id of the client of the feeder
64 @param fd: file descriptor representing the client
65 @param cleanup: callable to be called when the given fd is removed
66 """
67 if clientId not in self._clients:
68
69 client = FeederClient(clientId)
70 self._clients[clientId] = client
71 self.uiState.append('clients', client.uiState)
72
73 client = self._clients[clientId]
74 self._fdToClient[fd] = (client, cleanup)
75
76 client.connected(fd)
77
78 return client
79
81 """
82 The client has been entirely removed from multifdsink, and we may
83 now close its file descriptor.
84 The client object stays around so we can track over multiple
85 connections.
86
87 Called from GStreamer threads.
88
89 @type fd: file descriptor
90 """
91 (client, cleanup) = self._fdToClient.pop(fd)
92 client.disconnected()
93
94
95
96
97 reactor.callFromThread(cleanup, fd)
98
100 """
101 @rtype: list of all L{FeederClient}s ever seen, including currently
102 disconnected clients
103 """
104 return self._clients.values()
105
107 """
108 This class groups information related to the client of a feeder.
109 The client is identified by an id.
110 The information remains valid for the lifetime of the feeder, so it
111 can track reconnects of the client.
112
113 @ivar clientId: id of the client of the feeder
114 @ivar fd: file descriptor the client is currently using, or None.
115 """
117 self.uiState = componentui.WorkerComponentUIState()
118 self.uiState.addKey('clientId', clientId)
119 self.fd = None
120 self.uiState.addKey('fd', None)
121
122
123
124
125 for key in (
126 'bytesReadCurrent',
127 'bytesReadTotal',
128 'reconnects',
129 'lastConnect',
130 'lastDisconnect',
131 'lastActivity',
132 ):
133 self.uiState.addKey(key, 0)
134
135 for key in (
136 'buffersDroppedCurrent',
137 'buffersDroppedTotal',
138 ):
139 self.uiState.addKey(key, None)
140
141
142 self._buffersDroppedBefore = 0
143 self._bytesReadBefore = 0
144
146 """
147 @type stats: list
148 """
149 bytesSent = stats[0]
150
151
152
153 timeLastActivity = float(stats[4]) / gst.SECOND
154 if len(stats) > 5:
155
156 buffersDropped = stats[5]
157 else:
158
159
160 buffersDropped = 0
161
162 self.uiState.set('bytesReadCurrent', bytesSent)
163 self.uiState.set('buffersDroppedCurrent', buffersDropped)
164 self.uiState.set('bytesReadTotal', self._bytesReadBefore + bytesSent)
165 self.uiState.set('lastActivity', timeLastActivity)
166 if buffersDropped is not None:
167 self.uiState.set('buffersDroppedTotal',
168 self._buffersDroppedBefore + buffersDropped)
169
171 """
172 The client has connected on this fd.
173 Update related stats.
174
175 Called only from the reactor thread.
176 """
177 if not when:
178 when = time.time()
179 self.fd = fd
180 self.uiState.set('fd', fd)
181 self.uiState.set('lastConnect', when)
182 self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)
183
185 """
186 The client has disconnected.
187 Update related stats.
188
189 Called from GStreamer threads.
190 """
191 if not when:
192 when = time.time()
193 self.fd = None
194
195 def updateUIState():
196 self.uiState.set('fd', None)
197 self.uiState.set('lastDisconnect', when)
198
199
200 self._bytesReadBefore += self.uiState.get('bytesReadCurrent')
201 self.uiState.set('bytesReadCurrent', 0)
202 if self.uiState.get('buffersDroppedCurrent') is not None:
203 self._buffersDroppedBefore += self.uiState.get(
204 'buffersDroppedCurrent')
205 self.uiState.set('buffersDroppedCurrent', 0)
206 reactor.callFromThread(updateUIState)
207
209 """
210 This class groups eater-related information as used by a Feed Component.
211
212 @ivar eaterId: id of the feed this is eating from
213 @ivar uiState: the serializable UI State for this eater
214 """
216 self.eaterId = eaterId
217 self.uiState = componentui.WorkerComponentUIState()
218 self.uiState.addKey('eaterId')
219 self.uiState.set('eaterId', eaterId)
220
221 connectionDict = {
222 "timeTimestampDiscont": None,
223 "timestampTimestampDiscont": 0.0,
224
225 "lastTimestampDiscont": 0.0,
226 "totalTimestampDiscont": 0.0,
227 "countTimestampDiscont": 0,
228 "timeOffsetDiscont": None,
229 "offsetOffsetDiscont": 0,
230 "lastOffsetDiscont": 0,
231 "totalOffsetDiscont": 0,
232 "countOffsetDiscont": 0,
233
234 }
235 self.uiState.addDictKey('connection', connectionDict)
236
237 for key in (
238 'lastConnect',
239 'lastDisconnect',
240 'totalConnections',
241 'countTimestampDiscont',
242 'countOffsetDiscont',
243 ):
244 self.uiState.addKey(key, 0)
245 for key in (
246 'totalTimestampDiscont',
247 'totalOffsetDiscont',
248 ):
249 self.uiState.addKey(key, 0.0)
250 self.uiState.addKey('fd', None)
251
253 """
254 The eater has been connected.
255 Update related stats.
256 """
257 if not when:
258 when = time.time()
259
260 def updateUIState():
261 self.uiState.set('lastConnect', when)
262 self.uiState.set('fd', fd)
263 self.uiState.set('totalConnections',
264 self.uiState.get('totalConnections', 0) + 1)
265
266 self.uiState.setitem("connection", "countTimestampDiscont", 0)
267 self.uiState.setitem("connection", "timeTimestampDiscont", None)
268 self.uiState.setitem("connection", "lastTimestampDiscont", 0.0)
269 self.uiState.setitem("connection", "totalTimestampDiscont", 0.0)
270 self.uiState.setitem("connection", "countOffsetDiscont", 0)
271 self.uiState.setitem("connection", "timeOffsetDiscont", None)
272 self.uiState.setitem("connection", "lastOffsetDiscont", 0)
273 self.uiState.setitem("connection", "totalOffsetDiscont", 0)
274
275 reactor.callFromThread(updateUIState)
276
278 """
279 The eater has been disconnected.
280 Update related stats.
281 """
282 if not when:
283 when = time.time()
284
285 def updateUIState():
286 self.uiState.set('lastDisconnect', when)
287 self.uiState.set('fd', None)
288
289 reactor.callFromThread(updateUIState)
290
292 """
293 @param seconds: discont duration in seconds
294 @param timestamp: GStreamer timestamp of new buffer, in seconds.
295
296 Inform the eater of a timestamp discontinuity.
297 This is called from a bus message handler, so in the main thread.
298 """
299 uiState = self.uiState
300
301 c = uiState.get('connection')
302 uiState.setitem('connection', 'countTimestampDiscont',
303 c.get('countTimestampDiscont', 0) + 1)
304 uiState.set('countTimestampDiscont',
305 uiState.get('countTimestampDiscont', 0) + 1)
306
307 uiState.setitem('connection', 'timeTimestampDiscont', time.time())
308 uiState.setitem('connection', 'timestampTimestampDiscont', timestamp)
309 uiState.setitem('connection', 'lastTimestampDiscont', seconds)
310 uiState.setitem('connection', 'totalTimestampDiscont',
311 c.get('totalTimestampDiscont', 0) + seconds)
312 uiState.set('totalTimestampDiscont',
313 uiState.get('totalTimestampDiscont', 0) + seconds)
314
316 """
317 Inform the eater of an offset discontinuity.
318 This is called from a bus message handler, so in the main thread.
319 """
320 uiState = self.uiState
321
322 c = uiState.get('connection')
323 uiState.setitem('connection', 'countOffsetDiscont',
324 c.get('countOffsetDiscont', 0) + 1)
325 uiState.set('countOffsetDiscont',
326 uiState.get('countOffsetDiscont', 0) + 1)
327
328 uiState.setitem('connection', 'timeOffsetDiscont', time.time())
329 uiState.setitem('connection', 'offsetOffsetDiscont', offset)
330 uiState.setitem('connection', 'lastOffsetDiscont', units)
331 uiState.setitem('connection', 'totalOffsetDiscont',
332 c.get('totalOffsetDiscont', 0) + units)
333 uiState.set('totalOffsetDiscont',
334 uiState.get('totalOffsetDiscont', 0) + units)
335
337 """
338 I am a base class for all Flumotion feed components.
339
340 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
341 @cvar checkOffset: whether to check continuity of offsets for eaters
342 """
343
344 FDSRC_TMPL = 'fdsrc name=%(name)s'
345 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
346 FEEDER_TMPL = 'gdppay ! multifdsink sync=false name=%(name)s buffers-max=500 buffers-soft-max=450 recover-policy=1'
347
348
349 BUFFER_PROBE_ADD_FREQUENCY = 5
350
351
352 BUFFER_CHECK_FREQUENCY = BUFFER_PROBE_ADD_FREQUENCY * 2.5
353
354 BUFFER_TIME_THRESHOLD = BUFFER_CHECK_FREQUENCY
355
356 logCategory = 'feedcomponent'
357
358 gsignal('feed-ready', str, bool)
359 gsignal('error', str, str)
360
361 _reconnectInterval = 3
362
363 checkTimestamp = False
364 checkOffset = False
365
366
368
369 self.state.addKey('eaterNames')
370 self.state.addKey('feederNames')
371
372
373 self._feeders = {}
374 self._eaters = {}
375 self.uiState.addListKey('feeders')
376 self.uiState.addListKey('eaters')
377
378 self.pipeline = None
379 self.pipeline_signals = []
380 self.bus_watch_id = None
381 self.files = []
382 self.effects = {}
383 self._probe_ids = {}
384 self._feeder_probe_cl = None
385
386 self.clock_provider = None
387
388 self.eater_names = []
389 self._eaterReconnectDC = {}
390
391 self.feedersFeeding = 0
392 self.feed_names = []
393 self.feeder_names = []
394
395 self._inactiveEaters = []
396
397
398 self._eaterStatus = {}
399
400
401 self._stateChangeDeferreds = {}
402
403 self._gotFirstNewSegment = {}
404
405
406 tcppluginversion = gstreamer.get_plugin_version('tcp')
407 self._get_stats_supported = tcppluginversion >= (0, 10, 11, 0)
408
409
410
411 vt = gstreamer.get_plugin_version('coreelements')
412 if not vt:
413 raise errors.MissingElementError('identity')
414 if not gstreamer.element_factory_has_property('identity',
415 'check-imperfect-timestamp'):
416 self.checkTimestamp = False
417 self.checkOffset = False
418 self.addMessage(
419 messages.Info(T_(N_(
420 "You will get more debugging information "
421 "if you upgrade to GStreamer 0.10.13 or later "
422 "as and when available."))))
423
425 """
426 Sets up component.
427 """
428 eater_config = self.config.get('source', [])
429 feeder_config = self.config.get('feed', [])
430
431 self.debug("feedcomponent.setup(): eater_config %r" % eater_config)
432 self.debug("feedcomponent.setup(): feeder_config %r" % feeder_config)
433
434
435 self.parseEaterConfig(eater_config)
436
437
438 self._inactiveEaters = self.eater_names[:]
439
440 for name in self.eater_names:
441 d = {
442 'lastTime': 0,
443 'lastConnectTime': 0,
444 'lastConnectD': None,
445 'checkEaterDC': None
446 }
447 self._eaterStatus[name] = d
448 self._eaters[name] = Eater(name)
449 self.uiState.append('eaters', self._eaters[name].uiState)
450 self._eaterReconnectDC['eater:' + name] = None
451
452
453 self.parseFeederConfig(feeder_config)
454 self.feedersWaiting = len(self.feeder_names)
455 for feederName in self.feeder_names:
456 self._feeders[feederName] = Feeder(feederName)
457 self.uiState.append('feeders',
458 self._feeders[feederName].uiState)
459
460 self.debug('setup() with %d eaters and %d feeders waiting' % (
461 len(self._inactiveEaters), self.feedersWaiting))
462
463 pipeline = self.create_pipeline()
464 self.set_pipeline(pipeline)
465
466 self.debug('setup() finished')
467
468 return defer.succeed(None)
469
470
472 """
473 Subclasses have to implement this method.
474
475 @rtype: L{gst.Pipeline}
476 """
477 raise NotImplementedError, "subclass must implement create_pipeline"
478
488
490 """
491 The eater for the given feedId is no longer active
492 By default, the component will go hungry.
493 """
494 self.info('Eater of %s is inactive' % feedId)
495 if feedId in self._inactiveEaters:
496 self.warning('Eater of %s was already inactive' % feedId)
497 else:
498 self._inactiveEaters.append(feedId)
499 self.setMood(moods.hungry)
500
502 """
503 The eater for the given feedId is now active and producing data.
504 By default, the component will go happy if all eaters are active.
505 """
506 self.info('Eater of %s is active' % feedId)
507 if feedId not in self._inactiveEaters:
508 self.warning('Eater of %s was already active' % feedId)
509 else:
510 self._inactiveEaters.remove(feedId)
511 if not self._inactiveEaters:
512 self.setMood(moods.happy)
513
514
515
516
518 """
519 Inform of a timestamp discontinuity for the given eater.
520 """
521 discont = curTs - (prevTs + prevDuration)
522 dSeconds = discont / float(gst.SECOND)
523 self.debug("we have a discont on feedId %s of %f s between %s and %s ",
524 feedId, dSeconds,
525 gst.TIME_ARGS(prevTs),
526 gst.TIME_ARGS(curTs))
527 self._eaters[feedId].timestampDiscont(dSeconds,
528 float(curTs) / float(gst.SECOND))
529
531 """
532 Inform of an offset discontinuity for the given eater.
533 """
534 discont = curOffset - prevOffsetEnd
535 self.debug(
536 "we have a discont on feedId %s of %d units between %d and %d ",
537 feedId, discont, prevOffsetEnd, curOffset)
538 self._eaters[feedId].offsetDiscont(discont, curOffset)
539
540
544
546 """
547 Notify the manager that an effect property has changed to a new value.
548
549 Admin clients will receive it as a propertyChanged message for
550 effectName:propertyName.
551 """
552 self.medium.callRemote("propertyChanged", self.name,
553 "%s:%s" % (effectName, propertyName), value)
554
556
557
558
559 eater_names = []
560 for block in eater_config:
561 eater_name = block
562 if block.find(':') == -1:
563 eater_name = block + ':default'
564 eater_names.append(eater_name)
565 self.debug('parsed eater config, eater feedIds %r' % eater_names)
566 self.eater_names = eater_names
567 self.state.set('eaterNames', self.eater_names)
568
570
571
572
573
574
575 self.feed_names = feeder_config
576
577
578
579 self.feeder_names = map(lambda n: self.name + ':' + n, self.feed_names)
580 self.debug('parsed feeder config, feeders %r' % self.feeder_names)
581 self.state.set('feederNames', self.feeder_names)
582
584 """
585 Return the list of feeder names this component eats from.
586
587 @returns: a list of "componentName:feedName" strings
588 """
589 return self.eater_names
590
592 """
593 Return the list of feedIds of feeders this component has.
594
595 @returns: a list of "componentName:feedName" strings
596 """
597 return self.feeder_names
598
600 """
601 Return the list of feedNames for feeds this component has.
602
603 @returns: a list of "feedName" strings
604 """
605 return self.feed_names
606
609
611 if statechange not in self._stateChangeDeferreds:
612 self._stateChangeDeferreds[statechange] = []
613
614 d = defer.Deferred()
615 self._stateChangeDeferreds[statechange].append(d)
616
617 return d
618
619
621 if old == gst.STATE_NULL and new == gst.STATE_READY:
622 return gst.STATE_CHANGE_NULL_TO_READY
623 elif old == gst.STATE_READY and new == gst.STATE_PAUSED:
624 return gst.STATE_CHANGE_READY_TO_PAUSED
625 elif old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
626 return gst.STATE_CHANGE_PAUSED_TO_PLAYING
627 elif old == gst.STATE_PLAYING and new == gst.STATE_PAUSED:
628 return gst.STATE_CHANGE_PLAYING_TO_PAUSED
629 elif old == gst.STATE_PAUSED and new == gst.STATE_READY:
630 return gst.STATE_CHANGE_PAUSED_TO_READY
631 elif old == gst.STATE_READY and new == gst.STATE_NULL:
632 return gst.STATE_CHANGE_READY_TO_NULL
633 else:
634 return 0
635
637 t = message.type
638 src = message.src
639
640
641 if t == gst.MESSAGE_STATE_CHANGED:
642 old, new, pending = message.parse_state_changed()
643
644 if src == self.pipeline:
645 self.log('state change: %r %s->%s'
646 % (src, old.value_nick, new.value_nick))
647 if old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
648 self.setMood(moods.happy)
649
650 change = self._getStateChange(old,new)
651 if change in self._stateChangeDeferreds:
652 dlist = self._stateChangeDeferreds[change]
653 for d in dlist:
654 d.callback(None)
655 del self._stateChangeDeferreds[change]
656
657 elif src.get_name() in ['feeder:'+n for n in self.feeder_names]:
658 if old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
659 self.debug('feeder %s is now feeding' % src.get_name())
660 self.feedersWaiting -= 1
661 self.debug('%d feeders waiting' % self.feedersWaiting)
662
663 feed_name = src.get_name().split(':')[2]
664 self.emit('feed-ready', feed_name, True)
665 elif t == gst.MESSAGE_ERROR:
666 gerror, debug = message.parse_error()
667 self.warning('element %s error %s %s' %
668 (src.get_path_string(), gerror, debug))
669 self.setMood(moods.sad)
670
671 id = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
672 m = messages.Error(T_(N_(
673 "Internal GStreamer error.")),
674 debug="%s\n%s: %d\n%s" % (
675 gerror.message, gerror.domain, gerror.code, debug),
676 id=id, priority=40)
677 self.state.append('messages', m)
678
679
680 changes = [gst.STATE_CHANGE_NULL_TO_READY,
681 gst.STATE_CHANGE_READY_TO_PAUSED,
682 gst.STATE_CHANGE_PAUSED_TO_PLAYING]
683
684
685 curstate = self.pipeline.get_state()
686 if curstate == gst.STATE_NULL:
687 changes.append(gst.STATE_CHANGE_READY_TO_NULL)
688 if curstate <= gst.STATE_PAUSED:
689 changes.append(gst.STATE_CHANGE_PLAYING_TO_PAUSED)
690 if curstate <= gst.STATE_READY:
691 changes.append(gst.STATE_CHANGE_PAUSED_TO_READY)
692 for change in changes:
693 if change in self._stateChangeDeferreds:
694 self.log("We have an error, going to errback pending "
695 "state change defers")
696 dlist = self._stateChangeDeferreds[change]
697 for d in dlist:
698 d.errback(errors.ComponentStartHandledError(
699 gerror.message))
700 del self._stateChangeDeferreds[change]
701
702 elif t == gst.MESSAGE_EOS:
703 name = src.get_name()
704 if name in ['eater:' + n for n in self.eater_names]:
705 self.info('End of stream in eater %s' % src.get_name())
706 feedId = name[len('eater:'):]
707 self.eaterSetInactive(feedId)
708
709 self._reconnectEater(feedId)
710 else:
711 self.warning("We got an eos from %s", name)
712 elif t == gst.MESSAGE_ELEMENT:
713 if message.structure.get_name() == 'imperfect-timestamp':
714 identityName = src.get_name()
715 eaterName = identityName.split("-identity")[0]
716 feedId = eaterName[len('eater:'):]
717
718 self.log("we have an imperfect stream from %s" % src.get_name())
719
720 s = message.structure
721 self.eaterTimestampDiscont(feedId, s["prev-timestamp"],
722 s["prev-duration"], s["cur-timestamp"])
723 elif message.structure.get_name() == 'imperfect-offset':
724 identityName = src.get_name()
725 eaterName = identityName.split("-identity")[0]
726 feedId = eaterName[len('eater:'):]
727
728 self.log("we have an imperfect stream from %s" % src.get_name())
729
730 s = message.structure
731 self.eaterOffsetDiscont(feedId, s["prev-offset-end"],
732 s["cur-offset"])
733
734
735 else:
736 self.log('message received: %r' % message)
737
738 return True
739
740
776
778 if not self.pipeline:
779 return
780
781 if self.clock_provider:
782 self.clock_provider.set_property('active', False)
783 self.clock_provider = None
784 retval = self.pipeline.set_state(gst.STATE_NULL)
785 if retval != gst.STATE_CHANGE_SUCCESS:
786 self.warning('Setting pipeline to NULL failed')
787
789 self.debug("cleaning up")
790
791 assert self.pipeline != None
792
793 self.pipeline_stop()
794
795 map(self.pipeline.disconnect, self.pipeline_signals)
796 self.pipeline.get_bus().disconnect(self.bus_watch_id)
797 self.pipeline.get_bus().remove_signal_watch()
798 self.pipeline = None
799 self.pipeline_signals = []
800 self.bus_watch_id = None
801
802 if self._feeder_probe_cl:
803 self._feeder_probe_cl.cancel()
804 self._feeder_probe_cl = None
805
806
807 for feedId in self.eater_names:
808 status = self._eaterStatus[feedId]
809 if status['checkEaterDC']:
810 status['checkEaterDC'].cancel()
811 status['checkEaterDC'] = None
812
819
821 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
822 gst.TIME_ARGS(base_time))
823
824 clock = gst.NetClientClock(None, ip, port, base_time)
825 self.pipeline.set_base_time(base_time)
826 self.pipeline.use_clock(clock)
827
829 """
830 Tell the component to provide a master clock on the given port.
831
832 @returns: (ip, port, base_time) triple.
833 """
834 def pipelinePaused(r):
835 clock = self.pipeline.get_clock()
836
837 self.pipeline.use_clock(clock)
838
839 self.clock_provider = gst.NetTimeProvider(clock, None, port)
840
841 self.clock_provider.set_property('active', False)
842
843 base_time = clock.get_time()
844 self.pipeline.set_base_time(base_time)
845
846 self.debug('provided master clock from %r, base time %s'
847 % (clock, gst.TIME_ARGS(base_time)))
848
849 if self.medium:
850
851
852
853 ip = self.medium.getIP()
854 else:
855 ip = "127.0.0.1"
856
857 return (ip, port, base_time)
858
859 if not self.pipeline:
860 self.warning('No self.pipeline, cannot provide master clock')
861
862
863
864 if self.clock_provider:
865 self.warning('already had a clock provider, removing it')
866 self.clock_provider = None
867
868
869 (ret, state, pending) = self.pipeline.get_state(0)
870 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
871 self.info ("Setting pipeline to PAUSED")
872
873 d = self._addStateChangeDeferred(gst.STATE_CHANGE_READY_TO_PAUSED)
874 d.addCallback(pipelinePaused)
875
876 self.pipeline.set_state(gst.STATE_PAUSED)
877 return d
878 else:
879 self.info ("Pipeline already started, retrieving clocking")
880
881 ip = self.state.get('manager-ip')
882 base_time = self.pipeline.get_base_time()
883 d = defer.Deferred()
884 d.callback((ip, port, base_time))
885 return d
886
887
888
922
944
953
955 """
956 Periodically scheduled buffer probe, that ensures that we're currently
957 actually having dataflow through our eater elements.
958
959 Called from GStreamer threads.
960
961 @param pad: The gst.Pad srcpad for one eater in this component.
962 @param buffer: A gst.Buffer that has arrived on this pad
963 @param feedId: The feedId for the feed we're eating on this pad
964 @param firstTime: Boolean, true if this is the first time this buffer
965 probe has been added for this eater.
966 """
967
968
969
970 method = self.log
971 if firstTime: method = self.debug
972 method('buffer probe on eater %s has timestamp %s' % (
973 feedId, gst.TIME_ARGS(buffer.timestamp)))
974
975
976
977
978
979 self._eaterStatus[feedId]['lastTime'] = time.time()
980 probeid = self._probe_ids.pop(feedId, None)
981 if probeid:
982 pad.remove_buffer_probe(probeid)
983
984
985 reactor.callFromThread(reactor.callLater,
986 self.BUFFER_PROBE_ADD_FREQUENCY,
987 self._add_buffer_probe, pad, feedId)
988
989
990
991 reactor.callFromThread(self._checkEater, feedId)
992
993 return True
994
1051
1053 eater = self._eaters[feedId]
1054 eater.disconnected()
1055
1056
1057 status = self._eaterStatus[feedId]
1058
1059
1060
1061
1062
1063 self._eaterStatus[feedId]['lastTime'] = 0
1064
1065 status['lastConnectTime'] = time.time()
1066 if status['lastConnectD']:
1067 self.debug('Cancel previous connection attempt ?')
1068
1069
1070 d = self.medium.connectEater(feedId)
1071 def connectEaterCb(result, status, eater):
1072 status['lastConnectD'] = None
1073 d.addCallback(connectEaterCb, status, eater)
1074 status['lastConnectD'] = d
1075
1077 """Get an element out of the pipeline.
1078
1079 If it is possible that the component has not yet been set up,
1080 the caller needs to check if self.pipeline is actually set.
1081 """
1082 assert self.pipeline
1083 element = self.pipeline.get_by_name(element_name)
1084 return element
1085
1087 'Gets a property of an element in the GStreamer pipeline.'
1088 self.debug("%s: getting property %s of element %s" % (self.getName(), property, element_name))
1089 element = self.get_element(element_name)
1090 if not element:
1091 msg = "Element '%s' does not exist" % element_name
1092 self.warning(msg)
1093 raise errors.PropertyError(msg)
1094
1095 self.debug('getting property %s on element %s' % (property, element_name))
1096 try:
1097 value = element.get_property(property)
1098 except (ValueError, TypeError):
1099 msg = "Property '%s' on element '%s' does not exist" % (property, element_name)
1100 self.warning(msg)
1101 raise errors.PropertyError(msg)
1102
1103
1104 if isinstance(value, gobject.GEnum):
1105 value = int(value)
1106
1107 return value
1108
1110 'Sets a property on an element in the GStreamer pipeline.'
1111 self.debug("%s: setting property %s of element %s to %s" % (
1112 self.getName(), property, element_name, value))
1113 element = self.get_element(element_name)
1114 if not element:
1115 msg = "Element '%s' does not exist" % element_name
1116 self.warning(msg)
1117 raise errors.PropertyError(msg)
1118
1119 self.debug('setting property %s on element %r to %s' %
1120 (property, element_name, value))
1121 pygobject.gobject_set_property(element, property, value)
1122
1123
def feedToFD(self, feedName, fd, cleanup, eaterId=None):
    """
    Start feeding the named feed to the given file descriptor.

    The fd is added to the feeder element of the feed and registered with
    the matching Feeder, together with its cleanup callable.  On every
    path where the fd is not handed over to the feeder element, cleanup
    is called right away so that the fd does not leak.

    @param feedName: name of the feed to feed to the given fd.
    @type  feedName: str
    @param fd:       the file descriptor to feed to
    @type  fd:       int
    @param cleanup:  the function to call when the FD is no longer feeding;
                     it is called with fd as its only argument
    @type  cleanup:  callable
    @param eaterId:  id to register the client of the feeder under; when
                     not given, 'client-<fd>' is used instead

    @returns: None when there is no pipeline yet or when the fd was added
              to the feeder element; False when no feeder element exists
              for the feed.
    """
    self.debug('FeedToFD(%s, %d)' % (feedName, fd))
    feedId = common.feedId(self.name, feedName)

    if not self.pipeline:
        self.warning('told to feed %s to fd %d, but pipeline not '
                     'running yet', feedId, fd)
        # nothing will take ownership of the fd, so release it right away
        cleanup(fd)
        return

    elementName = "feeder:%s" % feedId
    element = self.get_element(elementName)
    if not element:
        msg = "Cannot find feeder element named '%s'" % elementName
        # not called 'id', to avoid shadowing the builtin
        messageId = "feedToFD-%s" % feedName
        m = messages.Warning(T_(N_("Internal Flumotion error.")),
                             debug=msg, id=messageId, priority=40)
        self.state.append('messages', m)
        self.warning(msg)
        # the fd is never added to a feeder element on this path, so it
        # has to be released here, as in the no-pipeline case
        cleanup(fd)
        return False

    clientId = eaterId or ('client-%d' % fd)

    element.emit('add', fd)
    self._feeders[feedId].clientConnected(clientId, fd, cleanup)
1159
1161 """
1162 Called (as a signal callback) when the FD is no longer in use by
1163 multifdsink.
1164 This will call the registered callable on the fd.
1165
1166 Called from GStreamer threads.
1167 """
1168 self.debug("cleaning up fd %d", fd)
1169 feedId = ':'.join(sink.get_name().split(':')[1:])
1170 self._feeders[feedId].clientDisconnected(fd)
1171
1173 """
1174 Tell the component to eat the given feedId from the given fd.
1175 The component takes over the ownership of the fd, closing it when
1176 no longer eating.
1177
1178 @param feedId: feed id (componentName:feedName) to eat from through
1179 the given fd
1180 @type feedId: str
1181 @param fd: the file descriptor to eat from
1182 @type fd: int
1183 """
1184 self.debug('EatFromFD(%s, %d)' % (feedId, fd))
1185
1186 if not self.pipeline:
1187 self.warning('told to eat %s from fd %d, but pipeline not '
1188 'running yet', feedId, fd)
1189
1190
1191 os.close(fd)
1192 return
1193
1194 eaterName = "eater:%s" % feedId
1195 self.debug('looking up element %s' % eaterName)
1196 element = self.get_element(eaterName)
1197
1198
1199 (result, current, pending) = element.get_state(0L)
1200 if current not in [gst.STATE_NULL, gst.STATE_READY]:
1201 self.debug('eater %s in state %r, kidnapping it' % (
1202 eaterName, current))
1203
1204
1205
1206
1207
1208
1209
1210 srcpad = element.get_pad('src')
1211
1212 def _block_cb(pad, blocked):
1213 pass
1214 srcpad.set_blocked_async(True, _block_cb)
1215 self.unblock_eater(feedId)
1216
1217
1218 sinkpad = srcpad.get_peer()
1219 srcpad.unlink(sinkpad)
1220 self.pipeline.remove(element)
1221 self.log("setting to ready")
1222 element.set_state(gst.STATE_READY)
1223 self.log("setting to ready complete!!!")
1224 old = element.get_property('fd')
1225 os.close(old)
1226 element.set_property('fd', fd)
1227 self.pipeline.add(element)
1228 srcpad.link(sinkpad)
1229 element.set_state(gst.STATE_PLAYING)
1230
1231 srcpad.set_blocked_async(False, _block_cb)
1232 else:
1233 element.set_property('fd', fd)
1234
1235
1236 self._eaters[feedId].connected(fd)
1237
1239 """
1240 After this function returns, the stream lock for this eater must have
1241 been released. If your component needs to do something here, override
1242 this method.
1243 """
1244 pass
1245
1247 """
1248 An event probe used to consume unwanted EOS events on eaters.
1249
1250 Called from GStreamer threads.
1251 """
1252 if event.type == gst.EVENT_EOS:
1253 self.info(
1254 'End of stream on feed %s, disconnect will be triggered' %
1255 feedId)
1256
1257
1258
1259 return False
1260 return True
1261
1263 """
1264 An event probe used to consume unwanted duplicate newsegment events.
1265
1266 Called from GStreamer threads.
1267 """
1268 if event.type == gst.EVENT_NEWSEGMENT:
1269
1270
1271 if feedId in self._gotFirstNewSegment:
1272 self.info(
1273 "Subsequent new segment event received on depay on "
1274 " feed %s" % feedId)
1275
1276 return False
1277 else:
1278 self._gotFirstNewSegment[feedId] = True
1279 return True
1280
1281 pygobject.type_register(FeedComponent)
1282