1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import sys
28 import time
29 import socket
30
31 import gobject
32
33 from twisted.internet import reactor, error, defer
34 from twisted.cred import error as crederror
35 from twisted.spread import pb
36 from twisted.python import reflect
37
38 from flumotion.common import interfaces, errors, log, planet, medium, pygobject
39 from flumotion.common import componentui, common, registry, messages, interfaces
40 from flumotion.common.planet import moods
41 from flumotion.configure import configure
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44 from flumotion.twisted.compat import implements
45 from flumotion.common.pygobject import gsignal
46
47 from flumotion.common.messages import N_
48 T_ = messages.gettexter('flumotion')
49
51 """
52 I am a client factory for a component logging in to the manager.
53 """
54 logCategory = 'component'
55 perspectiveInterface = interfaces.IComponentMedium
74
75
77 def remoteDisconnected(remoteReference):
78 if reactor.killed:
79 self.log('Connection to manager lost due to shutdown')
80 else:
81 self.warning('Lost connection to manager, '
82 'will attempt to reconnect')
83
84 def loginCallback(reference):
85 self.info("Logged in to manager")
86 self.debug("remote reference %r" % reference)
87 self._previously_connected = True
88
89 self.medium.setRemoteReference(reference)
90 reference.notifyOnDisconnect(remoteDisconnected)
91
92 def accessDeniedErrback(failure):
93 failure.trap(crederror.UnauthorizedLogin)
94 self.warning('Access denied.')
95
96 def connectionRefusedErrback(failure):
97 failure.trap(error.ConnectionRefusedError)
98 self.warning('Connection to manager refused.')
99
100 def alreadyLoggedInErrback(failure):
101 failure.trap(errors.AlreadyConnectedError)
102 self.warning('Component with id %s is already logged in.',
103 self.medium.authenticator.avatarId)
104
105 def loginFailedErrback(failure):
106 self.warning('Login failed, reason: %s' % failure)
107
108 d.addCallback(loginCallback)
109 d.addErrback(accessDeniedErrback)
110 d.addErrback(connectionRefusedErrback)
111 d.addErrback(alreadyLoggedInErrback)
112 d.addErrback(loginFailedErrback)
113
114
118
119
121 """
122 I am a medium interfacing with a manager-side avatar.
123 I implement a Referenceable for the manager's avatar to call on me.
124 I have a remote reference to the manager's avatar to call upon.
125 I am created by the L{ComponentClientFactory}.
126
127 @cvar authenticator: the authenticator used to log in to manager
128 @type authenticator: L{flumotion.twisted.pb.Authenticator}
129 """
130
131 implements(interfaces.IComponentMedium)
132 logCategory = 'basecompmed'
133
135 """
136 @param component: L{flumotion.component.component.BaseComponent}
137 """
138 self.comp = component
139 self.authenticator = None
140
141
142 - def setup(self, config):
144
146 """
147 Return the manager IP as seen by us.
148 """
149 assert self.remote
150 peer = self.remote.broker.transport.getPeer()
151 try:
152 host = peer.host
153 except AttributeError:
154 host = peer[1]
155
156 res = socket.gethostbyname(host)
157 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
158 return res
159
161 """
162 Return the IP of this component based on connection to the manager.
163
164 Note: this is insufficient in general, and should be replaced by
165 network mapping stuff later.
166 """
167 assert self.remote
168 host = self.remote.broker.transport.getHost()
169 self.debug("getIP(): using %r as our IP", host.host)
170 return host.host
171
173 """
174 Set the authenticator the client factory has used to log in to the
175 manager. Can be reused by the component's medium to make
176 feed connections which also get authenticated by the manager's
177 bouncer.
178
179 @type authenticator: L{flumotion.twisted.pb.Authenticator}
180 """
181 self.authenticator = authenticator
182
183
184
186 """
187 Return the state of the component, which will be serialized to a
188 L{flumotion.common.planet.ManagerJobState} object.
189
190 @rtype: L{flumotion.common.planet.WorkerJobState}
191 @returns: state of component
192 """
193
194
195 self.comp.state.set('manager-ip', self.getManagerIP())
196 return self.comp.state
197
199 """
200 Return the configuration of the component.
201
202 @rtype: dict
203 @returns: component's current configuration
204 """
205 try:
206 return self.comp.config
207 except AttributeError:
208 self.debug('getConfig(), but component is not set up yet')
209 return None
210
212 """
213 Set up the component and the component's medium with the given config,
214 in that order.
215 """
216 d = self.comp.setup(config)
217 d.addCallback(lambda r, c: self.setup(c), config)
218 return d
219
221 return self.comp.start(*args, **kwargs)
222
224 self.info('Stopping component')
225 return self.comp.stop()
226
228 """Reload modules in the component."""
229 import sys
230 from twisted.python.rebuild import rebuild
231 from twisted.python.reflect import filenameToModuleName
232 name = filenameToModuleName(__file__)
233
234
235
236
237 rebuild(sys.modules[name])
238
239
240 import flumotion.common.reload
241 rebuild(sys.modules['flumotion.common'])
242 try:
243 flumotion.common.reload.reload()
244 except SyntaxError, msg:
245 raise errors.ReloadSyntaxError(msg)
246 self._reloaded()
247
249 """Get a WorkerComponentUIState containing details needed to
250 present an admin-side UI state
251 """
252 return self.comp.uiState
253
254
257
259 method = getattr(self.comp, 'remote_' + methodName, None)
260 if method:
261 return method(*args, **kwargs)
262 msg = "%r doesn't have method remote_%s" % (self.comp, methodName)
263 self.warning(msg)
264 raise errors.MoMethodError(msg)
265
266 -class BaseComponent(common.InitMixin, log.Loggable, gobject.GObject):
267 """
268 I am the base class for all Flumotion components.
269
270 @ivar name: the name of the component
271 @type name: string
272 @ivar medium: the component's medium
273 @type medium: L{BaseComponentMedium}
274
275 @cvar componentMediumClass: the medium class to use for this component
276 @type componentMediumClass: child class of L{BaseComponentMedium}
277 """
278
279 logCategory = 'basecomp'
280 componentMediumClass = BaseComponentMedium
281
283
284
285 """
286 Subclasses should not override __init__ at all.
287
288 Instead, they should implement init(), which will be called
289 by this implementation automatically.
290
291 See L{flumotion.common.common.InitMixin} for more details.
292 """
293 gobject.GObject.__init__(self)
294
295
296 common.InitMixin.__init__(self)
297
298
300 """
301 A subclass should do as little as possible in its init method.
302 In particular, it should not try to access resources.
303
304 Failures during init are marshalled back to the manager through
305 the worker's remote_create method, since there is no component state
306 proxied to the manager yet at the time of init.
307 """
308 self.state = planet.WorkerJobState()
309
310 self.name = None
311
312
313 self.state.set('pid', os.getpid())
314 self.state.set('mood', moods.waking.value)
315
316 self.medium = None
317
318 self.uiState = componentui.WorkerComponentUIState()
319
320
321
322 self.baseTime = time.time()
323 self.lastTime = time.time()
324 self.lastClock = time.clock()
325
326 self.plugs = {}
327
328
329 self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)
330
331 self._shutdownHook = None
332
334 """
335 Subclasses can implement me to run any checks before the component
336 performs setup.
337
338 Messages can be added to the component state's 'messages' list key.
339 Any error messages added will trigger the component going to sad
340 an L{flumotion.common.errors.ComponentSetupError} being raised;
341 do_setup() will not be called.
342
343 In the event of a fatal problem that can't be expressed through an
344 error message, this method should set the mood to sad and raise the
345 error on its own.
346
347 self.config will be set before this is called.
348
349 @Returns: L{twisted.internet.defer.Deferred}
350 """
351 return defer.succeed(None)
352
354 """
355 Subclasses can implement me to set up the component before it is
356 started. It should set up the component, possibly opening files
357 and resources.
358 Non-programming errors should not be raised, but returned as a
359 failing deferred.
360
361 self.config will be set before this is called.
362
363 @Returns: L{twisted.internet.defer.Deferred}
364 """
365 return defer.succeed(None)
366
368 """
369 BaseComponent vmethod for starting up. If you override this
370 method, you are responsible for arranging that the component
371 becomes happy.
372
373 @Returns: L{twisted.internet.defer.Deferred}
374 """
375
376 self.setMood(moods.happy)
377 return defer.succeed(None)
378
380 """
381 BaseComponent vmethod for stopping.
382 The component should do any cleanup it needs, but must not set the
383 component's mood to sleeping.
384
385 @Returns: L{twisted.internet.defer.Deferred}
386 """
387 return defer.succeed(None)
388
389
390
391 - def setup(self, config, *args, **kwargs):
392 """
393 Sets up the component with the given config. Called by the manager
394 through the medium.
395
396 @Returns: L{twisted.internet.defer.Deferred}
397 @raise flumotion.common.errors.ComponentSetupError:
398 when an error happened during setup of the component
399 """
400 def setup_plugs():
401
402 reg = registry.getRegistry()
403
404 def load_bundles():
405 modules = {}
406 for plugs in config['plugs'].values():
407 for plug in plugs:
408 modules[plug['type']] = True
409 for plugtype in modules.keys():
410
411 entry = reg.getPlug(plugtype).getEntry()
412 modules[plugtype] = entry.getModuleName()
413 if not modules:
414 return defer.succeed(True)
415 elif not self.medium:
416 self.warning('Not connected to a medium, cannot '
417 'load bundles -- assuming all modules '
418 'are available')
419 return defer.succeed(True)
420 else:
421 loader = self.medium.bundleLoader
422 return loader.getBundles(moduleName=modules.values())
423
424 def make_plugs():
425 for socket, plugs in config['plugs'].items():
426 self.plugs[socket] = []
427 for plug in plugs:
428 entry = reg.getPlug(plug['type']).getEntry()
429 module = reflect.namedAny(entry.getModuleName())
430 proc = getattr(module, entry.getFunction())
431 instance = proc(plug)
432 self.plugs[socket].append(instance)
433
434 try:
435 d = load_bundles()
436 d.addCallback(lambda x: make_plugs())
437 return d
438 except Exception, e:
439 self.debug("Exception while loading bundles: %s" %
440 log.getExceptionMessage(e))
441 return defer.fail(e)
442
443 def checkErrorCallback(result):
444
445
446
447
448 current = self.state.get('mood')
449 if current == moods.sad.value:
450 self.warning('Running checks made the component sad.')
451 raise errors.ComponentSetupHandledError()
452
453 self.debug("setup() called with config %r", config)
454 self.setMood(moods.waking)
455 self._setConfig(config)
456
457 if self.medium:
458 self.medium.logName = self.getName()
459 d = setup_plugs()
460 d.addCallback(lambda r: self.do_check())
461 d.addCallback(checkErrorCallback)
462 d.addCallback(lambda r: self.do_setup())
463 def setupErrback(failure):
464
465 if failure.check(errors.ComponentSetupHandledError):
466 return failure
467
468 self.warning('Could not set up component: %s',
469 log.getFailureMessage(failure))
470 m = messages.Error(T_(N_("Could not setup component.")),
471 debug=log.getFailureMessage(failure),
472 id="component-setup-%s" % self.name)
473 self.state.append('messages', m)
474 self.setMood(moods.sad)
475 raise errors.ComponentSetupHandledError(
476 'Could not set up component')
477
478 d.addErrback(setupErrback)
479 return d
480
481 - def start(self, *args, **kwargs):
482 """
483 Tell the component to start. This is called when all its dependencies
484 are already started.
485
486 To hook onto this method, implement your own do_start method.
487 See BaseComponent.do_start() for what your do_start method is
488 responsible for doing.
489
490 Again, don't override this method. Thanks.
491 """
492 self.debug('BaseComponent.start')
493
494 def start_plugs():
495 for socket, plugs in self.plugs.items():
496 for plug in plugs:
497 self.debug('Starting plug %r on socket %s', plug, socket)
498 plug.start(self)
499
500 try:
501 start_plugs()
502 ret = self.do_start(*args, **kwargs)
503 assert isinstance(ret, defer.Deferred), \
504 "do_start %r must return a deferred" % self.do_start
505 self.debug('start: returning value %s' % ret)
506 return ret
507 except Exception, e:
508 self.debug("Exception during component do_start: %s" %
509 log.getExceptionMessage(e))
510 return defer.fail(e)
511
513 """
514 Set the shutdown hook for this component (replacing any previous hook).
515 When a component is stopped, then this hook will be fired.
516 """
517 self._shutdownHook = shutdownHook
518
520 """
521 Tell the component to stop.
522 The connection to the manager will be closed.
523 The job process will also finish.
524 """
525 self.debug('BaseComponent.stop')
526
527 def stop_plugs(ret):
528 for socket, plugs in self.plugs.items():
529 for plug in plugs:
530 self.debug('Stopping plug %r on socket %s', plug, socket)
531 plug.stop(self)
532 return ret
533
534 def fireShutdownHook(ret):
535 if self._shutdownHook:
536 self.debug('_stoppedCallback: firing shutdown hook')
537 self._shutdownHook()
538
539 self.setMood(moods.waking)
540 for message in self.state.get('messages'):
541 self.state.remove('messages', message)
542
543 if self._cpuCallLater:
544 self._cpuCallLater.cancel()
545 self._cpuCallLater = None
546
547 d = self.do_stop()
548 d.addCallback(stop_plugs)
549 d.addBoth(fireShutdownHook)
550 return d
551
552
553 - def emit(self, name, *args):
554 if 'uninitialized' in str(self):
555 self.warning('Uninitialized object!')
556
557 else:
558 gobject.GObject.emit(self, name, *args)
559
560
563
565 self.state.set('workerName', workerName)
566
569
573
575 """
576 Set the given mood on the component if it's different from the current
577 one.
578 """
579 current = self.state.get('mood')
580
581 if current == mood.value:
582 self.log('already in mood %r' % mood)
583 return
584 elif current == moods.sad.value:
585 self.info('tried to set mood to %r, but already sad :-(' % mood)
586 return
587
588 self.debug('MOOD changed to %r' % mood)
589 self.state.set('mood', mood.value)
590
592 """
593 Gets the mood on the component.
594
595 @rtype: int
596 """
597 return self.state.get('mood')
598
599
601 """
602 Add a message to the component.
603 If any of the messages is an error, the component will turn sad.
604
605 @type message: L{flumotion.common.messages.Message}
606 """
607 self.state.append('messages', message)
608 if message.level == messages.ERROR:
609 self.debug('error message, turning sad')
610 self.setMood(moods.sad)
611
613 """
614 Fix properties that have been renamed from a previous version,
615 and add a warning for them.
616
617 @param properties: properties; will be modified as a result.
618 @type properties: dict
619 @param list: list of (old, new) tuples of property names.
620 @type list: list of tuple of (str, str)
621 """
622 found = []
623 for old, new in list:
624 if properties.has_key(old):
625 found.append((old, new))
626
627 if found:
628 m = messages.Warning(T_(N_(
629 "Your configuration uses deprecated properties. "
630 "Please update your configuration and correct them.\n")),
631 id = "deprecated")
632 for old, new in found:
633 m.add(T_(N_(
634 "Please rename '%s' to '%s'.\n"),
635 old, new))
636 self.debug("Setting new property '%s' to %r", new,
637 properties[old])
638 properties[new] = properties[old]
639 del properties[old]
640 self.addMessage(m)
641
643 """
644 Call a remote method on all admin client views on this component.
645
646 This gets serialized through the manager and multiplexed to all
647 admin clients, and from there on to all views connected to each
648 admin client model.
649
650 Because there can be any number of admin clients that this call
651 will go out do, it does not make sense to have one return value.
652 This function will return None always.
653 """
654 if self.medium:
655 self.medium.callRemote("adminCallRemote", methodName,
656 *args, **kwargs)
657 else:
658 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
659 'no manager.'
660 % (methodName, args, kwargs))
661
662
669
671
672 nowTime = time.time()
673 nowClock = time.clock()
674 deltaTime = nowTime - self.lastTime
675 deltaClock = nowClock - self.lastClock
676 CPU = deltaClock/deltaTime
677 self.log('latest CPU use: %r' % CPU)
678 self.state.set('cpu', CPU)
679 deltaTime = nowTime - self.baseTime
680 deltaClock = nowClock
681 CPU = deltaClock/deltaTime
682 self.lastTime = nowTime
683 self.lastClock = nowClock
684
685 self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)
686
687 pygobject.type_register(BaseComponent)
688