1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager implementation and related classes
24
25 API Stability: semi-stable
26
27 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
28 to compare against to verify that the manager
29 requested an action
30 @type LOCAL_IDENTITY: L{LocalIdentity}
31 """
32
33 __all__ = ['ManagerServerFactory', 'Vishnu']
34
35 import os
36
37 from twisted.internet import reactor, defer
38 from twisted.cred import error
39 from twisted.python import components, failure
40 from twisted.spread import pb
41 from twisted.cred import portal
42
43 from flumotion.common import bundle, config, errors, interfaces, log, registry
44 from flumotion.common import planet, common, dag, messages, reflectcall, server
45 from flumotion.common.identity import RemoteIdentity, LocalIdentity
46 from flumotion.common.planet import moods
47 from flumotion.configure import configure
48 from flumotion.manager import admin, component, worker, base, depgraph
49 from flumotion.twisted import checkers
50 from flumotion.twisted import portal as fportal
51 from flumotion.twisted.defer import defer_generator_method
52 from flumotion.twisted.compat import implements
53 from flumotion.common.messages import N_
54 T_ = messages.gettexter('flumotion')
55
56 LOCAL_IDENTITY = LocalIdentity('manager')
57
58 -def _find(list, value, proc=lambda x: x):
60
61 -def _first(list, proc=lambda x: x):
62 for x in list:
63 if proc(x): return x
64
65 -def _any(list, proc=lambda x: x):
66 return filter(proc, list)
67
69
70 def int(*args, **kwargs):
71 for p in procs:
72 if not p(*args, **kwargs): return False
73 return True
74 return int
75
76
77
79 """
80 I implement L{twisted.cred.portal.IRealm}.
81 I make sure that when a L{pb.Avatar} is requested through me, the
82 Avatar being returned knows about the mind (client) requesting
83 the Avatar.
84 """
85
86 implements(portal.IRealm)
87
88 logCategory = 'dispatcher'
89
91 """
92 @param computeIdentity: see L{Vishnu.computeIdentity}
93 @type computeIdentity: callable
94 """
95
96
97 self._interfaceHeavens = {}
98 self._avatarHeavens = {}
99 self._computeIdentity = computeIdentity
100
101
102
103
104
105
106
107
108
109
110
111
112
114 def got_avatar(avatar):
115
116
117
118
119 def cleanup(avatarId=avatarId, avatar=avatar, mind=mind):
120 self.removeAvatar(avatarId, avatar, mind)
121
122
123
124 reactor.callLater(0, avatar.attached, mind)
125 return (pb.IPerspective, avatar, cleanup)
126 def got_error(failure):
127
128
129
130
131
132 reactor.callLater(0, mind.broker.transport.loseConnection)
133 return failure
134
135 host = common.addressGetHost(mind.broker.transport.getPeer())
136 d = self.createAvatarFor(avatarId, keycard, host, ifaces)
137 d.addCallbacks(got_avatar, got_error)
138 return d
139
140
141
143 """
144 Remove an avatar because it logged out of the manager.
145
146 This function is registered by requestAvatar.
147 """
148 heaven = self._avatarHeavens[avatarId]
149 del self._avatarHeavens[avatarId]
150
151 avatar.detached(mind)
152 heaven.removeAvatar(avatarId)
153
155 """
156 Create an avatar from the heaven implementing the given interface.
157
158 @type avatarId: str
159 @param avatarId: the name of the new avatar
160 @type keycard: L{flumotion.common.keycards.Keycard}
161 @param keycard: the credentials being used to log in
162 @type remoteHost: str
163 @param remoteHost: the remote host
164 @type ifaces: tuple of interfaces linked to heaven
165 @param ifaces: a list of heaven interfaces to get avatar from,
166 including pb.IPerspective
167
168 @returns: a deferred that will fire an avatar from
169 the heaven managing the given interface.
170 """
171 def gotIdentity(identity):
172 for iface in ifaces:
173 heaven = self._interfaceHeavens.get(iface, None)
174 if heaven:
175 avatar = heaven.createAvatar(avatarId, identity)
176 self.debug('Created avatar %r for identity %r' % (
177 avatar, identity))
178 self._avatarHeavens[avatarId] = heaven
179 return avatar
180 raise errors.NoPerspectiveError("%s requesting iface %r",
181 avatarId, ifaces)
182
183 if not pb.IPerspective in ifaces:
184 raise errors.NoPerspectiveError(avatarId)
185 d = self._computeIdentity(keycard, remoteHost)
186 d.addCallback(gotIdentity)
187 return d
188
190 """
191 Register a Heaven as managing components with the given interface.
192
193 @type interface: L{twisted.python.components.Interface}
194 @param interface: a component interface to register the heaven with.
195 """
196 assert isinstance(heaven, base.ManagerHeaven)
197
198 self._interfaceHeavens[interface] = heaven
199
201 """
202 I am an object that ties together different objects related to a
203 component. I am used as values in a lookup hash in the vishnu.
204 """
206 self.state = None
207 self.id = None
208 self.avatar = None
209 self.jobState = None
210
212 """
213 I am the toplevel manager object that knows about all heavens and factories.
214
215 @cvar dispatcher: dispatcher to create avatars
216 @type dispatcher: L{Dispatcher}
217 @cvar workerHeaven: the worker heaven
218 @type workerHeaven: L{worker.WorkerHeaven}
219 @cvar componentHeaven: the component heaven
220 @type componentHeaven: L{component.ComponentHeaven}
221 @cvar adminHeaven: the admin heaven
222 @type adminHeaven: L{admin.AdminHeaven}
223 """
224
225 implements(server.IServable)
226
227 logCategory = "vishnu"
228
229 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
230
231
232 self.dispatcher = Dispatcher(self.computeIdentity)
233
234 self.workerHeaven = self._createHeaven(interfaces.IWorkerMedium,
235 worker.WorkerHeaven)
236 self.componentHeaven = self._createHeaven(interfaces.IComponentMedium,
237 component.ComponentHeaven)
238 self.adminHeaven = self._createHeaven(interfaces.IAdminMedium,
239 admin.AdminHeaven)
240
241 if configDir is not None:
242 self.configDir = configDir
243 else:
244 self.configDir = os.path.join(configure.configdir,
245 "managers", name)
246
247 self.bouncer = None
248
249 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
250
251 self._componentMappers = {}
252
253 self.state = planet.ManagerPlanetState()
254 self.state.set('name', name)
255
256 self.plugs = {}
257
258 self._depgraph = depgraph.DepGraph()
259
260
261
262
263 self.portal = fportal.BouncerPortal(self.dispatcher, None)
264
265 self.factory = pb.PBServerFactory(self.portal,
266 unsafeTracebacks=unsafeTracebacks)
267
268 self.connectionInfo = {}
269 self.setConnectionInfo(None, None, None)
270
271 self.configuration = None
272
276
278 """Returns the manager's configuration as a string suitable for
279 importing via loadConfiguration().
280 """
281 if self.configuration:
282 return self.configuration.export()
283 else:
284 return None
285
300
302 """
303 @param identity: L{flumotion.common.identity.Identity}
304 """
305 socket = 'flumotion.component.plugs.adminaction.AdminAction'
306 if self.plugs.has_key(socket):
307 for plug in self.plugs[socket]:
308 plug.action(identity, message, args, kw)
309
311 """
312 Compute a suitable identity for a remote host. First looks to
313 see if there is a
314 flumotion.component.plugs.identity.IdentityProvider plug
315 installed on the manager, falling back to user@host.
316
317 The identity is only used in the adminaction interface. An
318 example of its use is when you have an adminaction plug that
319 checks an admin's privileges before actually doing an action;
320 the identity object you use here might store the privileges that
321 the admin has.
322
323 @param keycard: the keycard that the remote host used to log in.
324 @type keycard: L{flumotion.common.keycards.Keycard}
325 @param remoteHost: the ip of the remote host
326 @type remoteHost: str
327
328 @rtype: a deferred that will fire a
329 L{flumotion.common.identity.RemoteIdentity}
330 """
331
332 socket = 'flumotion.component.plugs.identity.IdentityProvider'
333 if self.plugs.has_key(socket):
334 for plug in self.plugs[socket]:
335 identity = plug.computeIdentity(keycard, remoteHost)
336 if identity:
337 return identity
338 username = getattr(keycard, 'username', None)
339 return defer.succeed(RemoteIdentity(username, remoteHost))
340
367 def setupErrback(failure):
368 failure.trap(errors.ConfigError)
369 self.warning('Configuration error in manager bouncer: %s' %
370 failure.value.args[0])
371 d.addCallback(setupCallback)
372 d.addErrback(setupErrback)
373 return d
374
390
401
403 """
404 Add a component state for the given component config entry.
405
406 @rtype: L{flumotion.common.planet.ManagerComponentState}
407 """
408
409 self.debug('adding component %s to %s'
410 % (conf.name, parent.get('name')))
411
412 if identity != LOCAL_IDENTITY:
413 self.adminAction(identity, '_addComponent', (conf, parent), {})
414
415 state = planet.ManagerComponentState()
416 state.set('name', conf.name)
417 state.set('type', conf.getType())
418 state.set('workerRequested', conf.worker)
419 state.setMood(moods.sleeping.value)
420 state.set('config', conf.getConfigDict())
421
422 state.set('parent', parent)
423 parent.append('components', state)
424
425 avatarId = conf.getConfigDict()['avatarId']
426
427 if conf.getConfigDict()['version'] != configure.versionTuple:
428 m = messages.Warning(T_(N_("This component is configured for "
429 "Flumotion version %s, but you are running version %s.\n"
430 "Please update the configuration of the component.\n"),
431 common.versionTupleToString(conf.getConfigDict()['version']),
432 configure.version))
433 state.append('messages', m)
434
435
436 m = ComponentMapper()
437 m.state = state
438 m.id = avatarId
439 self._componentMappers[state] = m
440 self._componentMappers[avatarId] = m
441
442 return state
443
458
460 """
461 Add a new config object into the planet state.
462
463 @returns: a list of all components added
464 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
465 """
466
467 self.debug('syncing up planet state with config')
468 added = []
469
470 state = self.state
471 atmosphere = state.get('atmosphere')
472 for name, c in conf.atmosphere.components.items():
473 if name in [x.get('name') for x in atmosphere.get('components')]:
474 self.debug('atmosphere already has component %s' % name)
475 else:
476 added.append(self._addComponent(c, atmosphere, identity))
477
478 flows = dict([(x.get('name'), x) for x in state.get('flows')])
479 for f in conf.flows:
480 try:
481 flow = flows[f.name]
482 self.debug('checking existing flow %s' % f.name)
483 except KeyError:
484 self.info('creating flow "%s"' % f.name)
485 flow = planet.ManagerFlowState(name=f.name, parent=state)
486 state.append('flows', flow)
487
488 components = [x.get('name') for x in flow.get('components')]
489 for name, c in f.components.items():
490 if name in components:
491 self.debug('component %s already in flow %s'
492 % (c.name, f.name))
493 else:
494 added.append(self._addComponent(c, flow, identity))
495
496 for componentState in added:
497 self._updateFlowDependencies(componentState)
498
499 try:
500 self._depgraph.mapEatersToFeeders()
501 except errors.ComponentConfigError, e:
502 state = e.args[0]
503 debug = e.args[1]
504 message = messages.Error(T_(
505 N_("The component is misconfigured.")),
506 debug=debug)
507 state.append('messages', message)
508 state.setMood(moods.sad.value)
509 raise e
510
511 return added
512
514
515
516 componentsToStart = {}
517 for c in components:
518 workerId = c.get('workerRequested')
519 if not workerId in componentsToStart:
520 componentsToStart[workerId] = []
521 componentsToStart[workerId].append(c)
522 self.debug('_startComponents: componentsToStart %r' % componentsToStart)
523
524 for workerId, componentStates in componentsToStart.items():
525 self._workerCreateComponents(workerId, componentStates)
526
534
536 """
537 Load the configuration from the given XML, merging it on top of
538 the currently running configuration.
539
540 @param file: file to parse, either as an open file object,
541 or as the name of a file to open
542 @type file: str or file
543 @param identity: The identity making this request.. This is used by the
544 adminaction logging mechanism in order to say who is
545 performing the action.
546 @type identity: L{flumotion.common.identity.Identity}
547 """
548 self.debug('loading configuration')
549 self.configuration = conf = config.FlumotionConfigXML(file)
550 conf.parse()
551 return self._loadConfiguration(conf, identity)
552
554 """
555 Create a heaven of the given klass that will send avatars to clients
556 implementing the given medium interface.
557
558 @param interface: the medium interface to create a heaven for
559 @type interface: L{flumotion.common.interfaces.IMedium}
560 @param klass: the type of heaven to create
561 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
562 """
563 assert issubclass(interface, interfaces.IMedium)
564 heaven = klass(self)
565 self.dispatcher.registerHeaven(heaven, interface)
566 return heaven
567
577
580
582 """
583 Create the given component. This will currently also trigger
584 a start eventually when the component avatar attaches.
585
586 The component should be sleeping.
587 The worker it should be started on should be present.
588 """
589 m = componentState.get('mood')
590 if m != moods.sleeping.value:
591 raise errors.ComponentMoodError("%r not sleeping but %s" % (
592 componentState, moods.get(m).name))
593
594 p = componentState.get('moodPending')
595 if p != None:
596 raise errors.ComponentMoodError(
597 "%r already has a pending mood %s" % (
598 componentState, moods.get(p).name))
599
600
601 workerId = (componentState.get('workerName')
602 or componentState.get('workerRequested'))
603
604 if not workerId in self.workerHeaven.avatars:
605 raise errors.ComponentNoWorkerError(
606 "worker %s is not logged in" % workerId)
607 else:
608 return self._workerCreateComponents(workerId, [componentState])
609
611 """
612 Stop the given component.
613 If the component was sad, we clear its sad state as well,
614 since the stop was explicitly requested by the admin.
615
616 @type componentState: L{planet.ManagerComponentState}
617
618 @rtype: L{defer.Deferred}
619 """
620 self.debug('componentStop(%r)' % componentState)
621
622 for m in componentState.get('messages'):
623 self.debug('Removing message %r' % m)
624 componentState.remove('messages', m)
625
626
627
628 if (componentState.get('moodPending') != None and
629 componentState.get('moodPending') != moods.happy.value):
630 self.debug("Pending mood is %r", componentState.get('moodPending'))
631 raise errors.BusyComponentError(componentState)
632
633 avatar = self.getComponentMapper(componentState).avatar
634 if not avatar:
635
636
637
638 if componentState.get('mood') == moods.sad.value:
639 self.debug('asked to stop a sad component without avatar')
640 componentState.setMood(moods.sleeping.value)
641 componentState.set('moodPending', None)
642 return defer.succeed(None)
643 if componentState.get('mood') == moods.lost.value:
644 self.debug('asked to stop a lost component without avatar')
645 componentState.setMood(moods.sleeping.value)
646 componentState.set('moodPending', None)
647 return defer.succeed(None)
648
649 msg = 'asked to stop a component without avatar in mood %s' % \
650 moods.get(componentState.get('mood'))
651 self.warning(msg)
652 return defer.fail(errors.ComponentError(msg))
653
654 d = avatar.mindCallRemote('stop')
655 def cleanupAndDisconnectComponent(result):
656 avatar._starting = False
657 avatar._beingSetup = False
658 return avatar.disconnect()
659
660 def setSleeping(result):
661 if componentState.get('mood') == moods.sad.value:
662 self.debug('clearing sad mood after having stopped component')
663 componentState.setMood(moods.sleeping.value)
664
665 return result
666
667 d.addCallback(cleanupAndDisconnectComponent)
668 d.addCallback(setSleeping)
669
670 return d
671
673 """
674 Set the given message on the given component's state.
675 Can be called e.g. by a worker to report on a crashed component.
676 Sets the mood to sad if it is an error message.
677 """
678 if not avatarId in self._componentMappers:
679 self.warning('asked to set a message on non-mapped component %s' %
680 avatarId)
681 return
682
683 m = self._componentMappers[avatarId]
684 m.state.append('messages', message)
685 if message.level == messages.ERROR:
686 self.debug('Error message makes component sad')
687 m.state.setMood(moods.sad.value)
688
689
691
692 workerId = workerAvatar.avatarId
693 self.debug('vishnu.workerAttached(): id %s' % workerId)
694
695 self._depgraph.addWorker(workerId)
696 self._depgraph.setWorkerStarted(workerId)
697
698
699
700
701 components = [c for c in self._getComponentsToCreate()
702 if c.get('workerRequested') in (workerId, None)]
703
704
705
706
707 d = workerAvatar.getComponents()
708 def workerAvatarComponentListReceived(workerComponents):
709
710 lostComponents = list([c for c in self.getComponentStates()
711 if c.get('workerRequested') == workerId and \
712 c.get('mood') == moods.lost.value])
713 for comp in workerComponents:
714
715
716 if comp in self._componentMappers:
717 compState = self._componentMappers[comp].state
718 if compState in components:
719 components.remove(compState)
720 if compState in lostComponents:
721 lostComponents.remove(compState)
722
723 for compState in lostComponents:
724 self.info(
725 "Restarting previously lost component %s on worker %s",
726 self._componentMappers[compState].id, workerId)
727
728
729
730 compState.set('moodPending', None)
731 compState.setMood(moods.sleeping.value)
732
733 allComponents = components + lostComponents
734
735 if not allComponents:
736 self.debug(
737 "vishnu.workerAttached(): no components for this worker")
738 return
739
740 self._workerCreateComponents(workerId, allComponents)
741 d.addCallback(workerAvatarComponentListReceived)
742
744 """
745 Create the list of components on the given worker, sequentially, but
746 in no specific order.
747
748 @param workerId: avatarId of the worker
749 @type workerId: string
750 @param components: components to start
751 @type components: list of
752 L{flumotion.common.planet.ManagerComponentState}
753 """
754 self.debug("_workerCreateComponents: workerId %r, components %r" % (
755 workerId, components))
756
757 if not workerId in self.workerHeaven.avatars:
758 self.debug('worker %s not logged in yet, delaying '
759 'component start' % workerId)
760 return defer.succeed(None)
761
762 workerAvatar = self.workerHeaven.avatars[workerId]
763
764 d = defer.Deferred()
765
766 for c in components:
767 type = c.get('type')
768 conf = c.get('config')
769 self.debug('scheduling create of %s on %s'
770 % (conf['avatarId'], workerId))
771 d.addCallback(self._workerCreateComponentDelayed,
772 workerAvatar, c, type, conf)
773
774 d.addCallback(lambda result: self.debug(
775 '_workerCreateComponents(): completed setting up create chain'))
776
777
778 self.debug('_workerCreateComponents(): triggering create chain')
779 d.callback(None)
780
781 return d
782
798
799
800
801
803 self.debug('got avatarId %s for state %s' % (result, componentState))
804 m = self._componentMappers[componentState]
805 assert result == m.id, "received id %s is not the expected id %s" % (
806 result, m.id)
807
831
833
834 workerId = workerAvatar.avatarId
835 self.debug('vishnu.workerDetached(): id %s' % workerId)
836 self._depgraph.setWorkerStopped(workerId)
837
839
840
841
842
843
844
845
846
847
848 def verifyExistingComponentState(jobState, state):
849
850 state.setJobState(jobState)
851
852 if conf and state.get('config') != conf:
853 message = messages.Warning(T_(
854 N_("Component logged in with stale configuration. "
855 "Consider stopping this component and restarting "
856 "the manager.")),
857 debug=("Expected\n%r\n, but got\n%r;\n"
858 "updating internal state accordingly." %
859 (state.get('config'), conf)))
860 self.warning('updating internal component state for %r '
861 '(changing config from %r to %r)', state,
862 state.get('config'), conf)
863 state.set('config', conf)
864 state.append('messages', message)
865
866
867
868 def makeNewComponentState(conf):
869
870 state = planet.ManagerComponentState()
871 state.setJobState(jobState)
872
873 if conf:
874 flowName, compName = conf['parent'], conf['name']
875 else:
876
877
878
879 flowName, compName = common.parseComponentId(avatar.avatarId)
880 conf = {'name': compName,
881 'parent': flowName,
882 'type': 'unknown-component',
883 'avatarId': avatar.avatarId,
884 'properties': {}}
885
886 state.set('name', compName)
887 state.set('type', conf['type'])
888 state.set('workerRequested', jobState.get('workerName'))
889 state.set('config', conf)
890
891
892 if flowName == 'atmosphere':
893
894 flow = self.state.get('atmosphere')
895 else:
896 flow = _first(self.state.get('flows'),
897 lambda x: x.get('name') == flowName)
898 if not flow:
899 self.info('Creating flow "%s"' % flowName)
900 flow = planet.ManagerFlowState()
901 flow.set('name', flowName)
902 flow.set('parent', self.state)
903 self.state.append('flows', flow)
904
905 state.set('parent', flow)
906 flow.append('components', state)
907
908
909 m = ComponentMapper()
910 m.state = state
911 m.id = avatar.avatarId
912 self._componentMappers[m.state] = m
913 self._componentMappers[m.id] = m
914
915 (_success1, conf), (_success2, jobState) = deferredListResult
916 m = self.getComponentMapper(avatar.avatarId)
917
918 if m:
919 verifyExistingComponentState(jobState, m.state)
920 else:
921 makeNewComponentState(conf)
922
923 m = self.getComponentMapper(avatar.avatarId)
924
925
926 self._depgraph.addComponent(m.state)
927
928 m.avatar = avatar
929 self._componentMappers[m.avatar] = m
930 avatar.componentState = m.state
931 avatar.jobState = jobState
932 m.jobState = jobState
933 self._componentMappers[jobState] = m
934
944
946
947 self.debug("%s component detached", componentAvatar.avatarId)
948 self._depgraph.setJobStopped(componentAvatar.componentState)
949 componentAvatar.componentState.set('moodPending', None)
950
951
952 componentAvatar.componentState = None
953 componentAvatar.jobState = None
954
978
980
981
982 self.debug('unregisterComponent(%r): cleaning up state' %
983 componentAvatar)
984
985 if componentAvatar not in self._componentMappers:
986 self.warning("Component logging out that was incompletely logged "
987 " in, ignoring")
988 return
989
990 m = self._componentMappers[componentAvatar]
991
992
993 try:
994 del self._componentMappers[m.jobState]
995 except KeyError:
996 self.warning('Could not remove jobState for %r' % componentAvatar)
997 m.jobState = None
998
999 m.state.set('pid', None)
1000 m.state.set('cpu', None)
1001 m.state.set('workerName', None)
1002 m.state.set('moodPending', None)
1003
1004
1005 del self._componentMappers[m.avatar]
1006 m.avatar = None
1007
1018
1020 """
1021 Empty the planet of the given component.
1022
1023 @returns: a deferred that will fire when all listeners have been
1024 notified of the removal of the component.
1025 """
1026 self.debug('deleting component %r from state', componentState)
1027 c = componentState
1028 flow = componentState.get('parent')
1029 if (c.get('moodPending') != None
1030 or c.get('mood') is not moods.sleeping.value):
1031 raise errors.BusyComponentError(c)
1032
1033 self._depgraph.removeComponent(c)
1034
1035 del self._componentMappers[self._componentMappers[c].id]
1036 del self._componentMappers[c]
1037 return flow.remove('components', c)
1038
1040 """
1041 Empty the planet of a flow.
1042
1043 @returns: a deferred that will fire when the flow is removed.
1044 """
1045
1046
1047 flow = _find(self.state.get('flows'), flowName, lambda x: x.get('name'))
1048 components = flow.get('components')
1049
1050
1051 isBusy = lambda c: c.get('moodPending') != None
1052 isNotSleeping = lambda c: c.get('mood') is not moods.sleeping.value
1053 pred = _fint(isBusy, isNotSleeping)
1054 if _any(components, pred):
1055 raise errors.BusyComponentError(_first(components, pred))
1056
1057 for c in components:
1058 self._depgraph.removeComponent(c)
1059 del self._componentMappers[self._componentMappers[c].id]
1060 del self._componentMappers[c]
1061 yield flow.empty()
1062 yield self.state.remove('flows', flow)
1063 deleteFlow = defer_generator_method(deleteFlow)
1064
1066 """
1067 Empty the planet of all components, and flows.
1068
1069 @returns: a deferred that will fire when the planet is empty.
1070 """
1071
1072 components = self.getComponentStates()
1073
1074
1075 isPending = lambda c: c.get('moodPending') != None
1076 components = filter(isPending, components)
1077 if len(components) > 0:
1078 state = components[0]
1079 raise errors.BusyComponentError(state,
1080 "moodPending is %s" % moods.get(state.get('moodPending')))
1081
1082
1083 components = self.getComponentStates()
1084 isNotSleeping = lambda c: c.get('mood') is not moods.sleeping.value
1085 components = filter(isNotSleeping, components)
1086
1087
1088 d = defer.Deferred()
1089
1090 self.debug('need to stop %d components: %r' % (
1091 len(components), components))
1092
1093
1094
1095 for c in components:
1096 avatar = self._componentMappers[c].avatar
1097
1098
1099 if avatar:
1100 d.addCallback(lambda result, a: a.stop(), avatar)
1101 else:
1102 assert (c.get('mood') is moods.sad.value or
1103 c.get('mood') is moods.lost.value)
1104
1105 d.addCallback(self._emptyPlanetCallback)
1106
1107
1108 reactor.callLater(0, d.callback, None)
1109
1110 return d
1111
1113
1114
1115 components = self.getComponentStates()
1116 self.debug('_emptyPlanetCallback: need to delete %d components' %
1117 len(components))
1118
1119 for c in components:
1120
1121 self._depgraph.removeComponent(c)
1122
1123 if c.get('mood') is not moods.sleeping.value:
1124 self.warning('Component %s is not sleeping' % c.get('name'))
1125
1126 m = self._componentMappers[c]
1127 del self._componentMappers[m.id]
1128 del self._componentMappers[c]
1129
1130
1131 l = self._componentMappers.keys()
1132 if len(l) > 0:
1133 self.warning('mappers still has keys %r' % (repr(l)))
1134
1135 list = []
1136
1137 list.append(self.state.get('atmosphere').empty())
1138
1139 for f in self.state.get('flows'):
1140 self.debug('appending deferred for emptying flow %r' % f)
1141 list.append(f.empty())
1142 self.debug('appending deferred for removing flow %r' % f)
1143 list.append(self.state.remove('flows', f))
1144 self.debug('appended deferreds')
1145
1146 dl = defer.DeferredList(list)
1147 return dl
1148
1150 """
1151 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1152 """
1153
1154 components = self.state.getComponents()
1155
1156
1157
1158
1159
1160 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1161 components = filter(isSleeping, components)
1162 return components
1163
1165
1166 if not workerName in self.workerHeaven.avatars:
1167 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1168 % workerName)
1169
1170 return self.workerHeaven.avatars[workerName]
1171
1174
1176 """
1177 Requests a number of ports on the worker named workerName. The
1178 ports will be reserved for the use of the caller until
1179 releasePortsOnWorker is called.
1180
1181 @returns: a list of ports as integers
1182 """
1183 return self._getWorker(workerName).reservePorts(numPorts)
1184
1186 """
1187 Tells the manager that the given ports are no longer being used,
1188 and may be returned to the allocation pool.
1189 """
1190 try:
1191 return self._getWorker(workerName).releasePorts(ports)
1192 except errors.ComponentNoWorkerError, e:
1193 self.warning('could not release ports: %r' % e.args)
1194
1196 """
1197 Look up an object mapper given the object.
1198
1199 @rtype: L{ComponentMapper} or None
1200 """
1201 if object in self._componentMappers.keys():
1202 return self._componentMappers[object]
1203
1204 return None
1205