Trees | Indices | Help |
---|
|
1 # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 6 # Copyright (C) 2010,2011 Flumotion Services, S.A. 7 # All rights reserved. 8 # 9 # This file may be distributed and/or modified under the terms of 10 # the GNU Lesser General Public License version 2.1 as published by 11 # the Free Software Foundation. 12 # This file is distributed without any warranty; without even the implied 13 # warranty of merchantability or fitness for a particular purpose. 14 # See "LICENSE.LGPL" in the source distribution for more information. 15 # 16 # Headers in this file shall remain intact. 17 18 """ 19 worker-side objects to handle worker clients 20 """ 21 22 import os 23 import sys 24 import signal 25 26 from twisted.cred import portal 27 from twisted.internet import defer, reactor 28 from twisted.spread import pb 29 from zope.interface import implements 30 31 from flumotion.common import errors, log 32 from flumotion.common import worker, startset 33 from flumotion.common.process import signalPid 34 from flumotion.twisted import checkers, fdserver 35 from flumotion.twisted import pb as fpb 36 37 __version__ = "$Rev$" 38 39 JOB_SHUTDOWN_TIMEOUT = 5 40 4143 # FIXME: there is mkstemp for sockets, so we have a small window 44 # here in which the socket could be created by something else 45 # I didn't succeed in preparing a socket file with that name either 46 47 # caller needs to delete name before using 48 import tempfile 49 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.') 50 os.close(fd) 51 52 return name53 5456 """ 57 I hold information about a job. 58 59 @cvar pid: PID of the child process 60 @type pid: int 61 @cvar avatarId: avatar identification string 62 @type avatarId: str 63 @cvar type: type of the component to create 64 @type type: str 65 @cvar moduleName: name of the module to create the component from 66 @type moduleName: str 67 @cvar methodName: the factory method to use to create the component 68 @type methodName: str 69 @cvar nice: the nice level to run the job as 70 @type nice: int 71 @cvar bundles: ordered list of (bundleName, bundlePath) needed to 72 create the component 73 @type bundles: list of (str, str) 74 """ 75 __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName', 76 'nice', 'bundles') 7787 8880 self.pid = pid 81 self.avatarId = avatarId 82 self.type = type 83 self.moduleName = moduleName 84 self.methodName = methodName 85 self.nice = nice 86 self.bundles = bundles90131 13292 self._startSet = startSet 93 self._deferredStart = startSet.createRegistered(avatarId) 94 worker.ProcessProtocol.__init__(self, heaven, avatarId, 95 'component', 96 heaven.getWorkerName())9799 heaven = self.loggable 100 heaven.brain.callRemote('componentAddMessage', self.avatarId, 101 message)102104 heaven = self.loggable 105 dstarts = self._startSet 106 signum = status.value.signal 107 108 # we need to trigger a failure on the create deferred 109 # if the job failed before logging in to the worker; 110 # otherwise the manager still thinks it's starting up when it's 111 # dead. If the job already attached to the worker however, 112 # the create deferred will already have callbacked. 113 deferred = dstarts.createRegistered(self.avatarId) 114 if deferred is self._deferredStart: 115 if signum: 116 reason = "received signal %d" % signum 117 else: 118 reason = "unknown reason" 119 text = ("Component '%s' has exited early (%s)." % 120 (self.avatarId, reason)) 121 dstarts.createFailed(self.avatarId, 122 errors.ComponentCreateError(text)) 123 124 if dstarts.shutdownRegistered(self.avatarId): 125 dstarts.shutdownSuccess(self.avatarId) 126 127 heaven.jobStopped(self.pid) 128 129 # chain up 130 worker.ProcessProtocol.processEnded(self, status)134 """ 135 I am similar to but not quite the same as a manager-side Heaven. 136 I manage avatars inside the worker for job processes spawned by the worker. 137 138 @ivar avatars: dict of avatarId -> avatar 139 @type avatars: dict of str -> L{base.BaseJobAvatar} 140 @ivar brain: the worker brain 141 @type brain: L{worker.WorkerBrain} 142 """ 143 144 logCategory = "job-heaven" 145 implements(portal.IRealm) 146 147 avatarClass = None 148281 ret.addCallback(stopListening) 282 return ret 283150 """ 151 @param brain: a reference to the worker brain 152 @type brain: L{worker.WorkerBrain} 153 """ 154 self.avatars = {} # componentId -> avatar 155 self.brain = brain 156 self._socketPath = _getSocketPath() 157 self._port = None 158 self._onShutdown = None # If set, a deferred to fire when 159 # our last child process exits 160 161 self._jobInfos = {} # processid -> JobInfo 162 163 self._startSet = startset.StartSet( 164 lambda x: x in self.avatars, 165 errors.ComponentAlreadyStartingError, 166 errors.ComponentAlreadyRunningError)167169 assert self._port is None 170 assert self.avatarClass is not None 171 # FIXME: we should hand a username and password to log in with to 172 # the job process instead of allowing anonymous 173 checker = checkers.FlexibleCredentialsChecker() 174 checker.allowPasswordless(True) 175 p = portal.Portal(self, [checker]) 176 f = pb.PBServerFactory(p) 177 try: 178 os.unlink(self._socketPath) 179 except OSError: 180 pass 181 182 # Rather than a listenUNIX(), we use listenWith so that we can specify 183 # our particular Port, which creates Transports that we know how to 184 # pass FDs over. 185 self.debug("Listening for FD's on unix socket %s", self._socketPath) 186 187 # listenWith is deprecated but the function never did much anyway 188 # 189 # port = reactor.listenWith(fdserver.FDPort, self._socketPath, f) 190 port = fdserver.FDPort(self._socketPath, f, reactor=reactor) 191 port.startListening() 192 193 self._port = port194 195 ### portal.IRealm method 196198 if pb.IPerspective in interfaces: 199 avatar = self.avatarClass(self, avatarId, mind) 200 assert avatarId not in self.avatars 201 self.avatars[avatarId] = avatar 202 return pb.IPerspective, avatar, avatar.logout 203 else: 204 raise NotImplementedError("no interface")205207 if avatarId in self.avatars: 208 del self.avatars[avatarId] 209 else: 210 self.warning("some programmer is telling me about an avatar " 211 "I have no idea about: %s", avatarId)212214 """ 215 Gets the name of the worker that spawns the process. 216 217 @rtype: str 218 """ 219 return self.brain.workerName220 223 226 229231 return self._jobInfos.keys()232234 self.debug('telling kids about new log file descriptors') 235 for avatar in self.avatars.values(): 236 avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())237239 if pid in self._jobInfos: 240 self.debug('Removing job info for %d', pid) 241 del self._jobInfos[pid] 242 243 if not self._jobInfos and self._onShutdown: 244 self.debug("Last child exited") 245 self._onShutdown.callback(None) 246 else: 247 self.warning("some programmer is telling me about a pid " 248 "I have no idea about: %d", pid)249251 self.debug('Shutting down JobHeaven') 252 self.debug('Stopping all jobs') 253 for avatar in self.avatars.values(): 254 avatar.stop() 255 256 if self.avatars: 257 # If our jobs fail to shut down nicely within some period of 258 # time, shut them down less nicely 259 dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill) 260 261 def cancelDelayedCall(res, dc): 262 # be nice to unit tests 263 if dc.active(): 264 dc.cancel() 265 return res266 267 self._onShutdown = defer.Deferred() 268 self._onShutdown.addCallback(cancelDelayedCall, dc) 269 ret = self._onShutdown 270 else: 271 # everything's gone already, return success 272 ret = defer.succeed(None) 273 274 def stopListening(_): 275 # possible for it to be None, if we haven't been told to 276 # listen yet, as in some test cases 277 if self._port: 278 port = self._port 279 self._port = None 280 return port.stopListening()285 self.warning("Killing all children immediately") 286 for pid in self.getJobPids(): 287 self.killJobByPid(pid, signum)288290 if pid not in self._jobInfos: 291 raise errors.UnknownComponentError(pid) 292 293 jobInfo = self._jobInfos[pid] 294 self.debug("Sending signal %d to job %s at pid %d", signum, 295 jobInfo.avatarId, jobInfo.pid) 296 signalPid(jobInfo.pid, signum)297299 for job in self._jobInfos.values(): 300 if job.avatarId == avatarId: 301 self.killJobByPid(job.pid, signum)302 303305 """ 306 I am an avatar for the job living in the worker. 307 """ 308 logCategory = 'job-avatar' 309365311 """ 312 @type heaven: L{flumotion.worker.base.BaseJobHeaven} 313 @type avatarId: str 314 """ 315 fpb.Avatar.__init__(self, avatarId) 316 self._heaven = heaven 317 self.setMind(mind) 318 self.pid = None319321 """ 322 @param mind: reference to the job's JobMedium on which we can call 323 @type mind: L{twisted.spread.pb.RemoteReference} 324 """ 325 fpb.Avatar.setMind(self, mind) 326 self.haveMind()327 331333 self.log('logout called, %s disconnected', self.avatarId) 334 335 self._heaven.removeAvatar(self.avatarId)336 342344 try: 345 # FIXME: pay attention to the return value of 346 # sendFileDescriptor; is the same as the return value of 347 # sendmsg(2) 348 self.mind.broker.transport.sendFileDescriptor(fd, message) 349 return True 350 except OSError, e: 351 # OSError is what is thrown by the C code doing this 352 # when there are issues 353 self.warning("Error %s sending file descriptors", 354 log.getExceptionMessage(e)) 355 return False356358 """ 359 Tell the job to log to the given file descriptors. 360 """ 361 self.debug('Giving job new stdout and stderr') 362 if self.mind: 363 self._sendFileDescriptor(stdout, "redirectStdout") 364 self._sendFileDescriptor(stdout, "redirectStderr")
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Sun May 10 13:41:11 2015 | http://epydoc.sourceforge.net |