1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import os
18 import glob
19 import time
20
21 from flumotion.configure import configure
22 from flumotion.common import common, errors, log
23
24 """
25 Servicer object used in service scripts
26 """
28 """
29 I manage running managers and workers on behalf of a service script.
30 """
31
32 logCategory = 'servicer'
33
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    Set up the directories the servicer works with.

    @type  configDir: string
    @param configDir: overridden path to the configuration directory;
                      configure.configdir is used when not given.
    @type  logDir:    string
    @param logDir:    overridden path to the log directory.
    @type  runDir:    string
    @param runDir:    overridden path to the run directory.
    """
    # Fall back to the configured default only when no override is given,
    # so that the documented configDir parameter actually takes effect.
    if not configDir:
        configDir = configure.configdir
    self.managersDir = os.path.join(configDir, 'managers')
    self.workersDir = os.path.join(configDir, 'workers')

    # Overrides handed on to the daemons as command-line options;
    # entries left at None are skipped when the options are built.
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
49
51
52
53 managers = []
54 workers = []
55
56 if not args:
57 managers = self.getManagers().keys()
58 managers.sort()
59 workers = self.getWorkers()
60 workers.sort()
61 return (managers, workers)
62
63 which = args[0]
64 if which not in ['manager', 'worker']:
65 raise errors.SystemError, 'Please specify either manager or worker'
66
67 if len(args) < 2:
68 raise errors.SystemError, 'Please specify which %s to %s' % (
69 which, command)
70
71 name = args[1]
72 if which == 'manager':
73 managers = self.getManagers()
74 if not managers.has_key(name):
75 raise errors.SystemError, 'No manager "%s"' % name
76 managers = [name, ]
77 elif which == 'worker':
78 workers = self.getWorkers()
79 if not name in workers:
80 raise errors.SystemError, 'No worker with name %s' % name
81 workers = [name, ]
82
83 return (managers, workers)
84
86 """
87 Return a list of override directories for configure.configure
88 suitable for appending to a command line.
89 """
90 args = []
91 for key, value in self._overrideDir.items():
92 if value:
93 args.append('--%s=%s' % (key, value))
94 return " ".join(args)
95
def getManagers(self):
    """
    Scan the managers directory for managers and their flows.

    Every entry directly under the managers directory counts as a manager;
    every *.xml file in its 'flows' subdirectory counts as one of its
    flows, named after the file without the .xml extension.

    @rtype:   dict of string -> list of string
    @returns: a dictionary of manager names -> flow names; empty when the
              managers directory does not exist.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the self.getManagers() call sites - confirm.
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        flows = []

        flowsDir = os.path.join(managerDir, 'flows')
        if os.path.exists(flowsDir):
            for flowFile in glob.glob(os.path.join(flowsDir, '*.xml')):
                # splitext only strips the final extension, so a flow file
                # such as 'live.xmlbackup.xml' keeps its full name instead
                # of being cut at the first '.xml'
                flows.append(
                    os.path.splitext(os.path.basename(flowFile))[0])

        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows

    self.log('returning managers: %r' % managers)
    return managers
121
def getWorkers(self):
    """
    Scan the workers directory for worker configuration files.

    Every *.xml file directly under the workers directory counts as a
    worker, named after the file without the .xml extension.

    @rtype:   list of string
    @returns: a sorted list of worker names; empty when the workers
              directory does not exist.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the self.getWorkers() call sites - confirm.
    if not os.path.exists(self.workersDir):
        return []

    workers = []
    for workerFile in glob.glob(os.path.join(self.workersDir, '*.xml')):
        # splitext only strips the final extension, so a name containing
        # '.xml' in the middle is not cut short
        workers.append(os.path.splitext(os.path.basename(workerFile))[0])
    workers.sort()
    return workers
137
def start(self, args):
    """
    Start processes as given in the args.

    If nothing is specified, start all managers and workers.
    If args is ['manager', name], start only that manager.
    If args is ['worker', name], start only that worker.
    Managers are started before workers.

    @raises errors.SystemError: from the argument parsing, when the kind
                                or the name is missing or unknown.
    @returns: an exit value reflecting the number of processes that failed
              to start
    """
    # NOTE(review): the def line is absent from this listing; name and
    # signature inferred from the sibling stop() - confirm.
    (managers, workers) = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    failedManagers = [
        name for name in managers
        if not self.startManager(name, flowsByManager[name])]
    failedWorkers = [
        name for name in workers
        if not self.startWorker(name)]

    return len(failedManagers) + len(failedWorkers)
164
def stop(self, args):
    """
    Stop processes as given in the args.

    If nothing is specified, stop all managers and workers.
    If args is ['manager', name], stop only that manager.
    If args is ['worker', name], stop only that worker.
    Workers are stopped before managers.

    @raises errors.SystemError: from the argument parsing, when the kind
                                or the name is missing or unknown.
    @returns: an exit value reflecting the number of processes that failed
              to stop
    """
    (managers, workers) = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    failedWorkers = [
        name for name in workers
        if not self.stopWorker(name)]
    failedManagers = [
        name for name in managers
        if not self.stopManager(name)]

    return len(failedWorkers) + len(failedManagers)
191
def status(self, args):
    """
    Give status on processes as given in the args.

    Prints one line per selected manager and worker saying whether it is
    not running, running (with its pid), or dead with a stale pid.
    """
    # NOTE(review): the def line is absent from this listing; name and
    # signature inferred from the sibling stop() - confirm.
    (managers, workers) = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    for kind, names in (('manager', managers), ('worker', workers)):
        for name in names:
            pid = common.getPid(kind, name)
            if not pid:
                print("%s %s not running" % (kind, name))
            elif common.checkPidRunning(pid):
                print("%s %s is running with pid %d" % (kind, name, pid))
            else:
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
208
def clean(self, args):
    """
    Clean up dead process pid files as given in the args.

    For every selected manager and worker the pid file is deleted when no
    pid can be read from it, or when the pid it holds is no longer running.
    Pid files of running processes are left alone.
    """
    # NOTE(review): the def line is absent from this listing; name and
    # signature inferred from the sibling stop() - confirm.
    (managers, workers) = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in (('manager', managers), ('worker', workers)):
        for name in names:
            pid = common.getPid(kind, name)
            if not pid:
                # presumably the pid file holds bogus data - confirm what
                # common.getPid returns when the file is missing
                print("deleting bogus pid file for %s %s" % (kind, name))
                common.deletePidFile(kind, name)
            elif not common.checkPidRunning(pid):
                self.debug("Cleaning up stale pid %d for %s %s" % (
                    pid, kind, name))
                print("deleting stale pid file for %s %s" % (kind, name))
                common.deletePidFile(kind, name)
228
230
231
232
233
def create(self, args):
    """
    Create a default manager or worker config.

    @type  args: list of string
    @param args: ['manager' | 'worker', name], optionally followed by a
                 port number (default 7531). For a manager this is the
                 port written to its planet file; for a worker it is the
                 port of the manager to connect to.

    @raises errors.SystemError: if the kind is missing or is neither
                                'manager' nor 'worker', or if no name is
                                given.
    @raises ValueError:         if the port is not an integer.
    """
    # NOTE(review): the def line is absent from this listing; name and
    # signature inferred from the docstring and the sibling stop() -
    # confirm.

    # Validate the kind first, so a typo in it is reported as such rather
    # than as a missing name.
    if not args or args[0] not in ('manager', 'worker'):
        raise errors.SystemError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]

    if len(args) == 1:
        raise errors.SystemError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    # >= rather than ==, so the port is not silently dropped when extra
    # arguments follow it
    if len(args) >= 3:
        port = int(args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
257
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    Creates the manager's directory with a planet.xml in it, and generates
    a dummy default.pem certificate in the configuration directory when
    there is none yet.

    @type  name: string
    @param name: name of the manager; also the name of its directory.
    @type  port: int
    @param port: port written to the planet file.

    @raises errors.SystemError: if the manager directory already exists.
    @returns: whether or not the config was created.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the self.createManager(name, port) call - confirm.
    # The whitespace inside the XML template is reproduced as found.
    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.SystemError(
            "Manager directory %s already exists" % managerDir)
    os.makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    # NOTE(review): the bouncer data below is a fixed sample credential;
    # presumably it matches the user/test pair of the sample worker -
    # confirm, and do not ship it to production unchanged.
    handle = open(planetFile, 'w')
    try:
        # explicit mapping instead of locals(), so only the intended
        # value can end up in the template
        handle.write("""<planet>
<manager>
<debug>4</debug>
<host>localhost</host>
<port>%(port)d</port>
<transport>ssl</transport>
<!-- certificate path can be relative to $sysconfdir/flumotion,
or absolute -->
<!--
<certificate>default.pem</certificate>
-->
<component name="manager-bouncer" type="htpasswdcrypt-bouncer">
<property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
</component>
</manager>
</planet>
""" % {'port': port})
    finally:
        # close the file even when the write fails
        handle.close()

    # generate a default certificate when there is none yet
    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        status = os.system("%s %s" % (
            os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))
        if status != 0:
            self.warning("Could not create certificate %s" % pemFile)

    return True
303
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    Writes <name>.xml into the workers directory, creating that directory
    first when it does not exist yet.

    @type  name:              string
    @param name:              name of the worker; also the base name of
                              its configuration file.
    @type  managerPort:       int
    @param managerPort:       port of the manager to connect to.
    @type  randomFeederports: bool
    @param randomFeederports: whether to configure random feeder ports
                              instead of a commented-out fixed range.

    @raises errors.SystemError: if the worker file already exists.
    @returns: whether or not the config was created.
    """
    # os.makedirs raises when the directory already exists, so only call
    # it when needed; otherwise only the very first worker could ever be
    # created.
    if not os.path.isdir(self.workersDir):
        os.makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.SystemError(
            "Worker file %s already exists." % workerFile)

    if randomFeederports:
        feederports = ' <feederports random="True" />'
    else:
        feederports = " <!-- <feederports>8600-8639</feederports> -->"

    # NOTE(review): whitespace inside the template and the feederports
    # strings is reproduced as found in the listing - confirm.
    handle = open(workerFile, 'w')
    try:
        # explicit mapping instead of locals(), so only the intended
        # values can end up in the template
        handle.write("""<worker>

<debug>4</debug>

<manager>
<host>localhost</host>
<port>%(managerPort)s</port>
</manager>

<authentication type="plaintext">
<username>user</username>
<password>test</password>
</authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
    finally:
        # close the file even when the write fails
        handle.close()

    return True
343
344
def startManager(self, name, flowNames):
    """
    Start the manager as configured in the manager directory for the given
    manager name, together with the given flows.

    @type  name:      string
    @param name:      name of the manager, i.e. of its directory.
    @type  flowNames: list of string
    @param flowNames: names of the flows to load, without .xml extension.

    @raises errors.SystemError: if the planet file or a flow file does not
                                exist, or if a pid is already recorded for
                                this manager (running or stale).
    @returns: whether or not the manager daemon started
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the call in start() - confirm.
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)
    managerDir = os.path.join(self.managersDir, name)
    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.SystemError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.SystemError(
                "Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    # refuse to start when a pid is already recorded, running or not
    existingPid = common.getPid('manager', name)
    if existingPid:
        if common.checkPidRunning(existingPid):
            raise errors.SystemError(
                "Manager %s is already running (with pid %d)" % (
                    name, existingPid))
        raise errors.SystemError(
            "Manager %s is dead (stale pid %d)" % (name, existingPid))

    dirOptions = self._getDirOptions()
    template = ("flumotion-manager %s -D --daemonize-to %s "
                "--service-name %s %s %s")
    command = template % (
        dirOptions, configure.daemondir, name, planetFile,
        " ".join(flowFiles))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    # the launcher exited cleanly; wait for the pid file to show up
    self.debug("Waiting for pid for manager %s" % name)
    pid = common.waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False

    self.info("Started manager %s with pid %d" % (name, pid))
    return True
401
def startWorker(self, name):
    """
    Start the worker as configured in the worker directory for the given
    worker name.

    @type  name: string
    @param name: name of the worker, i.e. of its .xml file without the
                 extension.

    @raises errors.SystemError: if the worker file does not exist, or if a
                                pid is already recorded for this worker
                                (running or stale).
    @returns: whether or not the worker daemon started
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the call in start() - confirm.
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.SystemError(
            "Worker file %s does not exist" % workerFile)

    # refuse to start when a pid is already recorded, running or not
    existingPid = common.getPid('worker', name)
    if existingPid:
        if common.checkPidRunning(existingPid):
            raise errors.SystemError(
                "Worker %s is already running (with pid %d)" % (
                    name, existingPid))
        raise errors.SystemError(
            "Worker %s is dead (stale pid %d)" % (name, existingPid))

    self.info("Loading worker %s" % workerFile)

    dirOptions = self._getDirOptions()
    template = ("flumotion-worker %s -D --daemonize-to %s "
                "--service-name %s %s")
    command = template % (
        dirOptions, configure.daemondir, name, workerFile)
    self.debug("Running %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    # the launcher exited cleanly; wait for the pid file to show up
    self.debug("Waiting for pid for worker %s" % name)
    pid = common.waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False

    self.info("Started worker %s with pid %d" % (name, pid))
    return True
447
def startProcess(self, command):
    """
    Start the given process and block.

    @type  command: string
    @param command: the command line, handed to os.system as is.

    @rtype:   int
    @returns: the exit status of the process, or -1 when it did not exit
              normally.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the self.startProcess(command) call sites - confirm.
    status = os.system(command)
    if not os.WIFEXITED(status):
        # did not terminate through a normal exit
        return -1
    return os.WEXITSTATUS(status)
460
def stopManager(self, name):
    """
    Stop the given manager if it is running.

    @type  name: string
    @param name: name of the manager.

    @returns: True if the manager was not running or has been stopped;
              False if its recorded pid is stale or the process could not
              be stopped.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the call in stop() - confirm.
    self.info("Stopping manager %s" % name)
    pid = common.getPid('manager', name)
    if not pid:
        # say so, the same way stopWorker does, instead of returning
        # silently
        self.info("manager %s was not running" % name)
        return True

    if not common.checkPidRunning(pid):
        self.info("Manager %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping manager %s with pid %d' % (name, pid))
    if not self.stopProcess(pid):
        return False

    self.info('Stopped manager %s with pid %d' % (name, pid))
    return True
481
def stopWorker(self, name):
    """
    Stop the given worker if it is running.

    @type  name: string
    @param name: name of the worker.

    @returns: True if the worker was not running or has been stopped;
              False if its recorded pid is stale or the process could not
              be stopped.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the call in stop() - confirm.
    self.info("Stopping worker %s" % name)
    pid = common.getPid('worker', name)
    if not pid:
        self.info("worker %s was not running" % name)
        return True

    if not common.checkPidRunning(pid):
        self.info("Worker %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping worker %s with pid %d' % (name, pid))
    stopped = self.stopProcess(pid)
    if stopped:
        self.info('Stopped worker %s with pid %d' % (name, pid))
        return True
    return False
503
def stopProcess(self, pid):
    """
    Stop the process with the given pid.
    Wait until the pid has disappeared.

    TERM is sent first. If the process is still there after
    configure.processTermWait seconds, KILL is sent; if it is still there
    configure.processKillWait seconds after that, we give up.

    @type  pid: int
    @param pid: pid of the process to stop.

    @returns: whether the process is gone; False also when there was no
              process to send TERM to.
    """
    # NOTE(review): the def line is absent from this listing; signature
    # inferred from the self.stopProcess(pid) call sites - confirm.

    # Wall-clock deadlines: time.clock() is processor time on Unix and
    # does not advance while this process is waiting.
    termDeadline = time.time() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait
    killSent = False

    self.debug('stopping process with pid %d' % pid)
    if not common.termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    while common.checkPidRunning(pid):
        now = time.time()
        if not killSent and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                "for %d seconds, killing" % (pid,
                    configure.processTermWait))
            common.killPid(pid)
            # make sure KILL is only sent once
            killSent = True

        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                "for %d seconds, stopping" % (pid,
                    configure.processKillWait))
            return False

        # poll at a modest rate instead of spinning at full CPU
        time.sleep(0.1)

    return True
536
def list(self):
    """
    List all service parts managed.

    Prints every manager followed by its flows, then every worker.
    """
    # NOTE(review): the def line is absent from this listing; name and
    # signature inferred from the docstring - confirm.
    for name, flows in self.getManagers().items():
        print("manager %s" % name)
        for flow in (flows or ()):
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
552