1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import os
27 import signal
28 import sys
29 import exceptions
30
31 import gst
32 import gst.interfaces
33
34 from twisted.cred import portal
35 from twisted.internet import defer, reactor
36 from twisted.spread import pb
37 import twisted.cred.error
38 from twisted.internet import error
39
40 from flumotion.common import errors, interfaces, log, bundleclient
41 from flumotion.common import common, medium, messages, worker
42 from flumotion.twisted import checkers, fdserver, compat
43 from flumotion.twisted import pb as fpb
44 from flumotion.twisted import defer as fdefer
45 from flumotion.twisted.defer import defer_generator_method
46 from flumotion.twisted.compat import implements
47 from flumotion.configure import configure
48 from flumotion.worker import feed
49
50 JOB_SHUTDOWN_TIMEOUT = 5
51
52 factoryClass = fpb.ReconnectingFPBClientFactory
54 """
55 I am a client factory for the worker to log in to the manager.
56 """
57 logCategory = 'worker'
58 perspectiveInterface = interfaces.IWorkerMedium
59
61 """
62 @type brain: L{flumotion.worker.worker.WorkerBrain}
63 """
64 self._managerHost = brain.managerHost
65 self._managerPort = brain.managerPort
66 self.medium = brain.medium
67
68 factoryClass.__init__(self)
69
70 self.maxDelay = 10
71
81
82
84
85
86 def remoteDisconnected(remoteReference):
87 if reactor.killed:
88 self.log('Connection to manager lost due to shutdown')
89 else:
90 self.warning('Lost connection to manager, '
91 'will attempt to reconnect')
92
93 def loginCallback(reference):
94 self.info("Logged in to manager")
95 self.debug("remote reference %r" % reference)
96
97 self.medium.setRemoteReference(reference)
98 reference.notifyOnDisconnect(remoteDisconnected)
99
100 def alreadyConnectedErrback(failure):
101 failure.trap(errors.AlreadyConnectedError)
102 self.warning('A worker with the name "%s" is already connected.' %
103 failure.value)
104
105 def accessDeniedErrback(failure):
106 failure.trap(twisted.cred.error.UnauthorizedLogin)
107 self.warning('Access denied.')
108
109 def connectionRefusedErrback(failure):
110 failure.trap(twisted.internet.error.ConnectionRefusedError)
111 self.warning('Connection to %s:%d refused.' % (self._managerHost,
112 self._managerPort))
113
114 def NoSuchMethodErrback(failure):
115 failure.trap(twisted.spread.flavors.NoSuchMethod)
116
117 if failure.value.find('remote_getKeycardClasses') > -1:
118 self.warning(
119 "Manager %s:%d is older than version 0.3.0. "
120 "Please upgrade." % (self._managerHost, self._managerPort))
121 return
122
123 return failure
124
125 def loginFailedErrback(failure):
126 self.warning('Login failed, reason: %s' % str(failure))
127
128 d.addCallback(loginCallback)
129 d.addErrback(accessDeniedErrback)
130 d.addErrback(connectionRefusedErrback)
131 d.addErrback(alreadyConnectedErrback)
132 d.addErrback(NoSuchMethodErrback)
133 d.addErrback(loginFailedErrback)
134
136 """
137 I am a medium interfacing with the manager-side WorkerAvatar.
138
139 @ivar brain: the worker brain
140 @type brain: L{WorkerBrain}
141 """
142
143 logCategory = 'workermedium'
144
145 implements(interfaces.IWorkerMedium)
146
148 """
149 @type brain: L{WorkerBrain}
150 """
151 self.brain = brain
152 self._ports = ports
153
154
156 """
157 Gets the range of feed ports that this worker was configured to
158 use.
159
160 @rtype: list of int
161 @return: list of ports
162 """
163 return self._ports
164
166 """
167 Return the TCP port the Feed Server is listening on.
168
169 @rtype: int, or NoneType
170 @return: TCP port number, or None if there is no feed server
171 """
172 port = self.brain.feedServerPort
173 return port
174
175 - def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
176 """
177 Start a component of the given type with the given nice level.
178 Will spawn a new job process to run the component in.
179
180 @param avatarId: avatar identification string
181 @type avatarId: str
182 @param type: type of the component to create
183 @type type: str
184 @param moduleName: name of the module to create the component from
185 @type moduleName: str
186 @param methodName: the factory method to use to create the component
187 @type methodName: str
188 @param nice: nice level
189 @type nice: int
190
191 @returns: a deferred fired when the process has started and created
192 the component
193 """
194
195
196
197
198
199
200 self.info('Starting component "%s" of type "%s"' % (avatarId, type))
201
202
203
204
205
206 self.debug('setting up bundles for %s' % moduleName)
207 d = self.bundleLoader.getBundles(moduleName=moduleName)
208 yield d
209
210 bundles = d.value()
211
212
213 d = self.brain.deferredCreate(avatarId)
214 if not d:
215 msg = ("Component '%s' has already received a create request"
216 % avatarId)
217 raise errors.ComponentCreateError(msg)
218
219
220 self.brain.kindergarten.play(avatarId, type, moduleName, methodName,
221 nice, bundles)
222
223 yield d
224
225 try:
226 result = d.value()
227 except errors.ComponentCreateError, e:
228 self.debug('create deferred for %s failed, forwarding error' %
229 avatarId)
230 raise
231 self.debug('create deferred for %s succeeded (%r)'
232 % (avatarId, result))
233 yield result
234 remote_create = defer_generator_method(remote_create)
235
237 """
238 Checks if one or more GStreamer elements are present and can be
239 instantiated.
240
241 @param elementNames: names of the Gstreamer elements
242 @type elementNames: list of str
243
244 @rtype: list of str
245 @returns: a list of instantiatable element names
246 """
247 self.debug('remote_checkElements: element names to check %r' % (
248 elementNames,))
249
250 list = []
251 for name in elementNames:
252 try:
253 gst.element_factory_make(name)
254 list.append(name)
255 except gst.PluginNotFoundError:
256 pass
257 self.debug('remote_checkElements: returning elements names %r' % list)
258 return list
259
261 """
262 Checks if the given module can be imported.
263
264 @param moduleName: name of the module to check
265 @type moduleName: str
266
267 @returns: None or Failure
268 """
269 self.debug('remote_checkImport: %s', moduleName)
270
271
272 __import__(moduleName)
273
275 """
276 Runs the given function in the given module with the given arguments.
277
278 @param module: module the function lives in
279 @type module: str
280 @param function: function to run
281 @type function: str
282
283 @returns: the return value of the given function in the module.
284 """
285 return self.runBundledFunction(module, function, *args, **kwargs)
286
288 """
289 I return a list of componentAvatarIds, I have. I am called by the
290 manager soon after I attach to it. This is needed on reconnects
291 so that the manager knows what components it needs to start on me.
292
293 @returns: a list of componentAvatarIds
294 """
295 return self.brain.kindergarten.getKidAvatarIds()
296
298 """
299 I am an abstraction of a job process started by the worker.
300
301 @cvar pid: PID of the child process
302 @type pid: int
303 @cvar avatarId: avatar identification string
304 @type avatarId: str
305 @cvar type: type of the component to create
306 @type type: str
307 @cvar moduleName: name of the module to create the component from
308 @type moduleName: str
309 @cvar methodName: the factory method to use to create the component
310 @type methodName: str
311 @cvar nice: the nice level to run the kid as
312 @type nice: int
313 @cvar bundles: ordered list of (bundleName, bundlePath) needed to
314 create the component
315 @type bundles: list of (str, str)
316 """
317 - def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
318 bundles):
319 self.pid = pid
320 self.avatarId = avatarId
321 self.type = type
322 self.moduleName = moduleName
323 self.methodName = methodName
324 self.nice = nice
325 self.bundles = bundles
326
328 - def __init__(self, kindergarten, avatarId):
332
337
366
368 """
369 I spawn job processes.
370 I live in the worker brain.
371 """
372
373 logCategory = 'workerbrain'
374
375 - def __init__(self, options, socketPath, brain):
376 """
377 @param options: the optparse option instance of command-line options
378 @type options: dict
379 @param socketPath: the path of the Unix domain socket for PB
380 @type socketPath: str
381 @param brain: a reference to the worker brain
382 @type brain: L{WorkerBrain}
383 """
384 self.brain = brain
385 self.options = options
386
387 self._onShutdown = None
388
389
390 self._kids = {}
391 self._socketPath = socketPath
392
393 - def play(self, avatarId, type, moduleName, methodName, nice, bundles):
394 """
395 Create a kid and make it "play" by starting a job.
396 Starts a component with the given name, of the given type, with
397 the given nice level.
398
399 This will spawn a new flumotion-job process.
400
401 @param avatarId: avatarId the component should use to log in
402 @type avatarId: str
403 @param type: type of component to start
404 @type type: str
405 @param moduleName: name of the module to create the component from
406 @type moduleName: str
407 @param methodName: the factory method to use to create the component
408 @type methodName: str
409 @param nice: nice level
410 @type nice: int
411 @param bundles: ordered list of (bundleName, bundlePath) for this
412 component
413 @type bundles: list of (str, str)
414 """
415 p = JobProcessProtocol(self, avatarId)
416 executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
417 if not os.path.exists(executable):
418 self.error("Trying to spawn job process, but '%s' does not "
419 "exist" % executable)
420
421 argv = [executable, avatarId, self._socketPath]
422
423 realexecutable = executable
424
425
426
427
428
429 if os.environ.has_key('FLU_VALGRIND_JOB'):
430 jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
431 if avatarId in jobnames:
432 realexecutable = 'valgrind'
433
434
435
436 argv = ['valgrind', '--leak-check=full', '--num-callers=24',
437 '--leak-resolution=high', '--show-reachable=yes',
438 'python'] + argv
439
440 childFDs = {0: 0, 1: 1, 2: 2}
441 env = {}
442 env.update(os.environ)
443
444 env['FLU_DEBUG'] = log._FLU_DEBUG
445 process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
446 childFDs=childFDs)
447
448 p.setPid(process.pid)
449
450 self._kids[avatarId] = \
451 Kid(process.pid, avatarId, type, moduleName, methodName, nice,
452 bundles)
453
455 """
456 Set a deferred to fire when we have no children
457 """
458 if not self._kids:
459 d.callback(None)
460 else:
461 self._onShutdown = d
462
465
467 return self._kids.values()
468
470 return self._kids.keys()
471
473 self.warning("Killing all children immediately")
474 for kid in self.getKids():
475 self.debug("Sending SIGKILL to pid %d", kid.pid)
476 common.killPid(kid.pid)
477
479 """
480 Remove the kid from the kindergarten based on the pid.
481 Called by the signal handler in the brain.
482
483 @returns: whether or not a kid with that pid was removed
484 @rtype: boolean
485 """
486 for path, kid in self._kids.items():
487 if kid.pid == pid:
488 self.debug('Removing kid with name %s and pid %d' % (
489 path, pid))
490 del self._kids[path]
491 if not self._kids and self._onShutdown:
492 self.debug("Last child exited")
493 self._onShutdown.callback(None)
494
495 return True
496
497 self.warning('Asked to remove kid with pid %d but not found' % pid)
498 return False
499
501
502
503
504
505
506 import tempfile
507 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
508 os.close(fd)
509
510 return name
511
512
514 """
515 I am the main object in the worker process, managing jobs and everything
516 related.
517 I live in the main worker process.
518
519 @ivar authenticator: authenticator worker used to log in to manager
520 @type authenticator L{flumotion.twisted.pb.Authenticator}
521 @ivar kindergarten:
522 @type kindergarten: L{Kindergarten}
523 @ivar medium:
524 @type medium: L{WorkerMedium}
525 @ivar jobHeaven:
526 @type jobHeaven: L{JobHeaven}
527 @ivar workerClientFactory:
528 @type workerClientFactory: L{WorkerClientFactory}
529 @ivar feedServerPort: TCP port the Feed Server is listening on
530 @type feedServerPort: int
531 """
532
533 compat.implements(interfaces.IFeedServerParent)
534
535 logCategory = 'workerbrain'
536
538 """
539 @param options: the optparsed dictionary of command-line options
540 @type options: an object with attributes
541 """
542 self.options = options
543 self.workerName = options.name
544
545 self.managerHost = options.host
546 self.managerPort = options.port
547 self.managerTransport = options.transport
548
549 self.authenticator = None
550
551 ports = []
552 if not self.options.randomFeederports:
553 ports = self.options.feederports[:-1]
554 self.medium = WorkerMedium(self, ports)
555 self._socketPath = _getSocketPath()
556 self.kindergarten = Kindergarten(options, self._socketPath, self)
557 self.jobHeaven = JobHeaven(self)
558 self.workerClientFactory = WorkerClientFactory(self)
559
560 self._port = None
561
562 self._jobServerFactory = None
563 self._jobServerPort = None
564 self._feedServerFactory = feed.feedServerFactory(self)
565
566 self._feedServerPort = None
567 self.feedServerPort = None
568
569 self._createDeferreds = {}
570
571 self._shutdownDeferreds = {}
572
573
575 """
576 Start listening on FeedServer (incoming eater requests) and
577 JobServer (through which we communicate with our children) ports
578
579 @returns: True if we successfully listened on both ports
580 """
581
582 try:
583 self._setupFeedServer()
584 except error.CannotListenError, e:
585 self.warning("Failed to listen on feed server port: %r", e)
586 return False
587
588 try:
589 self._jobServerFactory, self._jobServerPort = self._setupJobServer()
590 except error.CannotListenError, e:
591 self.warning("Failed to listen on job server port: %r", e)
592 return False
593
594 return True
595
596 - def login(self, authenticator):
597 self.authenticator = authenticator
598 self.workerClientFactory.startLogin(authenticator)
599
622
624 """
625 @returns: (port, portNumber)
626 """
627 port = None
628 if self.options.randomFeederports:
629 port = 0
630 else:
631 try:
632 port = self.options.feederports[-1]
633 except IndexError:
634 self.info(
635 'Not starting feed server because no port is configured')
636 return
637
638 self._feedServerPort = reactor.listenWith(
639 fdserver.PassableServerPort, port,
640 self._feedServerFactory)
641
642
643 self.feedServerPort = self._feedServerPort.getHost().port
644 self.debug('Listening for feed requests on TCP port %s' %
645 self.feedServerPort)
646
647
649 """
650 Clean up after setup()
651
652 @Returns: a L{twisted.internet.defer.Deferred} that fires when
653 the teardown is completed
654 """
655 self.debug("cleaning up port %r" % self._port)
656 dl = []
657 if self._jobServerPort:
658 dl.append(self._jobServerPort.stopListening())
659 if self._feedServerPort:
660 dl.append(self._feedServerPort.stopListening())
661
662 return defer.DeferredList(dl)
663
664 - def callRemote(self, methodName, *args, **kwargs):
666
680
682 """
683 Create and register a deferred for creating the given component.
684 This deferred will be fired when the JobAvatar has instructed the
685 job to create the component.
686
687 @rtype: L{twisted.internet.defer.Deferred}
688 """
689 self.debug('making create deferred for %s' % avatarId)
690
691 d = defer.Deferred()
692
693
694
695
696
697 if avatarId in self._createDeferreds:
698
699
700 self.info('already have a create deferred for %s', avatarId)
701 raise errors.ComponentAlreadyStartingError(avatarId)
702 elif avatarId in self._shutdownDeferreds:
703
704
705 self.debug('waiting for previous %s to shut down like it '
706 'said it would', avatarId)
707 def ensureShutdown(res,
708 shutdown=self._shutdownDeferreds[avatarId]):
709 shutdown.addCallback(lambda _: res)
710 return shutdown
711 d.addCallback(ensureShutdown)
712 elif avatarId in self.jobHeaven.avatars:
713
714 self.info('avatar named %s already running', avatarId)
715 raise errors.ComponentAlreadyRunningError(avatarId)
716 else:
717
718 pass
719
720 self.debug('registering deferredCreate for %s', avatarId)
721 self._createDeferreds[avatarId] = d
722 return d
723
725 """
726 Trigger a previously registered deferred for creating up the given
727 component.
728 """
729 self.debug('triggering create deferred for %s' % avatarId)
730 if not avatarId in self._createDeferreds:
731 self.warning('No create deferred registered for %s' % avatarId)
732 return
733
734 d = self._createDeferreds[avatarId]
735 del self._createDeferreds[avatarId]
736
737 d.callback(avatarId)
738
740 """
741 Notify the caller that a create has failed, and remove the create
742 from the list of pending creates.
743 """
744 self.debug('create deferred failed for %s' % avatarId)
745 if not avatarId in self._createDeferreds:
746 self.warning('No create deferred registered for %s' % avatarId)
747 return
748
749 d = self._createDeferreds[avatarId]
750 del self._createDeferreds[avatarId]
751 d.errback(exception)
752
754 """
755 Check if a deferred create has been registered for the given avatarId.
756 """
757 return avatarId in self._createDeferreds
758
760 """
761 Create and register a deferred for notifying the worker of a
762 clean job shutdown. This deferred will be fired when the job is
763 reaped.
764
765 @rtype: L{twisted.internet.defer.Deferred}
766 """
767 self.debug('making shutdown deferred for %s' % avatarId)
768
769 if avatarId in self._shutdownDeferreds:
770 self.warning('already have a shutdown deferred for %s',
771 avatarId)
772 return self._shutdownDeferreds[avatarId]
773 else:
774 self.debug('registering deferredShutdown for %s', avatarId)
775 d = defer.Deferred()
776 self._shutdownDeferreds[avatarId] = d
777 return d
778
780 """
781 Trigger a previously registered deferred for creating up the given
782 component.
783 """
784 self.debug('triggering shutdown deferred for %s', avatarId)
785 if not avatarId in self._shutdownDeferreds:
786 self.warning('No shutdown deferred registered for %s', avatarId)
787 return
788
789 d = self._shutdownDeferreds.pop(avatarId)
790 d.callback(avatarId)
791
793 """
794 Check if a deferred shutdown has been registered for the given avatarId.
795 """
796 return avatarId in self._shutdownDeferreds
797
798
799 - def feedToFD(self, componentId, feedName, fd, eaterId):
800 """
801 Called from the FeedAvatar to pass a file descriptor on to
802 the job running the component for this feeder.
803
804 @returns: whether the fd was successfully handed off to the component.
805 """
806 if componentId not in self.jobHeaven.avatars:
807 self.warning("No such component %s running", componentId)
808 return False
809
810 avatar = self.jobHeaven.avatars[componentId]
811 return avatar.sendFeed(feedName, fd, eaterId)
812
813 - def eatFromFD(self, componentId, feedId, fd):
814 """
815 Called from the FeedAvatar to pass a file descriptor on to
816 the job running the given component.
817
818 @returns: whether the fd was successfully handed off to the component.
819 """
820 if componentId not in self.jobHeaven.avatars:
821 self.warning("No such component %s running", componentId)
822 return False
823
824 avatar = self.jobHeaven.avatars[componentId]
825 return avatar.receiveFeed(feedId, fd)
826
828 """
829 I am a Realm inside the worker for forked jobs to log in to.
830 """
831 implements(portal.IRealm)
832
834 """
835 @type root: L{flumotion.worker.worker.JobHeaven}
836 """
837 self._root = root
838
839
840
841
842
843
851
853 """
854 I am an avatar for the job living in the worker.
855 """
856 logCategory = 'job-avatar'
857
859 """
860 @type heaven: L{flumotion.worker.worker.JobHeaven}
861 @type avatarId: str
862 """
863 self.avatarId = avatarId
864 self.logName = avatarId
865 self._heaven = heaven
866 self._mind = None
867 self.debug("created new JobAvatar")
868
870 """
871 Check if the avatar has a remote reference to the peer.
872
873 @rtype: boolean
874 """
875 return self._mind != None
876
878 """
879 @param mind: reference to the job's JobMedium on which we can call
880 @type mind: L{twisted.spread.pb.RemoteReference}
881
882 I am scheduled from the dispatcher's requestAvatar method.
883 """
884 self._mind = mind
885 self.log('Client attached mind %s' % mind)
886 host = self._heaven.brain.managerHost
887 port = self._heaven.brain.managerPort
888 transport = self._heaven.brain.managerTransport
889
890 kid = self._heaven.brain.kindergarten.getKid(self.avatarId)
891
892 d = self._mind.callRemote('bootstrap', self._heaven.getWorkerName(),
893 host, port, transport, self._heaven.getAuthenticator(), kid.bundles)
894
895 yield d
896 d.value()
897
898 self.debug(
899 "asking job to create component with avatarId %s, type %s" % (
900 kid.avatarId, kid.type))
901 d = self._mind.callRemote('create', kid.avatarId, kid.type,
902 kid.moduleName, kid.methodName, kid.nice)
903
904 yield d
905 try:
906 d.value()
907 self.debug('job started component with avatarId %s' % kid.avatarId)
908 self._heaven.brain.deferredCreateTrigger(kid.avatarId)
909 except errors.ComponentCreateError, e:
910 self.warning('could not create component %s of type %s: %r'
911 % (kid.avatarId, kid.type, e))
912 self._heaven.brain.deferredCreateFailed(kid.avatarId, e)
913 except Exception, e:
914 self.warning('unhandled remote error: type %s, message %s'
915 % (e.__class__.__name__, e))
916 self._heaven.brain.deferredCreateFailed(kid.avatarId, e)
917 attached = defer_generator_method(attached)
918
920 self.log('logout called, %s disconnected' % self.avatarId)
921 self._mind = None
922
924 """
925 returns: a deferred marking completed stop.
926 """
927 self.debug('stopping %s' % self.avatarId)
928 if not self._mind:
929 return defer.succeed(None)
930
931 return self._mind.callRemote('stop')
932
935
936 - def logTo(self, stdout, stderr):
937 """
938 Tell the feeder to log to the given file descriptors.
939 """
940 self.debug('Giving job new stdout and stderr')
941 if self._mind:
942 try:
943 self._mind.broker.transport.sendFileDescriptor(
944 stdout, "redirectStdout")
945 self._mind.broker.transport.sendFileDescriptor(
946 stderr, "redirectStderr")
947 except exceptions.RuntimeError, e:
948
949
950 self.debug("We got a Runtime Error %s sending file descriptors.",
951 log.getExceptionMessage(e))
952 return False
953
954 - def sendFeed(self, feedName, fd, eaterId):
955 """
956 Tell the feeder to send the given feed to the given fd.
957
958 @returns: whether the fd was successfully handed off to the component.
959 """
960 self.debug('Sending FD %d to component job to feed %s to fd' % (
961 fd, feedName))
962
963
964
965
966
967 if self._mind:
968 try:
969 self._mind.broker.transport.sendFileDescriptor(
970 fd, "sendFeed %s %s" % (feedName, eaterId))
971 return True
972 except exceptions.RuntimeError, e:
973
974
975 self.debug("We got a Runtime Error %s sending file descriptors.",
976 log.getExceptionMessage(e))
977 return False
978 self.debug('my mind is gone, trigger disconnect')
979 return False
980
981
983 """
984 Tell the feeder to receive the given feed from the given fd.
985
986 @returns: whether the fd was successfully handed off to the component.
987 """
988 self.debug('Sending FD %d to component job to eat %s from fd' % (
989 fd, feedId))
990 try:
991 self._mind.broker.transport.sendFileDescriptor(
992 fd, "receiveFeed %s" % feedId)
993 return True
994 except exceptions.RuntimeError, e:
995
996
997 self.debug("We got a Runtime Error %s sending file descriptors.",
998 log.getExceptionMessage(e))
999 return False
1000
1002 """
1003 This notification from the job process will be fired when it is
1004 shutting down, so that although the process might still be
1005 around, we know it's OK to accept new start requests for this
1006 avatar ID.
1007 """
1008 self.info("component %s shutting down cleanly", self.avatarId)
1009 self._heaven.brain.deferredShutdown(self.avatarId)
1010
1011
1013 """
1014 I am similar to but not quite the same as a manager-side Heaven.
1015 I manage avatars inside the worker for job processes spawned by the worker.
1016
1017 @ivar avatars: dict of avatarId -> avatar
1018 @type avatars: dict of str -> L{JobAvatar}
1019 @ivar brain: the worker brain
1020 @type brain: L{WorkerBrain}
1021 """
1022 logCategory = "job-heaven"
1024 """
1025 @type brain: L{WorkerBrain}
1026 """
1027 self.avatars = {}
1028 self.brain = brain
1029
1030 handler = signal.signal(signal.SIGHUP, self._HUPHandler)
1031 if handler == signal.SIG_DFL or handler == signal.SIG_IGN:
1032 self._oldHUPHandler = None
1033 else:
1034 self._oldHUPHandler = handler
1035
1037 if self._oldHUPHandler:
1038 self.log('got SIGHUP, calling previous handler %r',
1039 self._oldHUPHandler)
1040 self._oldHUPHandler(signum, frame)
1041 self.debug('telling kids about new log file descriptors')
1042 for avatar in self.avatars.values():
1043 avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())
1044
1049
1051 if avatarId not in self.avatars:
1052 self.warning("some programmer is telling me about an avatar "
1053 "I have no idea about: %s", avatarId)
1054 else:
1055 return self.avatars.pop(avatarId)
1056
1058 self.debug('Shutting down JobHeaven')
1059 self.debug('Stopping all jobs')
1060 dl = defer.DeferredList([x.stop() for x in self.avatars.values()])
1061 dl.addCallback(lambda result: self.debug('Stopped all jobs'))
1062 return dl
1063
1065 """
1066 Gets the authenticator that the worker used to log in to the manager.
1067
1068 @rtype: L{flumotion.twisted.pb.Authenticator}
1069 """
1070 return self.brain.authenticator
1071
1073 """
1074 Gets the name of the worker that spawns the process.
1075
1076 @rtype: str
1077 """
1078 return self.brain.workerName
1079