1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects for components
24
25 API Stability: semi-stable
26 """
27
28 import time
29
30 from twisted.spread import pb
31 from twisted.internet import reactor, defer, error
32 from twisted.python.failure import Failure
33
34 from flumotion.configure import configure
35
36 from flumotion.manager import base
37 from flumotion.common import errors, interfaces, keycards, log, config, planet
38 from flumotion.common import messages, common
39 from flumotion.twisted import flavors
40 from flumotion.twisted.defer import defer_generator_method
41 from flumotion.twisted.compat import implements
42 from flumotion.common.planet import moods
43
44 from flumotion.common.messages import N_
45 T_ = messages.gettexter('flumotion')
46
48 """
49 I am a Manager-side avatar for a component.
50 I live in the L{ComponentHeaven}.
51
52 Each component that logs in to the manager gets an avatar created for it
53 in the manager.
54
55 @cvar avatarId: the L{componentId<common.componentId>}
56 @type avatarId: str
57 @cvar jobState: job state of this avatar's component
58 @type jobState: L{flumotion.common.planet.ManagerJobState}
59 @cvar componentState: component state of this avatar's component
60 @type componentState: L{flumotion.common.planet.ManagerComponentState}
61 """
62
63 logCategory = 'comp-avatar'
64
66
67 base.ManagerAvatar.__init__(self, *args, **kwargs)
68
69 self.componentState = None
70 self.jobState = None
71
72
73
74 self._starting = False
75 self._beingSetup = False
76 self._providingClock = False
77
78 self._ports = {}
79
80 self._shutdown_requested = False
81
82 self._happydefers = []
83 self.feeder_names = []
84 self.eater_names = []
85
86
88 mood = '(unknown)'
89 if self.componentState:
90 moodValue = self.componentState.get('mood')
91 if moodValue is not None:
92 mood = moods.get(moodValue).name
93 return '<%s %s (mood %s)>' % (self.__class__.__name__,
94 self.avatarId, mood)
95
96
98 """
99 Clean up when detaching.
100 """
101 if self._ports:
102 self.vishnu.releasePortsOnWorker(self.getWorkerName(),
103 self._ports.values())
104
105 self._ports = {}
106
107 self.jobState = None
108
110 if not self.componentState:
111 return
112
113 if not self.componentState.get('mood') == mood.value:
114 self.debug('Setting mood to %r' % mood)
115 self.componentState.setMood(mood.value)
116
118 mood = moods.get(moodValue)
119 self._setMood(mood)
120
122 if not self.componentState:
123 return
124 return self.componentState.get('mood')
125
127 if not self.componentState:
128 return
129
130 self.componentState.append('messages', message)
131
132
133
134
136 if ignores:
137 if failure.check(*ignores):
138 return failure
139 self.warning("Unhandled remote call error: %s" %
140 failure.getErrorMessage())
141 self.warning("raising '%s'" % str(failure.type))
142 return failure
143
144
145
146
147
149 failure.trap(errors.PropertyError)
150 print "Ignore the following Traceback line, issue in Twisted"
151 return failure
152
167
168 d.addCallback(checkInitialMood)
169
170 d.addCallback(lambda _: self.jobState.addListener(self,
171 set=self.stateSet))
172
173 d.addCallback(lambda _: self.heaven.registerComponent(self))
174 d.addCallback(lambda _: self.vishnu.registerComponent(self))
175 return d
176
204
205
207 self.log("state set on %r: %s now %r" % (state, key, value))
208 if key == 'mood':
209 self.info('Mood changed to %s' % moods.get(value).name)
210
211 if value == moods.happy.value:
212 self.vishnu._depgraph.setComponentStarted(self.componentState)
213
214 self._starting = False
215
216 for d in self._happydefers:
217 d.callback(True)
218 self._happydefers = []
219
220
222
223
224
225 eater_names = []
226 for block in eater_config:
227 eater_name = block
228 if block.find(':') == -1:
229 eater_name = block + ':default'
230 eater_names.append(eater_name)
231 self.debug('parsed eater config, eaters %r' % eater_names)
232 self.eater_names = eater_names
233
235
236
237
238
239
240 feed_names = feeder_config
241
242 name = self.componentState.get('name')
243
244 self.feeder_names = map(lambda n: name + ':' + n, feed_names)
245 self.debug('parsed feeder config, feeders %r' % self.feeder_names)
246
247
249 """
250 Get a list of L{feedId<flumotion.common.common.feedId>}s
251 for feeds this component wants to eat from.
252
253 @return: a list of feedId's, or the empty list
254 @rtype: list of str
255 """
256 if not self.eater_names:
257 return []
258
259
260 return self.eater_names
261
263 """
264 Get a list of L{feedId<flumotion.common.common.feedId>}s that this
265 component has feeders for.
266
267 Obviously, the componentName part will be the same for all of them,
268 since it's the name of this component, but we return the feedId to be
269 similar to getEaters.
270
271 @return: a list of feedId's, or the empty list
272 @rtype: list of str
273 """
274
275
276 if not self.feeder_names:
277 self.warning('no feederNames key, so no feeders')
278 return []
279
280 return self.feeder_names
281
283 """
284 Returns the port on which a feed server for this component is
285 listening on.
286
287 @rtype: int
288 """
289 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
290
292 """
293 Get the IP address of the manager as seen by the component.
294
295 @rtype: str
296 """
297 return self.jobState.get('manager-ip')
298
300 """
301 Return the name of the worker.
302
303 @rtype: str
304 """
305 return self.jobState.get('workerName')
306
308 """
309 Return the PID of the component.
310
311 @rtype: int
312 """
313 return self.jobState.get('pid')
314
316 """
317 Get the name of the component.
318
319 @rtype: str
320 """
321 return self.componentState.get('name')
322
324 """
325 Get the name of the component's parent.
326
327 @rtype: str
328 """
329 return self.componentState.get('parent').get('name')
330
332 """
333 Get the component type name of the component.
334
335 @rtype: str
336 """
337 return self.componentState.get('type')
338
340 """
341 Tell the avatar to stop the component.
342 """
343 d = self.mindCallRemote('stop')
344
345 d.addErrback(lambda x: None)
346 return d
347
349 """
350 Set up the component with the given config.
351 Proxies to
352 L{flumotion.component.component.BaseComponentMedium.remote_setup}
353
354 @type conf: dict
355 """
356 def _setupErrback(failure):
357 self._setMood(moods.sad)
358 return failure
359
360 d = self.mindCallRemote('setup', conf)
361 d.addErrback(_setupErrback)
362 return d
363
364
365
366
367
369 """
370 Tell the component to start, possibly linking to other components.
371 """
372 self.debug('start')
373 conf = self.componentState.get('config')
374 master = conf['clock-master']
375 clocking = None
376 if master != self.avatarId and master != None:
377 self.debug('Need to synchronize with clock master %r' % master)
378 d = self.heaven.getMasterClockInfo(master, self.avatarId)
379 yield d
380 try:
381 clocking = d.value()
382 self.debug('Got master clock info %r' % (clocking, ))
383 host, port, base_time = clocking
384
385
386
387
388
389 if (not self.heaven._componentIsLocal(self)
390 and host == '127.0.0.1'):
391 host = self.getRemoteManagerIP()
392 self.debug('Overriding clock master host to %s' % host)
393 clocking = (host, port, base_time)
394
395 if master == self.avatarId:
396 self.debug('we are the master, so reset to None')
397
398
399 clocking = None
400 except Exception, e:
401 self.error("Could not make component start, reason %s"
402 % log.getExceptionMessage(e))
403
404 self.debug('calling remote_start on component %r' % self)
405 d = self.mindCallRemote('start', clocking)
406 yield d
407 try:
408 d.value()
409 except errors.ComponentStartHandledError, e:
410 self.debug('already handled error while starting: %s' %
411 log.getExceptionMessage(e))
412 except Exception, e:
413 m = messages.Error(T_(N_("Could not start component.")),
414 debug = log.getExceptionMessage(e),
415 id="component-start")
416 self._addMessage(m)
417 self.warning("Could not make component start, reason %s"
418 % log.getExceptionMessage(e))
419 self._setMood(moods.sad)
420 raise
421 start = defer_generator_method(start)
422
423 - def eatFrom(self, fullFeedId, host, port):
426
427 - def feedTo(self, componentId, feedId, host, port):
430
432 """
433 Set a property on an element.
434
435 @param element: the element to set the property on
436 @type element: str
437 @param property: the property to set
438 @type property: str
439 @param value: the value to set the property to
440 @type value: mixed
441 """
442 if not element:
443 msg = "%s: no element specified" % self.avatarId
444 self.warning(msg)
445 raise errors.PropertyError(msg)
446 if not element in self.jobState.get('elements'):
447 msg = "%s: element '%s' does not exist" % (self.avatarId, element)
448 self.warning(msg)
449 raise errors.PropertyError(msg)
450 if not property:
451 msg = "%s: no property specified" % self.avatarId
452 self.warning(msg)
453 raise errors.PropertyError(msg)
454 self.debug("setting property '%s' on element '%s'" % (property, element))
455
456 d = self.mindCallRemote('setElementProperty', element, property, value)
457 d.addErrback(self._mindPropertyErrback)
458 d.addErrback(self._mindErrback, errors.PropertyError)
459 return d
460
462 """
463 Get a property of an element.
464
465 @param element: the element to get the property of
466 @type element: str
467 @param property: the property to get
468 @type property: str
469 """
470 if not element:
471 msg = "%s: no element specified" % self.avatarId
472 self.warning(msg)
473 raise errors.PropertyError(msg)
474
475
476
477
478 if not element in self.jobState.get('elements'):
479 msg = "%s: element '%s' does not exist" % (self.avatarId, element)
480 self.warning(msg)
481 raise errors.PropertyError(msg)
482 if not property:
483 msg = "%s: no property specified" % self.avatarId
484 self.warning(msg)
485 raise errors.PropertyError(msg)
486 self.debug("getting property %s on element %s" % (element, property))
487 d = self.mindCallRemote('getElementProperty', element, property)
488 d.addErrback(self._mindPropertyErrback)
489 d.addErrback(self._mindErrback, errors.PropertyError)
490 return d
491
493 """
494 Tell the component to reload itself.
495
496 @rtype: L{twisted.internet.defer.Deferred}
497 """
498 def _reloadComponentErrback(failure, self):
499 failure.trap(errors.ReloadSyntaxError)
500 self.warning(failure.getErrorMessage())
501 print "Ignore the following Traceback line, issue in Twisted"
502 return failure
503
504 d = self.mindCallRemote('reloadComponent')
505 d.addErrback(_reloadComponentErrback, self)
506 d.addErrback(self._mindErrback, errors.ReloadSyntaxError)
507 return d
508
509
511 """
512 Authenticate the given keycard.
513 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \
514 """BouncerMedium.remote_authenticate}
515 The component should be a subclass of
516 L{flumotion.component.bouncers.bouncer.Bouncer}
517
518 @type keycard: L{flumotion.common.keycards.Keycard}
519 """
520 d = self.mindCallRemote('authenticate', keycard)
521 d.addErrback(self._mindErrback)
522 return d
523
525 """
526 Remove a keycard managed by this bouncer because the requester
527 has gone.
528
529 @type keycardId: str
530 """
531 self.debug('remotecalling removeKeycardId with id %s' % keycardId)
532 d = self.mindCallRemote('removeKeycardId', keycardId)
533 d.addErrback(self._mindErrback)
534 return d
535
537 """
538 Expire a keycard issued to this component because the bouncer decided
539 to.
540
541 @type keycardId: str
542 """
543 self.debug('remotecalling expireKeycard with id %s' % keycardId)
544 d = self.mindCallRemote('expireKeycard', keycardId)
545 d.addErrback(self._mindErrback)
546 return d
547
548
550 """
551 Called by the component to tell the manager that a given feed is
552 ready or not. Will notify other components depending on this
553 feeder, starting them if all of their dependencies are ready.
554
555 @param feedName: name of the feeder, e.g. "default".
556 @type feedName: str
557 @param isReady: True if the feed is now ready, False otherwise.
558 @type isReady: bool
559 @deprecated Don't call this!
560 """
561 assert isinstance(feedName, str)
562
564 """
565 Called by a component to tell the manager that it's shutting down
566 cleanly (and thus should go to sleeping, rather than lost or sad)
567 """
568 self.debug("shutdown is clean, shouldn't go to lost")
569 self._shutdown_requested = True
570
574
576 """
577 Remove a keycard on the given bouncer on behalf of a component's medium.
578
579 This is requested by a component that created the keycard.
580
581 @type bouncerName: str
582 @param keycardId: id of keycard to remove
583 @type keycardId: str
584 """
585 self.debug('asked to remove keycard %s on bouncer %s' % (
586 keycardId, bouncerName))
587 avatarId = '/atmosphere/%s' % bouncerName
588 if not self.heaven.hasAvatar(avatarId):
589 self.warning('No bouncer with id %s registered' % avatarId)
590 raise errors.UnknownComponentError(avatarId)
591
592 bouncerAvatar = self.heaven.getAvatar(avatarId)
593 return bouncerAvatar.removeKeycardId(keycardId)
594
596 """
597 Expire a keycard (and thus the requester's connection)
598 issued to the given requester.
599
600 This is called by the bouncer component that authenticated the keycard.
601
602
603 @param requesterId: name (avatarId) of the component that originally
604 requested authentication for the given keycardId
605 @type requesterId: str
606 @param keycardId: id of keycard to expire
607 @type keycardId: str
608 """
609
610 if not self.heaven.hasAvatar(requesterId):
611 self.warning('asked to expire keycard %s for requester %s, ' % (
612 keycardId, requesterId) +
613 'but no such component registered')
614 raise errors.UnknownComponentError(requesterId)
615
616 componentAvatar = self.heaven.getAvatar(requesterId)
617 return componentAvatar.expireKeycard(keycardId)
618
620 """
621 Request reservation a number of ports on a particular worker.
622 This can be called from a job if it needs some ports itself.
623
624 @param workerName: name of the worker to reserve ports on
625 @type workerName: str
626 @param numberOfPorts: the number of ports to reserve
627 @type numberOfPorts: int
628 """
629 ports = self.heaven.vishnu.reservePortsOnWorker(workerName,
630 numberOfPorts)
631 return ports
632
634 """
635 I handle all registered components and provide L{ComponentAvatar}s
636 for them.
637 """
638
639 implements(interfaces.IHeaven)
640 avatarClass = ComponentAvatar
641
642 logCategory = 'comp-heaven'
643
645
646 base.ManagerHeaven.__init__(self, vishnu)
647
648
649
650 self._clockMasterWaiters = {}
651 self._masterClockInfo = {}
652
653
655
656 host = componentAvatar.getClientAddress()
657
658 if host == '127.0.0.1':
659 return True
660 else:
661 return False
662
664 """
665 Remove a component avatar from the heaven.
666
667 @param componentAvatar: the component to remove
668 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar}
669 """
670 self.removeAvatar(componentAvatar.avatarId)
671
673 """
674 Retrieve the information about the feeders this component's eaters
675 are eating from.
676
677 @param componentAvatar: the component
678 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar}
679
680 @returns: list of fullFeedIds
681 """
682 componentId = componentAvatar.avatarId
683 eaterFeedIds = componentAvatar.getEaters()
684 self.debug('feeds we eat: %r' % eaterFeedIds)
685
686 retval = []
687 for feedId in eaterFeedIds:
688 (componentName, feedName) = common.parseFeedId(feedId)
689 flowName = common.parseComponentId(componentId)[0]
690 fullFeedId = common.fullFeedId(flowName, componentName, feedName)
691
692 retval.append(fullFeedId)
693
694 return retval
695
697 """
698 Retrieves the data of feeders (feed producer elements) for a component.
699
700 @param component: the component
701 @type component: L{flumotion.manager.component.ComponentAvatar}
702
703 @returns: tuple of (feedId, host, port) for each feeder
704 @rtype: tuple of (str, str, int) tuple
705 """
706
707
708
709
710 host = component.getClientAddress()
711 port = component.getFeedServerPort()
712 feedIds = component.getFeeders()
713 self.debug('returning data for feeders: %r', feedIds)
714 return map(lambda f: (f, host, port), feedIds)
715
717 state = componentAvatar.componentState
718 conf = state.get('config')
719
720
721 eatersData = self._getComponentEatersData(componentAvatar)
722 for fullFeedId in eatersData:
723 self.debug('connecting eater of feed %s' % fullFeedId)
724
725
726
727 connection = "upstream"
728
729 if connection == "upstream":
730 self.debug('connecting from eater to feeder')
731
732 (flowName, componentName, feedName) = common.parseFullFeedId(
733 fullFeedId)
734 avatarId = common.componentId(flowName, componentName)
735 feederAvatar = self.getAvatar(avatarId)
736 if not feederAvatar:
737 m = messages.Error(T_(
738 N_("Configuration problem.")),
739 debug="No component '%s'." % avatarId,
740 id="component-start-%s" % fullFeedId)
741
742 componentAvatar._addMessage(m)
743 componentAvatar._setMood(moods.sad)
744
745
746 host = feederAvatar.getClientAddress()
747 port = feederAvatar.getFeedServerPort()
748
749
750
751
752
753
754
755 eaterHost = componentAvatar.mind.broker.transport.getPeer().host
756 if eaterHost == host:
757 host = '127.0.0.1'
758
759 d = componentAvatar.eatFrom(fullFeedId, host, port)
760 yield d
761 try:
762 d.value()
763 except (error.ConnectError, error.ConnectionRefusedError), e:
764 m = messages.Error(T_(
765 N_("Could not connect component to %s:%d for feed %s."),
766 host, port, fullFeedId),
767 debug=log.getExceptionMessage(e, filename='component'),
768 id="component-start-%s" % fullFeedId)
769
770 componentAvatar._addMessage(m)
771 componentAvatar._setMood(moods.sad)
772 raise errors.ComponentStartHandledError(e)
773 elif connection == "downstream":
774 self.debug('connecting from feeder to eater')
775
776 (flowName, componentName, feedName) = common.parseFullFeedId(
777 fullFeedId)
778 feederAvatarId = common.componentId(flowName, componentName)
779 feederAvatar = self.getAvatar(feederAvatarId)
780
781 host = componentAvatar.getClientAddress()
782 port = componentAvatar.getFeedServerPort()
783 d = feederAvatar.feedTo(componentAvatar.avatarId,
784 common.feedId(componentName, feedName), host, port)
785 yield d
786 try:
787 d.value()
788 except error.ConnectionRefusedError, e:
789 m = messages.Error(T_(
790 N_("Could not connect to %s:%d for feed %s."),
791 host, port, fullFeedId),
792 debug=log.getExceptionMessage(e),
793 id="component-start-%s" % fullFeedId)
794 self._addMessage(m)
795 self._setMood(moods.sad)
796 raise errors.ComponentStartHandledError
797
798 componentAvatar.debug('starting component')
799 try:
800 componentAvatar.start()
801 except errors.ComponentStartHandledError, e:
802 pass
803 _startComponent = defer_generator_method(_startComponent)
804
806 """
807 I try to start nodes in the depgraph if they should be started. I am
808 a recursive method, because the depgraph's list of what should be
809 started may change when nodes start/stop.
810
811 @param result: only needed because this method is added as a callback
812 """
813
814
815
816 def handleFailure(failure, avatar, message, id_template):
817 log.warningFailure(failure, swallow=False)
818 if failure.check(errors.HandledException):
819 self.debug('failure %r already handled' % failure)
820 return
821 self.debug('showing error message for failure %r' % failure)
822 m = messages.Error(message,
823 id=id_template % componentAvatar.avatarId,
824 debug=log.getFailureMessage(failure))
825 avatar._addMessage(m)
826 avatar._setMood(moods.sad)
827
828 self.debug("tryWhatCanBeStarted")
829 deplist = self.vishnu._depgraph.whatShouldBeStarted()
830 self.debug("Listing deplist")
831
832 if not deplist:
833 self.debug("Nothing needs to be setup or started!")
834 return
835 for dep in deplist:
836 self.debug("Deplist: %r,%r" % (dep[0], dep[1]))
837
838
839
840 for dep, deptype in deplist:
841 if dep:
842 if deptype == "COMPONENTSETUP":
843 self.debug("Component %s to be setup" % dep.get("name"))
844 componentAvatar = self.getComponentAvatarForState(dep)
845 if componentAvatar:
846 if not componentAvatar._beingSetup:
847 componentAvatar._beingSetup = True
848
849 def componentSetupFailed(failure):
850 componentAvatar._beingSetup = False
851 handleFailure(failure, componentAvatar,
852 T_(N_("Could not setup component.")),
853 "component-setup-%s")
854 try:
855 d = self.setupComponent(componentAvatar)
856 except:
857
858
859 componentSetupFailed(Failure())
860 continue
861
862
863
864
865
866 def setupErrback(failure):
867 componentSetupFailed(failure)
868 raise errors.ComponentSetupHandledError(failure)
869 d.addCallbacks(self._tryWhatCanBeStarted,
870 setupErrback)
871 else:
872 self.debug(
873 "Component %s already on way to being setup",
874 dep.get("name"))
875 else:
876 self.debug(
877 "Component %s to be setup but has no avatar yet",
878 dep.get("name"))
879 elif deptype == "COMPONENTSTART":
880 self.debug("Component %s to be started" % dep.get("name"))
881 componentAvatar = self.getComponentAvatarForState(dep)
882 if not componentAvatar._starting:
883 componentAvatar._starting = True
884 happyd = defer.Deferred()
885
886
887 happyd.addCallback(lambda r, s: s.set(
888 'moodPending', None),
889 componentAvatar.componentState)
890
891
892
893 happyd.addCallback(self._tryWhatCanBeStarted)
894 componentAvatar._happydefers.append(happyd)
895
896
897 def componentStartupFailed(failure):
898 componentAvatar._starting = False
899 handleFailure(failure, componentAvatar,
900 T_(N_("Could not start component.")),
901 "component-start-%s")
902 try:
903 d = self._startComponent(componentAvatar)
904 except:
905
906
907 componentStartupFailed(Failure())
908 continue
909
910
911 def startErrback(failure):
912 componentStartupFailed(failure)
913 raise errors.ComponentStartHandledError(failure)
914 d.addErrback(startErrback)
915 else:
916 self.log("Component is already starting")
917 elif deptype == "CLOCKMASTER":
918 self.debug("Component %s to be clock master!",
919 dep.get("name"))
920 componentAvatar = self.getComponentAvatarForState(dep)
921 if componentAvatar:
922 if not componentAvatar._providingClock:
923 componentAvatar._providingClock = True
924
925 def componentMasterClockFailed(failure):
926 componentAvatar._providingClock = False
927 handleFailure(failure, componentAvatar,
928 T_(N_("Could not setup component's master clock.")),
929 "component-clock-%s")
930 try:
931 d = self.provideMasterClock(componentAvatar)
932 except:
933
934
935 componentMasterClockFailed(Failure())
936 continue
937
938
939
940
941
942 def clockMasterErrback(failure):
943 componentMasterClockFailed(failure)
944 raise errors.ComponentStartHandledError(failure)
945 d.addCallbacks(self._tryWhatCanBeStarted,
946 clockMasterErrback)
947 else:
948 self.debug(
949 "Component %s already on way to clock master",
950 dep.get("name"))
951 else:
952 self.debug("Unknown dependency type")
953
986
987 setupComponent = defer_generator_method(_setupComponent)
988
990 """
991 This function registers a component in the heaven.
992 It is triggered when the mind is attached.
993
994 @param componentAvatar: the component to register
995 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar}
996 """
997 self.debug('heaven registering component %r' % componentAvatar)
998
999
1001 """
1002 This function unregisters a component in the heaven.
1003 It is triggered when the mind is detached.
1004
1005 @param componentAvatar: the component to unregister
1006 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar}
1007 """
1008 componentAvatar.debug('unregistering component')
1009
1010 conf = componentAvatar.componentState.get('config')
1011 if conf['clock-master'] == componentAvatar.avatarId:
1012
1013 self.removeMasterClock(componentAvatar)
1014
1016 """
1017 Tell the given component to provide a master clock.
1018 Trigger all deferreds waiting on this componentAvatar to provide
1019 a master clock.
1020
1021 @type componentAvatar: L{ComponentAvatar}
1022
1023 @rtype: L{twisted.internet.defer.Deferred}
1024 """
1025 avatarId = componentAvatar.avatarId
1026 self.debug('provideMasterClock on component %s' % avatarId)
1027
1028 def setMasterClockInfo(result):
1029
1030
1031
1032 self._masterClockInfo[avatarId] = result
1033 self.vishnu._depgraph.setClockMasterStarted(
1034 componentAvatar.componentState)
1035
1036 componentAvatar._providingClock = False
1037 return result
1038
1039 def wakeClockMasterWaiters(result):
1040 self.info('Received master clock info from clock master %s' %
1041 avatarId)
1042 self.debug('got master clock info: %r' % (result, ))
1043
1044
1045 if avatarId in self._clockMasterWaiters:
1046 waiters = self._clockMasterWaiters[avatarId]
1047 del self._clockMasterWaiters[avatarId]
1048 for d, waiterId in waiters:
1049 self.debug(
1050 'giving master clock info to waiting component %s' %
1051 waiterId)
1052 d.callback(result)
1053
1054 workerName = componentAvatar.getWorkerName()
1055 port = self.vishnu.reservePortsOnWorker(workerName, 1)[0]
1056
1057 def failedToProvideMasterClock(failure):
1058
1059
1060
1061 if avatarId in self._masterClockInfo:
1062 self.warning('Failed to provide master clock info to a '
1063 'component waiting for it')
1064 else:
1065 self.warning('Failed to provide master clock itself')
1066 self.debug('Going to release port')
1067 self.vishnu.releasePortsOnWorker(workerName, [port])
1068 self.warning(failure.getErrorMessage())
1069
1070 if avatarId in self._masterClockInfo:
1071 self.warning('component %s already has master clock info: %r'
1072 % (avatarId, self._masterClockInfo[avatarId]))
1073 del self._masterClockInfo[avatarId]
1074 d = componentAvatar.mindCallRemote('provideMasterClock', port)
1075 d.addCallback(setMasterClockInfo)
1076 d.addCallback(wakeClockMasterWaiters)
1077 d.addErrback(failedToProvideMasterClock)
1078 return d
1079
1081 """
1082 Tell the given component to stop providing a master clock.
1083
1084 @type componentAvatar: L{ComponentAvatar}
1085 """
1086
1087 avatarId = componentAvatar.avatarId
1088 workerName = componentAvatar.getWorkerName()
1089
1090
1091
1092 if avatarId in self._clockMasterWaiters:
1093 waiters = self._clockMasterWaiters[avatarId]
1094 del self._clockMasterWaiters[avatarId]
1095 for d, waiterId in waiters:
1096 self.debug('telling waiting component %s that '
1097 'the clock master %s is gone' % (waiterId, avatarId))
1098 d.errback(errors.ComponentStartError(
1099 'clock master component start cancelled'))
1100
1101
1102 if avatarId in self._masterClockInfo:
1103 info = self._masterClockInfo[avatarId]
1104 if info:
1105 port = info[1]
1106 self.vishnu.releasePortsOnWorker(workerName, [port])
1107 else:
1108 self.debug('avatarId %r has None masterClockInfo' % avatarId)
1109 del self._masterClockInfo[avatarId]
1110 else:
1111 self.warning('component %s has no master clock info'
1112 % (avatarId,))
1113
1115 """
1116 Get the master clock information from the given clock master component.
1117
1118 @param avatarId: the id of the clock master
1119 @type avatarId: str
1120 @param waiterId: the id of the requesting component
1121 @type waiterId: str
1122
1123 @returns: a deferred firing an (ip, port, base_time) triple.
1124 @rtype: L{twisted.internet.defer.Deferred}
1125 """
1126 self.debug('getting master clock info for component %s' % avatarId)
1127
1128
1129 if avatarId in self._masterClockInfo:
1130 return defer.succeed(self._masterClockInfo[avatarId])
1131
1132 if not avatarId in self._clockMasterWaiters:
1133 self._clockMasterWaiters[avatarId] = []
1134
1135
1136 ret = defer.Deferred()
1137 self._clockMasterWaiters[avatarId].append((ret, waiterId))
1138 return ret
1139
1141 """
1142 Return a component avatar for the given state.
1143
1144 @type state: L{flumotion.common.planet.ManagerComponentState}
1145
1146 @rtype: L{ComponentAvatar}
1147 """
1148 if state in self.vishnu._componentMappers:
1149 return self.vishnu._componentMappers[state].avatar
1150 else:
1151 return None
1152