1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import signal
24
25 from twisted.python import failure
26 import twisted.copyright
27 from twisted.internet import reactor, protocol, defer
28 from flumotion.common import log as flog
29
30 """
31 Framework for writing automated integration tests.
32
33 This module provides a way of writing automated integration tests from
34 within Twisted's unit testing framework, trial. Test cases are
35 constructed as subclasses of the normal trial
36 L{twisted.trial.unittest.TestCase} class.
37
38 Integration tests look like normal test methods, except that they are
39 decorated with L{integration.test}, take an extra "plan" argument, and
40 do not return anything. For example:
41
42 from twisted.trial import unittest
43 from flumotion.twisted import integration
44
45 class IntegrationTestExample(unittest.TestCase):
46 @integration.test
47 def testEchoFunctionality(self, plan):
48 process = plan.spawn('echo', 'hello world')
49 plan.wait(process, 0)
50
51 This example will spawn a process, as if you typed "echo 'hello world'"
52 at the shell prompt. It then waits for the process to exit, expecting
53 the exit status to be 0.
54
55 The example illustrates two of the fundamental plan operators, spawn and
56 wait. "spawn" spawns a process. "wait" waits for a process to finish.
57 The other operators are "spawnPar", which spawns a number of processes
58 in parallel, "waitPar", which waits for a number of processes in
59 parallel, and "kill", which kills one or more processes via SIGTERM and
60 then waits for them to exit.
61
62 It is evident that this framework is most appropriate for testing the
63 integration of multiple processes, and is not suitable for in-process
64 tests. The plan that is built up is only executed after the test method
65 exits, via the L{integration.test} decorator; the writer of the
66 integration test does not have access to the plan's state.
67
68 Note that all process exits must be anticipated. If at any point the
69 integration tester receives SIGCHLD, the next operation must be a wait
70 for that process. If this is not the case, the test is interpreted as
71 having failed.
72
73 Also note that while the test is running, the stdout and stderr of each
74 spawned process is redirected into log files in a subdirectory of where
75 the test is located. For example, in the previous example, the following
76 files will be created:
77
78 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
79 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
80
81 In the case that multiple echo commands are run in the same plan, the
82 subsequent commands will be named as echo-1, echo-2, and the like. Upon
83 successful completion of the test case, the log directory will be
84 deleted.
85 """
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
108
109 -def log(format, *args):
110 flog.doLog(flog.LOG, None, 'integration', format, args, -2)
111 -def debug(format, *args):
112 flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
113 -def info(format, *args):
114 flog.doLog(flog.INFO, None, 'integration', format, args, -2)
116 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
117 -def error(format, *args):
118 flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
119
121 if os.sep in executable:
122 if os.access(os.path.abspath(executable), os.X_OK):
123 return os.path.abspath(executable)
124 elif os.getenv('PATH'):
125 for path in os.getenv('PATH').split(os.pathsep):
126 if os.access(os.path.join(path, executable), os.X_OK):
127 return os.path.join(path, executable)
128 raise CommandNotFoundException(executable)
129
131 - def __init__(self, process, expectedCode, actualCode):
132 Exception.__init__(self)
133 self.process = process
134 self.expected = expectedCode
135 self.actual = actualCode
137 return ('Expected exit code %r from %r, but got %r'
138 % (self.expected, self.process, self.actual))
139
145 return 'The process %r exited prematurely.' % self.process
146
152 return 'Command %r not found in the PATH.' % self.command
153
156 Exception.__init__(self)
157 self.processes = processes
159 return ('Processes still running at end of test: %r'
160 % (self.processes,))
161
166
168 return ('Timed out waiting for %r to exit with status %r'
169 % (self.process, self.status))
170
173 self.exitDeferred = defer.Deferred()
174 self.timedOut = False
175
177 return self.exitDeferred
178
179 - def timeout(self, process, status):
183
192
194 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
195
196 - def __init__(self, name, argv, testDir):
197 self.name = name
198 self.argv = (_which(argv[0]),) + argv[1:]
199 self.testDir = testDir
200
201 self.pid = None
202 self.protocol = None
203 self.state = self.NOT_STARTED
204 self._timeoutDC = None
205
206 log('created process object %r', self)
207
209 assert self.state == self.NOT_STARTED
210
211 self.protocol = ProcessProtocol()
212
213 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
214 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
215
216 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
217
218
219
220
221
222
223
224
225
226
227
228
229 info('spawning process %r, argv=%r', self, self.argv)
230 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
231 env = dict(os.environ)
232 env['FLU_DEBUG'] = '5'
233 process = reactor.spawnProcess(self.protocol, self.argv[0],
234 env=env, args=self.argv,
235 childFDs=childFDs)
236 signal.signal(signal.SIGTERM, termHandler)
237
238 stdout.close()
239 stderr.close()
240
241
242
243
244 self.pid = process.pid
245 self.state = self.STARTED
246
247 def got_exit(res):
248 self.state = self.STOPPED
249 info('process %r has stopped', self)
250 return res
251 self.protocol.getDeferred().addCallback(got_exit)
252
253 - def kill(self, sig=signal.SIGTERM):
254 assert self.state == self.STARTED
255 info('killing process %r, signal %d', self, sig)
256 os.kill(self.pid, sig)
257
258 - def wait(self, status, timeout=20):
268 d.addCallback(got_exit)
269 if self.state == self.STARTED:
270 self._timeoutDC = reactor.callLater(timeout,
271 self.protocol.timeout,
272 self,
273 status)
274 def cancel_timeout(res):
275 debug('cancelling timeout for %r', self)
276 if self._timeoutDC.active():
277 self._timeoutDC.cancel()
278 return res
279 d.addCallbacks(cancel_timeout, cancel_timeout)
280 return d
281
283 return '<Process %s in state %s>' % (self.name, self.state)
284
286
287
289 self.processes = []
290 self.timeout = 20
291
292 - def spawn(self, process):
297
303
304 - def kill(self, process):
308
309 - def wait(self, process, exitCode):
310 assert process in self.processes
311 def remove_from_processes_list(_):
312 self.processes.remove(process)
313 d = process.wait(exitCode, timeout=self.timeout)
314 d.addCallback(remove_from_processes_list)
315 return d
316
331 p.protocol.processEnded = callbacker(d)
332 p.kill(sig=signal.SIGKILL)
333 d = defer.DeferredList(dlist)
334 def error(_):
335 if failure:
336 return failure
337 else:
338 raise e
339 d.addCallback(error)
340 return d
341 return failure
342
343 - def run(self, ops, timeout=20):
344 self.timeout = timeout
345 d = defer.Deferred()
346 def run_op(_, op):
347
348
349 return op[0](*op[1:])
350 for op in ops:
351 d.addCallback(run_op, op)
352 d.addCallbacks(lambda _: self._checkProcesses(failure=None),
353 lambda failure: self._checkProcesses(failure=failure))
354
355
356
357
358
359 reactor.callLater(0, d.callback, None)
360 return d
361
363 - def __init__(self, testCase, testName):
364 self.name = testName
365 self.testCaseName = testCase.__class__.__name__
366 self.processes = {}
367 self.outputDir = self._makeOutputDir(os.getcwd())
368
369
370
371 self.vm = PlanExecutor()
372 self.ops = []
373 self.timeout = 20
374
376
377 try:
378 os.mkdir(testDir)
379 except OSError:
380 pass
381 tail = '%s-%s' % (self.testCaseName, self.name)
382 outputDir = os.path.join(testDir, tail)
383 os.mkdir(outputDir)
384 return outputDir
385
387 for root, dirs, files in os.walk(self.outputDir, topdown=False):
388 for name in files:
389 os.remove(os.path.join(root, name))
390 for name in dirs:
391 os.rmdir(os.path.join(root, name))
392 os.rmdir(self.outputDir)
393 self.outputDir = None
394
405
408
411
412 - def spawn(self, command, *args):
416
418 processes = []
419 self._appendOp(self.vm.checkExits, ())
420 for argv in argvs:
421 assert isinstance(argv, tuple), \
422 'all arguments to spawnPar must be tuples'
423 for arg in argv:
424 assert isinstance(arg, str), \
425 'all subarguments to spawnPar must be strings'
426 processes.append(self._allocProcess(argv))
427 for process in processes:
428 self._appendOp(self.vm.spawn, process)
429 return tuple(processes)
430
431 - def wait(self, process, status):
433
434 - def waitPar(self, *processStatusPairs):
439
440 - def kill(self, process, status=None):
444
449
451 testName = proc.__name__
452 def wrappedtest(self):
453 plan = Plan(self, testName)
454 proc(self, plan)
455 if twisted.copyright.version < '2.0':
456
457 info('using deferredResult for old trial')
458 from twisted.trial import unittest
459 return unittest.deferredResult(plan.execute())
460 else:
461 return plan.execute()
462 try:
463 wrappedtest.__name__ = testName
464 except Exception:
465
466 pass
467
468
469 wrappedtest.timeout = 666
470 return wrappedtest
471