1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 the job-side half of the worker-job connection
24 """
25
26 import os
27 import resource
28 import sys
29
30
31
32
33
34
35 from twisted.cred import credentials
36 from twisted.internet import reactor, defer
37 from twisted.python import failure
38 from twisted.spread import pb
39
40 from flumotion.common import config, errors, interfaces, log, registry, keycards
41 from flumotion.common import medium, package
42 from flumotion.common.reflectcall import createComponent
43 from flumotion.component import component
44
45 from flumotion.twisted import fdserver
46 from flumotion.twisted import pb as fpb
47 from flumotion.twisted import defer as fdefer
48
49 from flumotion.twisted.defer import defer_generator_method
50 from flumotion.twisted.compat import implements
51
53 """
54 I am a medium between the job and the worker's job avatar.
55 I live in the job process.
56
57 @cvar component: the component this is a medium for; created as part of
58 L{remote_create}
59 @type component: L{flumotion.component.component.BaseComponent}
60 """
61 logCategory = 'jobmedium'
62 remoteLogName = 'jobavatar'
63
64 implements(interfaces.IJobMedium)
65
67 self.avatarId = None
68 self.logName = None
69 self.component = None
70
71 self._workerName = None
72 self._managerHost = None
73 self._managerPort = None
74 self._managerTransport = None
75 self._managerKeycard = None
76 self._componentClientFactory = None
77
78 self._hasStoppedReactor = False
79
80
81 - def remote_bootstrap(self, workerName, host, port, transport, authenticator,
82 packagePaths):
83 """
84 I receive the information on how to connect to the manager. I also set
85 up package paths to be able to run the component.
86
87 Called by the worker's JobAvatar.
88
89 @param workerName: the name of the worker running this job
90 @type workerName: str
91 @param host: the host that is running the manager
92 @type host: str
93 @param port: port on which the manager is listening
94 @type port: int
95 @param transport: 'tcp' or 'ssl'
96 @type transport: str
97 @param authenticator: remote reference to the worker-side authenticator
98 @type authenticator: L{twisted.spread.pb.RemoteReference} to a
99 L{flumotion.twisted.pb.Authenticator}
100 @param packagePaths: ordered list of
101 (package name, package path) tuples
102 @type packagePaths: list of (str, str)
103 """
104 assert isinstance(workerName, str)
105 assert isinstance(host, str)
106 assert isinstance(port, int)
107 assert transport in ('ssl', 'tcp')
108 assert isinstance(authenticator, pb.RemoteReference)
109 assert isinstance(packagePaths, list)
110
111 self._workerName = workerName
112 self._managerHost = host
113 self._managerPort = port
114 self._managerTransport = transport
115 self._authenticator = fpb.RemoteAuthenticator(authenticator)
116
117 packager = package.getPackager()
118 for name, path in packagePaths:
119 self.debug('registering package path for %s' % name)
120 self.log('... from path %s' % path)
121 packager.registerPackagePath(path, name)
122
123 - def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
124 """
125 I am called on by the worker's JobAvatar to create a component.
126
127 @param avatarId: avatarId for component to log in to manager
128 @type avatarId: str
129 @param type: type of component to start
130 @type type: str
131 @param moduleName: name of the module to create the component from
132 @type moduleName: str
133 @param methodName: the factory method to use to create the component
134 @type methodName: str
135 @param nice: the nice level
136 @type nice: int
137 """
138 self.avatarId = avatarId
139 self.logName = avatarId
140
141 self.component = self._createComponent(avatarId, type, moduleName,
142 methodName, nice)
143 self.component.setShutdownHook(self._componentStopped)
144
148
155
170
171
173 """
174 Shut down the job process completely, cleaning up the component
175 so the reactor can be left from.
176 """
177 if self._hasStoppedReactor:
178 self.debug("Not stopping reactor again, already shutting down")
179 else:
180 self._hasStoppedReactor = True
181 self.info("Stopping reactor in job process")
182 reactor.stop()
183
185 if not nice:
186 return
187
188 try:
189 os.nice(nice)
190 except OSError, e:
191 self.warning('Failed to set nice level: %s' % str(e))
192 else:
193 self.debug('Nice level set to %d' % nice)
194
196 soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
197 if hard != resource.RLIM_INFINITY:
198 self.warning('Could not set unlimited core dump sizes, '
199 'setting to %d instead' % hard)
200 else:
201 self.debug('Enabling core dumps of unlimited size')
202
203 resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
204
206 """
207 Create a component of the given type.
208 Log in to the manager with the given avatarId.
209
210 @param avatarId: avatarId component will use to log in to manager
211 @type avatarId: str
212 @param type: type of component to start
213 @type type: str
214 @param moduleName: name of the module that contains the entry point
215 @type moduleName: str
216 @param methodName: name of the factory method to create the component
217 @type methodName: str
218 @param nice: the nice level to run with
219 @type nice: int
220 """
221 self.info('Creating component "%s" of type "%s"' % (avatarId, type))
222
223
224
225 self._setNice(nice)
226 self._enableCoreDumps()
227
228 try:
229 comp = createComponent(moduleName, methodName)
230 except Exception, e:
231 msg = "Exception %s during createComponent: %s" % (
232 e.__class__.__name__, " ".join(e.args))
233
234
235 if isinstance(e, errors.ComponentCreateError):
236 msg = e.args[0]
237 self.warning(
238 "raising ComponentCreateError(%s) and stopping job" % msg)
239
240
241
242
243
244
245
246 reactor.callLater(0.1, self.shutdown)
247 raise errors.ComponentCreateError(msg)
248
249 comp.setWorkerName(self._workerName)
250
251
252 self.debug('creating ComponentClientFactory')
253 managerClientFactory = component.ComponentClientFactory(comp)
254 self._componentClientFactory = managerClientFactory
255 self.debug('created ComponentClientFactory %r' % managerClientFactory)
256 self._authenticator.avatarId = avatarId
257 managerClientFactory.startLogin(self._authenticator)
258
259 host = self._managerHost
260 port = self._managerPort
261 transport = self._managerTransport
262 self.debug('logging in with authenticator %r' % self._authenticator)
263 if transport == "ssl":
264 from twisted.internet import ssl
265 self.info('Connecting to manager %s:%d with SSL' % (host, port))
266 reactor.connectSSL(host, port, managerClientFactory,
267 ssl.ClientContextFactory())
268 elif transport == "tcp":
269 self.info('Connecting to manager %s:%d with TCP' % (host, port))
270 reactor.connectTCP(host, port, managerClientFactory)
271 else:
272 self.warning('Unknown transport protocol %s' % self._managerTransport)
273
274 return comp
275
277 """
278 A pb.Broker subclass that handles FDs being passed (with associated data)
279 over the same connection as the normal PB data stream.
280 When an FD is seen, the FD should be added to a given eater or feeder
281 element.
282 """
283 - def __init__(self, connectionClass, **kwargs):
284 """
285 @param connectionClass: a subclass of L{twisted.internet.tcp.Connection}
286 """
287 pb.Broker.__init__(self, **kwargs)
288
289 self._connectionClass = connectionClass
290
292
293 self.debug('received fds %r, message %r' % (fds, message))
294 if message.startswith('sendFeed '):
295 def parseargs(_, feedName, eaterId=None):
296 return feedName, eaterId
297 feedName, eaterId = parseargs(*message.split(' '))
298 self.factory.medium.component.feedToFD(feedName, fds[0],
299 os.close, eaterId)
300 elif message.startswith('receiveFeed '):
301 feedId = message[len('receiveFeed '):]
302 self.factory.medium.component.eatFromFD(feedId, fds[0])
303 elif message == 'redirectStdout':
304 self.debug('told to rotate stdout to fd %d', fds[0])
305 os.dup2(fds[0], sys.stdout.fileno())
306 os.close(fds[0])
307 self.debug('rotated stdout')
308 elif message == 'redirectStderr':
309 self.debug('told to rotate stderr to fd %d', fds[0])
310 os.dup2(fds[0], sys.stderr.fileno())
311 os.close(fds[0])
312 self.info('rotated stderr')
313 else:
314 self.warning('Unknown message received: %r' % message)
315
317 """
318 I am a client factory that logs in to the WorkerBrain.
319 I live in the flumotion-job process spawned by the worker.
320
321 @cvar medium: the medium for the JobHeaven to access us through
322 @type medium: L{JobMedium}
323 """
324 logCategory = "job"
325 perspectiveInterface = interfaces.IJobMedium
326
328 """
329 @param id: the avatar id used for logging into the workerbrain
330 @type id: str
331 """
332 pb.PBClientFactory.__init__(self)
333
334 self.medium = JobMedium()
335 self.logName = id
336 self.login(id)
337
338
339 self.protocol = JobClientBroker
340
341
346
347
348 - def login(self, username):
363
364 login = defer_generator_method(login)
365
366
367
368
369
374