1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import gst
27 import gst.interfaces
28 import gobject
29
30 from twisted.internet import reactor, defer
31 from twisted.spread import pb
32
33 from flumotion.configure import configure
34 from flumotion.component import component as basecomponent
35 from flumotion.common import common, interfaces, errors, log, pygobject, messages
36 from flumotion.common import gstreamer
37
38 from flumotion.common.planet import moods
39 from flumotion.common.pygobject import gsignal
40 from flumotion.twisted.compat import implements
41
42
43 from flumotion.worker import feed
44 from flumotion.common.messages import N_
45 T_ = messages.gettexter('flumotion')
46
48 """
49 I am a component-side medium for a FeedComponent to interface with
50 the manager-side ComponentAvatar.
51 """
52 implements(interfaces.IComponentMedium)
53 logCategory = 'feedcompmed'
54 remoteLogName = 'feedserver'
55
57 """
58 @param component: L{flumotion.component.feedcomponent.FeedComponent}
59 """
60 basecomponent.BaseComponentMedium.__init__(self, component)
61
62 self._feederFeedServer = {}
63
64 self._feederClientFactory = {}
65 self._eaterFeedServer = {}
66
67 self._eaterClientFactory = {}
68 self._eaterTransport = {}
69 self.logName = component.name
70
71 def on_feed_ready(component, feedName, isReady):
72 self.callRemote('feedReady', feedName, isReady)
73
74 def on_component_error(component, element_path, message):
75 self.callRemote('error', element_path, message)
76
77 self.comp.connect('feed-ready', on_feed_ready)
78 self.comp.connect('error', on_component_error)
79
80
81
82
83
84
85
88
91
93 """
94 Sets the GStreamer debugging levels based on the passed debug string.
95 """
96 self.debug('Setting GStreamer debug level to %s' % debug)
97 if not debug:
98 return
99
100 for part in debug.split(','):
101 glob = None
102 value = None
103 pair = part.split(':')
104 if len(pair) == 1:
105
106 value = int(pair[0])
107 elif len(pair) == 2:
108 glob, value = pair
109 else:
110 self.warning("Cannot parse GStreamer debug setting '%s'." %
111 part)
112 continue
113
114 if glob:
115 gst.debug_set_threshold_for_name(glob, value)
116 else:
117 gst.debug_set_default_threshold(value)
118
120 """
121 Tell the component the host and port for the FeedServer through which
122 it can connect a local eater to a remote feeder to eat the given
123 fullFeedId.
124
125 Called by the manager-side ComponentAvatar.
126 """
127
128
129 flowName, componentName, feedName = common.parseFullFeedId(fullFeedId)
130 feedId = common.feedId(componentName, feedName)
131 self._feederFeedServer[feedId] = (fullFeedId, host, port)
132
133 return self.connectEater(feedId)
134
161
162 d.addCallback(sendFeedCb)
163 return d
164
165 d.addCallback(loginCb)
166 return d
167
169 """
170 Tell the component to feed the given feed to the receiving component
171 accessible through the FeedServer on the given host and port.
172
173 Called by the manager-side ComponentAvatar.
174 """
175
176
177 self._eaterFeedServer[(componentId, feedId)] = (host, port)
178 client = feed.FeedMedium(self.comp)
179 factory = feed.FeedClientFactory(client)
180
181 self.debug('connecting to FeedServer on %s:%d' % (host, port))
182 reactor.connectTCP(host, port, factory)
183 d = factory.login(self.authenticator)
184 self._eaterClientFactory[(componentId, feedId)] = factory
185 def loginCb(remoteRef):
186 self.debug('logged in to feedserver, remoteRef %r' % remoteRef)
187 client.setRemoteReference(remoteRef)
188
189 self.debug(
190 'COMPONENT --> feedserver: receiveFeed(%s, %s)' % (
191 componentId, feedId))
192 d = remoteRef.callRemote('receiveFeed', componentId, feedId)
193
194 def receiveFeedCb(result):
195 self.debug(
196 'COMPONENT <-- feedserver: receiveFeed(%s, %s): %r' % (
197 componentId, feedId, result))
198 componentName, feedName = common.parseFeedId(feedId)
199 t = remoteRef.broker.transport
200 t.stopReading()
201 t.stopWriting()
202
203 key = (componentId, feedId)
204 self._eaterTransport[key] = t
205 remoteRef.broker.transport = None
206 fd = t.fileno()
207 self.debug('Telling component to feed feedName %s to fd %d'% (
208 feedName, fd))
209 self.comp.feedToFD(feedName, fd)
210
211 d.addCallback(receiveFeedCb)
212 return d
213
214 d.addCallback(loginCb)
215 return d
216
218 """
219 Tells the component to start providing a master clock on the given
220 UDP port.
221 Can only be called if setup() has been called on the component.
222
223 The IP address returned is the local IP the clock is listening on.
224
225 @returns: (ip, port, base_time)
226 @rtype: tuple of (str, int, long)
227 """
228 self.debug('remote_provideMasterClock(port=%r)' % port)
229 return self.comp.provide_master_clock(port)
230
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Invoke the given methodName on the given effectName in this component.
    The effect should implement effect_(methodName) to receive the call.

    @param effectName: key of the effect in the component's C{effects}
                       mapping
    @param methodName: name of the method to call, without the
                       C{effect_} prefix
    @param args:       positional arguments passed through to the method
    @param kwargs:     keyword arguments passed through to the method

    @returns: whatever the effect method returns
    @raises errors.UnknownEffectError: if the component has no effect
                                       with the given name
    @raises errors.NoMethodError:      if the effect has no matching
                                       C{effect_} method
    @raises errors.RemoteRunError:     if calling the method raised a
                                       TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))
    if effectName not in self.comp.effects:
        raise errors.UnknownEffectError(effectName)
    effect = self.comp.effects[effectName]

    # Build the attribute name once; it is used for both the existence
    # check and the lookup.
    attrName = "effect_%s" % methodName
    if not hasattr(effect, attrName):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
                                                        effectName))
    method = getattr(effect, attrName)

    try:
        result = method(*args, **kwargs)
    except TypeError:
        # NOTE(review): this also catches a TypeError raised from inside
        # the effect method, not only a signature mismatch; the message
        # below assumes the latter.
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)
    self.debug("effect: result: %r" % result)
    return result
253
254 from feedcomponent010 import FeedComponent
255
256 FeedComponent.componentMediumClass = FeedComponentMedium
257
259 'A component using gst-launch syntax'
260
261 DELIMITER = '@'
262
263
297
301
302
304 """
305 Method that must be implemented by subclasses to produce the
306 gstparse string for the component's pipeline. Subclasses should
307 not chain up; this method raises NotImplementedError.
308
309 Returns: a new pipeline string representation.
310 """
311 raise NotImplementedError('subclasses should implement '
312 'get_pipeline_string')
313
323
324
326 """
327 Expand the given string to a full element name for an eater or feeder.
328 The full name is of the form eater:(sourceComponentName):(feedName)
329 or feeder:(componentName):feedName
330 """
331 if ' ' in block:
332 raise TypeError, "spaces not allowed in '%s'" % block
333 if not ':' in block:
334 raise TypeError, "no colons in'%s'" % block
335 if block.count(':') > 2:
336 raise TypeError, "too many colons in '%s'" % block
337
338 parts = block.split(':')
339
340 if parts[0] != 'eater' and parts[0] != 'feeder':
341 raise TypeError, "'%s' does not start with eater or feeder" % block
342
343
344 if not parts[1]:
345 if parts[0] == 'eater':
346 raise TypeError, "'%s' should specify feeder component" % block
347 parts[1] = self.name
348 if len(parts) == 2:
349 parts.append('')
350 if not parts[2]:
351 parts[2] = 'default'
352
353 return ":".join(parts)
354
356 """
357 Expand each @..@ block to use the full element name for eater or feeder.
358 The full name is of the form eater:(sourceComponentName):(feedName)
359 or feeder:(componentName):feedName
360 This also does some basic checking of the block.
361 """
362 assert block != ''
363
364
365 if block.count(self.DELIMITER) % 2 != 0:
366 raise TypeError, "'%s' contains an odd number of '%s'" % (block, self.DELIMITER)
367
368
369
370 blocks = block.split(self.DELIMITER)
371
372 for i in range(1, len(blocks) - 1, 2):
373 blocks[i] = self._expandElementName(blocks[i].strip())
374 return "@".join(blocks)
375
def parse_tmpl(self, pipeline, names, template_func, format):
    """
    Expand the given pipeline string representation by substituting
    blocks between '@' with a filled-in template.

    With exactly one name, the '@name@' block is optional: when it is
    absent, the filled-in template is combined with the whole pipeline
    using C{format}.  With any other number of names, every name must
    appear as an '@name@' block in the pipeline.

    @param pipeline: a pipeline string representation with variables
    @param names: the element names to substitute for @...@ segments
    @param template_func: function to call to get the template to use for
                          element factory info; called with the element
                          name, must return a string with a %(name)s key
    @param format: the format to use when substituting; a string with
                   %(tmpl)s and %(pipeline)s keys

    @raises TypeError: if more than one name is given and one of them
                       has no '@name@' block in the pipeline

    Returns: a new pipeline string representation.
    """
    assert pipeline != ''

    deli = self.DELIMITER

    if len(names) == 1:
        part_name = names[0]
        marker = deli + part_name + deli
        named = template_func(part_name) % {'name': part_name}
        # Test for the delimited marker, i.e. exactly what gets replaced.
        # Testing for the bare name would skip both the replacement and
        # the wrapping whenever the name merely occurs somewhere in the
        # pipeline text.
        if marker in pipeline:
            pipeline = pipeline.replace(marker, named)
        else:
            pipeline = format % {'tmpl': named, 'pipeline': pipeline}
    else:
        for part in names:
            marker = deli + part + deli
            if marker not in pipeline:
                raise TypeError(
                    "%s needs to be specified in the pipeline '%s'" % (
                        marker, pipeline))

            template = template_func(part)
            pipeline = pipeline.replace(marker, template % {'name': part})
    return pipeline
411
413 pipeline = " ".join(pipeline.split())
414 self.debug('Creating pipeline, template is %s' % pipeline)
415
416 eater_names = self.get_eater_names()
417 if pipeline == '' and not eater_names:
418 raise TypeError, "Need a pipeline or a eater"
419
420 if pipeline == '':
421 assert eater_names
422 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink'
423
424
425
426
427 eater_element_names = map(lambda n: "eater:" + n, eater_names)
428 feeder_element_names = map(lambda n: "feeder:" + n, self.feeder_names)
429 self.debug('we eat with eater elements %s' % eater_element_names)
430 self.debug('we feed with feeder elements %s' % feeder_element_names)
431 pipeline = self._expandElementNames(pipeline)
432
433 pipeline = self.parse_tmpl(pipeline, eater_element_names,
434 self.get_eater_template,
435 '%(tmpl)s ! %(pipeline)s')
436 pipeline = self.parse_tmpl(pipeline, feeder_element_names,
437 self.get_feeder_template,
438 '%(pipeline)s ! %(tmpl)s')
439 pipeline = " ".join(pipeline.split())
440
441 self.debug('pipeline for %s is %s' % (self.getName(), pipeline))
442 assert self.DELIMITER not in pipeline
443
444 return pipeline
445
461
464
466 """
467 Return a parse-launch description of a queue, if this component
468 wants an input queue on this eater, or None if not
469 """
470 return None
471
472
474 """
475 Tell the component to start.
476 Whatever is using the component is responsible for making sure all
477 eaters have received their file descriptor to eat from.
478
479 @param clocking: tuple of (ip, port, base_time) of a master clock,
480 or None not to slave the clock
481 @type clocking: tuple(str, int, long) or None.
482 """
483 self.debug('ParseLaunchComponent.start')
484 if clocking:
485 self.info('slaving to master clock on %s:%d with base time %d' %
486 clocking)
487
488 if clocking:
489 self.set_master_clock(*clocking)
490
491 self.link()
492
493 return defer.succeed(None)
494
496 """
497 I am a part of a feed component for a specific group
498 of functionality.
499
500 @ivar name: name of the effect
501 @type name: string
502 @ivar component: component owning the effect
503 @type component: L{FeedComponent}
504 """
505 logCategory = "effect"
506
508 """
509 @param name: the name of the effect
510 """
511 self.name = name
512 self.setComponent(None)
513
515 """
516 Set the given component as the effect's owner.
517
518 @param component: the component to set as an owner of this effect
519 @type component: L{FeedComponent}
520 """
521 self.component = component
522 self.setUIState(component and component.uiState or None)
523
525 """
526 Set the given UI state on the effect. This method is ideal for
527 adding keys to the UI state.
528
529 @param state: the UI state for the component to use.
530 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
531 """
532 self.uiState = state
533
535 """
536 Get the component owning this effect.
537
538 @rtype: L{FeedComponent}
539 """
540 return self.component
541
599
600 signalid = queue.connect("underrun", _underrun_cb)
601