1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 Flumotion-launch: A gst-launch analog for Flumotion.
25
26 The goal of flumotion-launch is to provide an easy way for testing
27 flumotion components, without involving much of Flumotion's core code.
28
29 Flumotion-launch takes a terse gst-launch-like syntax, translates that
30 into a component graph, and starts the components. An example would be::
31
32 flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer
33
34 You can also set properties::
35
36 flumotion-launch videotest framerate=15/2
37
38 You can link specific feeders as well::
39
40 flumotion-launch firewire .audio ! vorbis-encoder
41 flumotion-launch firewire firewire0.audio ! vorbis-encoder
42
43 Components can be backreferenced using their names::
44
45 flumotion-launch videotest audiotest videotest0. ! ogg-muxer \
46 audiotest0. ! ogg-muxer0.
47
48 In addition, components can have plugs::
49
50 flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout
51
52 Flumotion-launch explicitly avoids much of Flumotion's core logic. It
53 does not import flumotion.manager, flumotion.admin, or flumotion.worker.
54 There is no depgraph, no feed server, no job process. Although it might
55 be useful in the future to add a way to use the standard interfaces to
56 start components via admin, manager, worker, and job instances, this
57 low-level interface is useful in debugging problems and should be kept.
58 """
59
60
61 import optparse
62 import os
63 import sys
64
65 from twisted.python import reflect
66 from twisted.internet import reactor, defer
67
68 from flumotion.common import log, common, registry, errors, messages
69 from flumotion.twisted import flavors
70
71 from flumotion.launch import parse
72
73 from gettext import gettext as _
74
# Translated heading text, keyed by the messages.ERROR / WARNING / INFO
# constants.
_headings = dict([
    (messages.ERROR, _('Error')),
    (messages.WARNING, _('Warning')),
    (messages.INFO, _('Note')),
])
80
81
83 sys.stderr.write(x + '\n')
84 raise SystemExit(1)
85
86
129 d.addErrback(handledEb)
130 return d
131
135
136 - def start(self, clocking):
138
141
144
147
149 fds = {}
150 wrappersByName = dict([(wrapper.name, wrapper)
151 for wrapper in wrappers])
152 def starter(wrapper, feedName, write):
153 return lambda: wrapper.feedToFD(feedName, write)
154 for wrapper in wrappers:
155 for source in wrapper.config.get('source', []):
156 compName, feedName = source.split(':')
157 read, write = os.pipe()
158 start = starter(wrappersByName[compName], feedName, write)
159 fds[source] = (read, start)
160 return fds
161
166
168
169
170
def got_results(results):
    """Check the outcome of instantiating every component.

    `results` is walked in step with `wrappers`; each entry is indexed as
    (success flag, value). Every unsuccessful entry is reported on stdout
    and, if there was at least one, errors.ComponentStartError is raised
    after all of them have been reported.
    """
    any_failed = False
    for outcome, wrapper in zip(results, wrappers):
        if outcome[0]:
            continue
        print("Component %s failed to start, reason: %r"
              % (wrapper, outcome[1]))
        any_failed = True
    if any_failed:
        raise errors.ComponentStartError()
180
def choose_clocking(unused):
    """Pick the component that provides the master clock, if one is needed.

    Components whose 'clock-master' config value is not None take part in
    clock synchronization. They are ordered by that value; the first one is
    told to provide the master clock and is removed from the list, and the
    remaining ones are kept as the components that still need to be
    synchronized.

    `unused` is the result of the previous callback and is ignored.

    Returns the deferred obtained from provideMasterClock(), firing with
    (remaining wrappers, clocking), or the plain tuple (None, None) when no
    component takes part in synchronization.
    """
    # Decorate with the position in `wrappers` so that equal 'clock-master'
    # values are resolved by original order. Sorting bare (value, wrapper)
    # pairs would fall back to comparing the wrapper objects themselves,
    # which is arbitrary on Python 2 and a TypeError on Python 3.
    decorated = [(x.config['clock-master'], index, x)
                 for index, x in enumerate(wrappers)
                 if x.config['clock-master'] is not None]
    decorated.sort()
    need_sync = [entry[2] for entry in decorated]

    if not need_sync:
        return None, None

    def addNeedSync(clocking):
        return need_sync, clocking

    master = need_sync.pop(0)
    print("Telling %s to provide the master clock." % (master.name,))
    # NOTE(review): 7600 - 1 is presumably a port number one below some
    # 7600 default -- confirm against provideMasterClock().
    d = master.provideMasterClock(7600 - 1)
    d.addCallback(addNeedSync)
    return d
198
def add_delay(val):
    """Pass `val` down the callback chain, going through a delay if one is set.

    When `delay` is non-zero a notice is printed and DeferredDelay(delay, val)
    is returned; otherwise an already-fired deferred carrying `val` is
    returned.
    """
    if not delay:
        return defer.succeed(val)
    print('Delaying component startup by %f seconds...' % delay)
    return DeferredDelay(delay, val)
205
def do_start(synchronization, wrapper):
    """Connect `wrapper` to its sources and start it.

    `synchronization` is a (need_sync, clocking) pair. It is handed back
    unchanged as the result of the returned deferred, so the next callback
    in the chain receives the same pair.
    """
    need_sync, clocking = synchronization

    # Give the component the read end of each of its source pipes, then
    # trigger the callable that makes the feeder write to the other end.
    for source in wrapper.config.get('source', []):
        read, start = fds[source]
        wrapper.eatFromFD(source, read)
        start()

    # Clocking information is only passed to components listed in need_sync.
    takes_part = need_sync and wrapper in need_sync and clocking
    if not takes_part:
        clocking = None

    def passSynchronization(val):
        return synchronization

    d = wrapper.start(clocking)
    d.addCallback(passSynchronization)
    return d
219
def do_stop(failure):
    """Stop every component, then return `failure` so it keeps propagating."""
    for component in wrappers:
        component.stop()
    return failure
224
225 d = defer.DeferredList([wrapper.instantiate() for wrapper in wrappers])
226 d.addCallback(got_results)
227 d.addCallback(choose_clocking)
228 for wrapper in wrappers:
229 d.addCallback(add_delay)
230 d.addCallback(do_start, wrapper)
231 d.addErrback(do_stop)
232 return d
233
235 from flumotion.common import setup
236 setup.setupPackagePath()
237 from flumotion.configure import configure
238 log.debug('manager', 'Running Flumotion version %s' %
239 configure.version)
240 import twisted.copyright
241 log.debug('launch', 'Running against Twisted version %s' %
242 twisted.copyright.version)
243 from flumotion.project import project
244 for p in project.list():
245 log.debug('launch', 'Registered project %s version %s' % (
246 p, project.get(p, 'version')))
247
248 parser = optparse.OptionParser()
249 parser.add_option('-d', '--debug',
250 action="store", type="string", dest="debug",
251 help="set debug levels")
252 parser.add_option('', '--delay',
253 action="store", type="float", dest="delay",
254 help="set debug levels")
255 parser.add_option('-v', '--verbose',
256 action="store_true", dest="verbose",
257 help="be verbose")
258 parser.add_option('', '--version',
259 action="store_true", dest="version",
260 default=False,
261 help="show version information")
262
263 log.debug('worker', 'Parsing arguments (%r)' % ', '.join(args))
264 options, args = parser.parse_args(args)
265
266
267 if options.verbose:
268 options.debug = "*:3"
269
270
271 if options.version:
272 print common.version("flumotion-launch")
273 return 0
274
275 if options.debug:
276 log.setFluDebug(options.debug)
277
278 if options.delay:
279 delay = options.delay
280 else:
281 delay = 0.
282
283
284 configs = parse.parse_args(args[1:])
285
286
287 wrappers = [ComponentWrapper(config) for config in configs]
288
289
290 fds = make_pipes(wrappers)
291
292 reactor.running = False
293 reactor.failure = False
294 reactor.callLater(0, lambda: setattr(reactor, 'running', True))
295
296 d = start_components(wrappers, fds, delay)
297
def errback(failure):
    """Report a failure and record it on the reactor.

    The failure message is logged at debug level, the error message and a
    detailed traceback are printed, and reactor.failure is set. If
    reactor.running is true the reactor is stopped as well.
    """
    log.debug('launch', log.getFailureMessage(failure))
    print("Error occurred: %s" % failure.getErrorMessage())
    failure.printDetailedTraceback()
    reactor.failure = True
    if not reactor.running:
        return
    print("Stopping reactor.")
    reactor.stop()
306 d.addErrback(errback)
307
308 if not reactor.failure:
309 print 'Running the reactor. Press Ctrl-C to exit.'
310
311 log.debug('launch', 'Starting reactor')
312 reactor.run()
313
314 log.debug('launch', 'Reactor stopped')
315
316 if reactor.failure:
317 return 1
318 else:
319 return 0
320