1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from flumotion.common import dag, log, registry, errors, common
23 from flumotion.common.planet import moods
24
26 """
27 I am an object representing a feeder in the DepGraph
28 """
29 - def __init__(self, feederName, component):
33
35 """
36 I am an object representing an eater in the DepGraph
37 """
38 - def __init__(self, eaterName, component):
44
46 """
47 I am a dependency graph for components. I also maintain boolean state
48 for each of the nodes.
49
50 I contain a DAG to help with resolving dependencies.
51 """
52 logCategory = "depgraph"
53
54 typeNames = ("WORKER", "JOB", "COMPONENTSETUP", "CLOCKMASTER",
55 "COMPONENTSTART")
56
58 self._dag = dag.DAG()
59 self._state = {}
60
66
70
71 - def _addEdge(self, parent, child, parentType, childType):
72 self.debug("Adding edge %r of type %s to %r of type %s" % (
73 parent, parentType, child, childType))
74 self._dag.addEdge(parent, child, parentType, childType)
75
76 - def _removeEdge(self, parent, child, parentType, childType):
77 self.debug("Removing edge %r of type %s to %r of type %s" % (
78 parent, parentType, child, childType))
79 self._dag.removeEdge(parent, child, parentType, childType)
80
82 """
83 I set a component to be the clock master in the dependency
84 graph. This component must have already been added to the
85 dependency graph.
86
87 @param component: the component to set as the clock master
88 @type component: L{flumotion.manager.component.ComponentAvatar}
89 """
90 if self._dag.hasNode(component, "JOB"):
91 self._addNode(component, "CLOCKMASTER")
92 self._addEdge(component, component, "COMPONENTSETUP",
93 "CLOCKMASTER")
94
95
96
97 startnodes = self._dag.getAllNodesByType("COMPONENTSTART")
98 for start in startnodes:
99
100 if start.get('parent') == component.get('parent'):
101 self._addEdge(component, start, "CLOCKMASTER",
102 "COMPONENTSTART")
103 else:
104 raise KeyError("Component %r has not been added" % component)
105
107 """
108 I add a component to the dependency graph.
109 This includes adding the worker (if not already added), the job,
110 the feeders and the eaters.
111
112 Requirement: worker must already be assigned to component
113
114 @param component: component object to add
115 @type component: L{flumotion.common.planet.ManagerComponentState}
116 """
117 if self._dag.hasNode(component, "JOB"):
118 self.debug('component %r already in depgraph, ignoring',
119 component)
120 return
121
122 self.debug('adding component %r to depgraph' % component)
123 self._addNode(component, "JOB")
124 self._addNode(component, "COMPONENTSTART")
125 self._addNode(component, "COMPONENTSETUP")
126 self._addEdge(component, component, "JOB", "COMPONENTSETUP")
127 workername = component.get('workerRequested')
128 if workername:
129 self.addWorker(workername)
130 self.setComponentWorker(component, workername)
131 self._addEdge(component, component, "COMPONENTSETUP",
132 "COMPONENTSTART")
133
def addWorker(self, worker):
    """
    I make sure the dependency graph has a WORKER node for a worker.

    A node is only created when the DAG does not contain one yet; the
    debug message is emitted in either case.

    @param worker: the worker to add
    @type  worker: str
    """
    self.debug('adding worker %s' % worker)
    if self._dag.hasNode(worker, "WORKER"):
        # already known, nothing to create
        return
    self._addNode(worker, "WORKER")
144
146 """
147 I remove a component from the dependency graph. This includes removing
148 its JOB, COMPONENTSETUP, COMPONENTSTART and CLOCKMASTER nodes.
149
150 @param component: the component to remove
151 @type component: L{flumotion.manager.component.ComponentAvatar}
152 """
153 self.debug('removing component %r from depgraph' % component)
154 for type in self.typeNames:
155 if self._dag.hasNode(component, type):
156 self._removeNode(component, type)
157 del self._state[(component, type)]
158
160 """
161 I remove a worker from the dependency graph.
162
163 @param worker: the worker to remove
164 @type worker: str
165 """
166 self.debug('removing worker %s' % worker)
167 if self._dag.hasNode(worker, "WORKER"):
168 self._dag.removeNode(worker, "WORKER")
169 del self._state[(worker, "WORKER")]
170
def setComponentWorker(self, component, worker):
    """
    I tie a component to the worker it is assigned to by adding an edge
    from the worker's WORKER node to the component's JOB node.

    @param component: the component
    @type  component: L{flumotion.common.planet.ManagerComponentState}
    @param worker:    the worker to set it to
    @type  worker:    str

    @raise KeyError: if the WORKER node for the worker or the JOB node
                     for the component is not in the DAG
    """
    hasNode = self._dag.hasNode
    # The worker is checked first; the component is only looked up when
    # the worker node exists.
    if not hasNode(worker, "WORKER") or not hasNode(component, "JOB"):
        raise KeyError("Worker %s or Component %r not in dependency graph" %
            (worker, component))
    self._addEdge(worker, component, "WORKER", "JOB")
186
188 """
189 I am called once a piece of configuration has been added,
190 so I can add edges to the DAG for each feed from the
191 feeding component to the eating component.
192
193 @raise errors.ComponentConfigError: if a component is
194 misconfigured and eats from
195 a non-existent component
196 """
197 toSetup = self._dag.getAllNodesByType("COMPONENTSETUP")
198
199 for eatingComponent in toSetup:
200
201 config = eatingComponent.get('config')
202
203 if not config.has_key('source'):
204
205 self.debug("Component %r has no eaters" % eatingComponent)
206 else:
207
208
209
210
211 list = config['source']
212
213
214
215
216 if isinstance(list, str):
217 list = [list, ]
218
219 for source in list:
220 feederFound = False
221 feederComponentName = source.split(':')[0]
222
223 for feedingComponent in toSetup:
224 if feedingComponent.get("name") == feederComponentName:
225 feederFound = True
226 try:
227 self._addEdge(feedingComponent, eatingComponent,
228 "COMPONENTSETUP", "COMPONENTSETUP")
229 except KeyError:
230
231
232
233 pass
234 try:
235 self._addEdge(feedingComponent, eatingComponent,
236 "COMPONENTSTART", "COMPONENTSTART")
237 except KeyError:
238 pass
239
240 if not feederFound:
241 raise errors.ComponentConfigError(eatingComponent,
242 "No feeder exists for eater %s" % source)
243
245 """
246 I return a list of things that can and should be started now.
247
248 @return: a list of nodes that should be started, in order
249 @rtype: list of (object, str)
250 """
251
252
253
254
255
256
257
258 toBeStarted = self._dag.sort()
259
260 for obj in toBeStarted[:]:
261 if obj in toBeStarted:
262 self.log("toBeStarted: checking if (%r, %r) needs starting",
263 obj[0], obj[1])
264 if self._state[obj]:
265 toBeStarted.remove(obj)
266 elif obj[1] == "WORKER":
267
268
269 worker_offspring = self._dag.getOffspringTyped(
270 obj[0], obj[1])
271 for offspring in worker_offspring:
272 if offspring in toBeStarted:
273 toBeStarted.remove(offspring)
274 toBeStarted.remove(obj)
275 elif obj[1] == "JOB":
276 job_offspring = self._dag.getOffspringTyped(obj[0], obj[1])
277 for offspring in job_offspring:
278 if offspring in toBeStarted:
279 toBeStarted.remove(offspring)
280 toBeStarted.remove(obj)
281 else:
282 offspring = self._dag.getOffspringTyped(obj[0], obj[1])
283 for child in offspring:
284 if child in toBeStarted:
285 toBeStarted.remove(child)
286
287 return toBeStarted
288
def _setState(self, object, type, value):
    """
    I record the boolean state of a node.

    When the new state is false I also walk the node's offspring in the
    DAG: every offspring is logged, but only those belonging to the same
    object have their state reset to False.

    @param object: the object part of the node
    @param type:   the type name of the node
    @param value:  the new state of the node
    """
    node = (object, type)
    # -2 is handed to doLog unchanged; presumably a stack-depth hint so
    # the message is attributed to the caller -- TODO confirm.
    self.doLog(log.DEBUG, -2, "Setting state of (%r, %s) to %d" % (
        object, type, value))
    self._state[node] = value

    if value:
        # turning a node on never touches its offspring
        return

    self.debug("Setting state of all (%r, %s)'s offspring to %d" %
        (object, type, value))
    for descendant in self._dag.getOffspringTyped(object, type):
        self.debug("Setting state of offspring (%r) to %d", descendant, value)
        # only nodes of this very object follow it down; nodes of other
        # objects keep whatever state they have
        if descendant[0] == object:
            self._state[descendant] = False
303
305 """
306 Set a COMPONENTSTART node to have state of True
307
308 @param component: the component to set COMPONENTSTART to True for
309 @type component: L{flumotion.common.planet.ManagerComponentState}
310 """
311 self._setState(component, "COMPONENTSTART", True)
312
314 """
315 Set a COMPONENTSTART node to have state of False
316
317 @param component: the component to set COMPONENTSTART to False for
318 @type component: L{flumotion.common.planet.ManagerComponentState}
319 """
320
321 self._setState(component, "COMPONENTSTART", False)
322
324 """
325 Set a COMPONENTSETUP node to have state of True
326
327 @param component: the component to set COMPONENTSETUP to True for
328 @type component: L{flumotion.common.planet.ManagerComponentState}
329 """
330
331 self._setState(component, "COMPONENTSETUP", True)
332
334 """
335 Set a COMPONENTSETUP node to have state of False
336
337 @param component: the component to set COMPONENTSETUP to False for
338 @type component: L{flumotion.common.planet.ManagerComponentState}
339 """
340
341 self._setState(component, "COMPONENTSETUP", False)
342
343
345 """
346 Set a JOB node to have state of True
347
348 @param component: the component to set JOB to True for
349 @type component: L{flumotion.common.planet.ManagerComponentState}
350 """
351 self._setState(component, "JOB", True)
352
354 """
355 Set a JOB node to have state of False
356
357 @param component: the component to set JOB to False for
358 @type component: L{flumotion.common.planet.ManagerComponentState}
359 """
360 self.doLog(log.DEBUG, -2, "Setting component's job %r to FALSE" %
361 component)
362 self._setState(component, "JOB", False)
363
365 """
366 Set a WORKER node to have state of True
367
368 @param worker: the worker to set WORKER to True for
369 @type worker: str
370 """
371 self._setState(worker, "WORKER", True)
372
374 """
375 Set a WORKER node to have state of False
376
377 @param worker: the worker to set WORKER to False for
378 @type worker: str
379 """
380 self._setState(worker, "WORKER", False)
381
383 """
384 Set a CLOCKMASTER node to have state of True
385
386 @param component: the component to set CLOCKMASTER to True for
387 @type component: L{flumotion.common.planet.ManagerComponentState}
388 """
389 self._setState(component, "CLOCKMASTER", True)
390
392 """
393 Set a CLOCKMASTER node to have state of False
394
395 @param component: the component to set CLOCKMASTER to False for
396 @type component: L{flumotion.common.planet.ManagerComponentState}
397 """
398 self._setState(component, "CLOCKMASTER", False)
399
401 """
402 Checks if component has a CLOCKMASTER node
403
404 @param component: the component to check if CLOCKMASTER node exists
405 @type component: L{flumotion.common.planet.ManagerComponentState}
406 """
407 return self._dag.hasNode(component, "CLOCKMASTER")
408