1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Contains the base class for PB client-side mediums.
24 """
25
26 import time
27
28 from twisted.spread import pb
29 from twisted.internet import defer, reactor
30
31 from flumotion.twisted.defer import defer_generator_method
32 from flumotion.common import log, interfaces, bundleclient, errors, common
33 from flumotion.common import messages
34 from flumotion.configure import configure
35 from flumotion.twisted.compat import implements
36 from flumotion.twisted import pb as fpb
37
39 """
40 I am a base interface for PB clients interfacing with PB server-side
41 avatars.
42 Used by admin/worker/component to talk to manager's vishnu,
43 and by job to talk to worker's brain.
44
45 @ivar remote: a remote reference to the server-side object on
46 which perspective_(methodName) methods can be called
47 @type remote: L{twisted.spread.pb.RemoteReference}
48 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
49 """
50
51
52
53 implements(interfaces.IMedium)
54 logCategory = "basemedium"
55 remoteLogName = "baseavatar"
56
57 remote = None
58 bundleLoader = None
59
61 """
62 Set the given remoteReference as the reference to the server-side
63 avatar.
64
65 @param remoteReference: L{twisted.spread.pb.RemoteReference}
66 """
67 self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
68 self.remote = remoteReference
69 def nullRemote(x):
70 self.debug('%r: disconnected from %r' % (self, self.remote))
71 self.remote = None
72 self.remote.notifyOnDisconnect(nullRemote)
73
74 self.bundleLoader = bundleclient.BundleLoader(self.remote)
75
76
77 tarzan = None
78 jane = None
79 try:
80 transport = remoteReference.broker.transport
81 tarzan = transport.getHost()
82 jane = transport.getPeer()
83 except Exception, e:
84 self.debug("could not get connection info, reason %r" % e)
85 if tarzan and jane:
86 self.debug("connection is from me on %s to manager on %s" % (
87 common.addressGetHost(tarzan),
88 common.addressGetHost(jane)))
89
91 """
92 Does the medium have a remote reference to a server-side avatar ?
93 """
94 return self.remote != None
95
97 """
98 Call the given method with the given arguments remotely on the
99 server-side avatar.
100
101 Gets serialized to server-side perspective_ methods.
102
103 @param level: the level we should log at (log.DEBUG, log.INFO, etc)
104 @type level: int
105 @param stackDepth: the number of stack frames to go back to get
106 file and line information, negative or zero.
107 @type stackDepth: non-positive int
108 @param name: name of the remote method
109 @type name: str
110 """
111 if level is not None:
112 debugClass = str(self.__class__).split(".")[-1].upper()
113 startArgs = [self.remoteLogName, debugClass, name]
114 format, debugArgs = log.getFormatArgs(
115 '%s --> %s: callRemote(%s, ', startArgs,
116 ')', (), args, kwargs)
117 logKwArgs = self.doLog(level, stackDepth - 1,
118 format, *debugArgs)
119
120 if not self.remote:
121 self.warning('Tried to callRemote(%s), but we are disconnected'
122 % name)
123 return defer.fail(errors.NotConnectedError())
124
125 def callback(result):
126 format, debugArgs = log.getFormatArgs(
127 '%s <-- %s: callRemote(%s, ', startArgs,
128 '): %s', (log.ellipsize(result), ), args, kwargs)
129 self.doLog(level, -1, format, *debugArgs, **logKwArgs)
130 return result
131
132 def errback(failure):
133 format, debugArgs = log.getFormatArgs(
134 '%s <-- %s: callRemote(%s, ', startArgs,
135 '): %r', (failure, ), args, kwargs)
136 self.doLog(level, -1, format, *debugArgs, **logKwArgs)
137 return failure
138
139 d = self.remote.callRemote(name, *args, **kwargs)
140 if level is not None:
141 d.addCallbacks(callback, errback)
142 return d
143
145 """
146 Call the given method with the given arguments remotely on the
147 server-side avatar.
148
149 Gets serialized to server-side perspective_ methods.
150 """
151 return self.callRemoteLogging(log.DEBUG, -1, name, *args,
152 **kwargs)
153
155 """
156 Runs the given function in the given module with the given arguments.
157
158 If we can't find the bundle for the given module, or if the
159 given module does not contain the requested function, we will
160 raise L{flumotion.common.errors.RemoteRunError} (perhaps a
161 poorly chosen error). If importing the module or running the
162 function raises an exception, that exception will be passed
163 through unmodified.
164
165 Callers that expect to return their result over a PB connection
166 should catch nonserializable exceptions so as to prevent nasty
167 backtraces in the logs.
168
169 @param module: module the function lives in
170 @type module: str
171 @param function: function to run
172 @type function: str
173
174 @returns: the return value of the given function in the module.
175 """
176 self.debug('remote runFunction(%r, %r)' % (module, function))
177 d = self.bundleLoader.loadModule(module)
178 yield d
179
180 try:
181 mod = d.value()
182 except errors.NoBundleError:
183 msg = 'Failed to find bundle for module %s' % module
184 self.warning(msg)
185 raise errors.RemoteRunError(msg)
186 except Exception, e:
187 self.warning('Exception raised while loading bundle for '
188 'module %s: %s', module, e)
189 raise
190
191 try:
192 proc = getattr(mod, function)
193 except AttributeError:
194 msg = 'No procedure named %s in module %s' % (function, module)
195 self.warning(msg)
196 raise errors.RemoteRunError(msg)
197
198 try:
199 self.debug('calling %s.%s(%r, %r)' % (
200 module, function, args, kwargs))
201 d = proc(*args, **kwargs)
202 except Exception, e:
203 self.warning('Exception raised while calling '
204 '%s.%s(*args=%r, **kwargs=%r): %s',
205 module, function, args, kwargs,
206 log.getExceptionMessage(e))
207 raise
208
209
210
211 yield d
212
213
214 try:
215 yield d.value()
216 except Exception, e:
217 self.warning('Deferred failure from '
218 '%s.%s(*args=%r, **kwargs=%r): %s',
219 module, function, args, kwargs,
220 log.getExceptionMessage(e))
221 raise
222 runBundledFunction = defer_generator_method(runBundledFunction)
223
257
269 if self._pingCheckDC:
270 self._pingCheckDC.cancel()
271 self._pingCheckDC = None
272
273 if self._pingDC:
274 self._pingDC.cancel()
275 self._pingDC = None
276
278 if self.remote:
279 self.remote.broker.transport.loseConnection()
280
286 self.remote.notifyOnDisconnect(stopPingingCb)
287
288 self.startPinging(self._disconnect)
289