1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23
24 import gobject
25 import gst
26 import gst.interfaces
27 from twisted.internet.threads import deferToThread
28
29 from twisted.internet import defer
30 from flumotion.common import gstreamer, errors, log, messages
31 from flumotion.twisted import defer as fdefer
32
33 from flumotion.worker.checks import check
34
35 from flumotion.common.messages import N_
36 T_ = messages.gettexter('flumotion')
37
50
51 -def do_element_check(pipeline_str, element_name, check_proc, state=None,
52 set_state_deferred=False):
53 """
54 Parse the given pipeline and set it to the given state.
55 When the bin reaches that state, perform the given check function on the
56 element with the given name.
57
58 @param pipeline_str: description of the pipeline used to test
59 @param element_name: name of the element being checked
60 @param check_proc: a function to call with the GstElement as argument.
61 @param state: an unused keyword parameter that will be removed when
62 support for GStreamer 0.8 is dropped.
63 @param set_state_deferred: a flag to say whether the set_state is run in
64 a deferToThread
65 @type set_state_deferred: bool
66 @returns: a deferred that will fire with the result of check_proc, or
67 fail.
68 @rtype: L{twisted.internet.defer.Deferred}
69 """
70 def run_check(pipeline, resolution):
71 element = pipeline.get_by_name(element_name)
72 try:
73 retval = check_proc(element)
74 resolution.callback(retval)
75 except check.CheckProcError, e:
76 log.debug('check', 'CheckProcError when running %r: %r',
77 check_proc, e.data)
78 resolution.errback(errors.RemoteRunError(e.data))
79 except Exception, e:
80 log.debug('check', 'Unhandled exception while running %r: %r',
81 check_proc, e)
82 resolution.errback(errors.RemoteRunError(
83 log.getExceptionMessage(e)))
84
85
86 pipeline.set_state(gst.STATE_NULL)
87
88
89 def message_rcvd(bus, message, pipeline, resolution):
90 t = message.type
91 if t == gst.MESSAGE_STATE_CHANGED:
92 if message.src == pipeline:
93 old, new, pending = message.parse_state_changed()
94 if new == gst.STATE_PLAYING:
95 run_check(pipeline, resolution)
96 elif t == gst.MESSAGE_ERROR:
97 gerror, debug = message.parse_error()
98
99
100 pipeline.set_state(gst.STATE_NULL)
101 resolution.errback(errors.GStreamerGstError(message.src, gerror, debug))
102 elif t == gst.MESSAGE_EOS:
103 resolution.errback(errors.GStreamerError("Unexpected end of stream"))
104 else:
105 log.debug('check', 'message: %s: %s:' % (
106 message.src.get_path_string(),
107 message.type.value_nicks[1]))
108 if message.structure:
109 log.debug('check', 'message: %s' %
110 message.structure.to_string())
111 else:
112 log.debug('check', 'message: (no structure)')
113 return True
114
115 resolution = BusResolution()
116
117 log.debug('check', 'parsing pipeline %s' % pipeline_str)
118 try:
119 pipeline = gst.parse_launch(pipeline_str)
120 log.debug('check', 'parsed pipeline %s' % pipeline_str)
121 except gobject.GError, e:
122 resolution.errback(errors.GStreamerError(e.message))
123 return resolution.d
124
125 bus = pipeline.get_bus()
126 bus.add_signal_watch()
127 watch_id = bus.connect('message', message_rcvd, pipeline, resolution)
128
129 resolution.watch_id = watch_id
130 resolution.pipeline = pipeline
131 log.debug('check', 'setting state to playing')
132 if set_state_deferred:
133 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING)
134 def stateChanged(res):
135 return resolution.d
136 d.addCallback(stateChanged)
137 return d
138 else:
139 pipeline.set_state(gst.STATE_PLAYING)
140 return resolution.d
141
143 """
144 Probe the firewire device.
145
146 Return a deferred firing a result.
147
148 The result is either:
149 - succesful, with a None value: no device found
150 - succesful, with a dictionary of width, height, and par as a num/den pair
151 - failed
152
153 @rtype: L{twisted.internet.defer.Deferred} of
154 L{flumotion.common.messages.Result}
155 """
156 result = messages.Result()
157
158 def do_check(demux):
159 pad = demux.get_pad('video')
160
161 if pad.get_negotiated_caps() == None:
162 raise errors.GStreamerError('Pipeline failed to negotiate?')
163
164 caps = pad.get_negotiated_caps()
165 s = caps.get_structure(0)
166 w = s['width']
167 h = s['height']
168 par = s['pixel-aspect-ratio']
169 result = dict(width=w, height=h, par=(par.num, par.denom))
170 log.debug('check', 'returning dict %r' % result)
171 return result
172
173
174 if not os.path.exists('/dev/raw1394'):
175 m = messages.Error(T_(N_("Device node /dev/raw1394 does not exist.")),
176 id=id)
177 result.add(m)
178 return defer.succeed(result)
179
180 pipeline = 'dv1394src name=source ! dvdemux name=demux ! fakesink'
181 d = do_element_check(pipeline, 'demux', do_check)
182
183 def errbackResult(failure):
184 log.debug('check', 'returning failed Result, %r' % failure)
185 m = None
186 if failure.check(errors.GStreamerGstError):
187 source, gerror, debug = failure.value.args
188 log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
189 gerror.message, debug))
190 if gerror.domain == "gst-resource-error-quark":
191 if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND):
192
193
194 version = gstreamer.get_plugin_version('1394')
195 if version >= (0,10,0,0) and version <= (0,10,2,0):
196 m = messages.Error(T_(
197 N_("Could not find or open the Firewire device. "
198 "Check the device node and its permissions.")))
199 else:
200 m = messages.Error(T_(
201 N_("No Firewire device found.")))
202 elif gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ):
203 m = messages.Error(T_(
204 N_("Could not open Firewire device for reading. "
205 "Check permissions on the device.")))
206
207 if not m:
208 m = check.handleGStreamerDeviceError(failure, 'Firewire')
209
210 if not m:
211 m = messages.Error(T_(N_("Could not probe Firewire device.")),
212 debug=check.debugFailure(failure))
213
214 m.id = id
215 result.add(m)
216 return result
217 d.addCallback(check.callbackResult, result)
218 d.addErrback(errbackResult)
219
220 return d
221