1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import datetime
19 import cgi
20
21 from twisted.internet import defer, protocol, reactor
22 from twisted.python.util import InsensitiveDict
23 from twisted.web import http
24
25 from flumotion.common import log
26 from flumotion.common import errors
27 from flumotion.component.misc.httpserver.httpcached import common
28 from flumotion.component.misc.httpserver.httpcached import http_utils
29
30
31 LOG_CATEGORY = "stream-provider"
32
33 USER_AGENT = "FlumotionClient/0.1"
34
35
37 if ts:
38 return datetime.datetime.fromtimestamp(ts).isoformat()
39 return "???"
40
41
43 """
44 Provides information about a stream in a standard way.
45 The information is retrieved by parsing HTTP headers.
46 """
47
49 self.expires = None
50 self.mtime = None
51 self.length = 0
52 self.start = 0
53 self.size = 0
54 self.mimeType = None
55
56 headers = InsensitiveDict(headers)
57
58 encoding = headers.get("Transfer-Encoding", None)
59 if encoding == 'chunked':
60 raise errors.FlumotionError("Chunked transfer not supported")
61
62 expires = headers.get("Expires", None)
63 if expires is not None:
64 try:
65 self.expires = http.stringToDatetime(expires)
66 except:
67 self.expires = 0
68
69 lastmod = headers.get("Last-Modified", None)
70 if lastmod is not None:
71 self.mtime = http.stringToDatetime(lastmod)
72
73 range = headers.get("Content-Range", None)
74 length = headers.get("Content-Length", None)
75 if range is not None:
76 start, end, total = http.parseContentRange(range)
77 self.start = start
78 self.length = total
79 if length is not None:
80 self.size = int(length)
81 else:
82 self.size = end - start
83 elif length is not None:
84 self.length = int(length)
85 self.size = int(length)
86 else:
87 raise errors.FlumotionError("Can't get length/size from headers",
88 headers)
89
90 ctype = headers.get("Content-Type", None)
91 if ctype is not None:
92 self.mimeType, _pdict = cgi.parse_header(ctype)
93
94
96 """
97 Allows retrieval of data streams using HTTP 1.0.
98 """
99
100 logCategory = LOG_CATEGORY
101
102 - def __init__(self, connTimeout=0, idleTimeout=0):
103 self.connTimeout = connTimeout
104 self.idleTimeout = idleTimeout
105
106 - def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
107 ifModifiedSince=None, ifUnmodifiedSince=None,
108 start=None, size=None):
109 self.log("Requesting %s%s%s%s%s%s",
110 size and (" %d bytes" % size) or "",
111 start and (" starting at %d" % start) or "",
112 (size or start) and " from " or "",
113 url.toString(),
114 ifModifiedSince and (" if modified since %s"
115 % ts2str(ifModifiedSince)) or "",
116 ifUnmodifiedSince and (" if not modified since %s"
117 % ts2str(ifUnmodifiedSince)) or "")
118
119 getter = StreamGetter(consumer, url,
120 ifModifiedSince, ifUnmodifiedSince,
121 start, size, self.idleTimeout)
122 getter.connect(proxyAddress, proxyPort, self.connTimeout)
123 return getter
124
125
126 -class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
127 """
128 Retrieves a stream using HTTP 1.0.
129
130 This class is at the same time a Factory and a Protocol,
131 this can be done because it's a client and in twisted
132 client factories only create on protocol.
133
134 The outcome, the stream info and stream data is forwarded
135 to a common.StreamConsumer instance given at creating time.
136
137 It supports range requests and some conditional request types
138 (ifModified and ifUnmodified).
139 """
140
141 logCategory = LOG_CATEGORY
142
143 HTTP_METHOD = 'GET'
144
145 host = None
146 port = None
147
148 - def __init__(self, consumer, url,
149 ifModifiedSince=None, ifUnmodifiedSince=None,
150 start=None, size=None, timeout=0):
151 self.consumer = consumer
152 self.url = url
153
154 self.ifModifiedSince = ifModifiedSince
155 self.ifUnmodifiedSince = ifUnmodifiedSince
156
157 self.start = start
158 self.size = size
159 self.timeout = timeout
160
161 self.headers = {}
162 self.peer = None
163 self.status = None
164 self.info = None
165
166 self._connected = False
167 self._canceled = False
168 self._remaining = None
169 self._idlecheck = None
170
171 self.logName = common.log_id(self)
172
175
176
177
178 - def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
191
193 if not self.paused and self.transport is not None:
194 self.pauseProducing()
195 self.log("Request paused for %s", self.url)
196
198 if self.paused and self.transport is not None:
199 self.resumeProducing()
200 self.log("Request resumed for %s", self.url)
201
203 if self._connected and self.transport is not None:
204 self.transport.loseConnection()
205 self._cancelIdleCheck()
206 self.log("Request canceled for %s", self.url)
207 self._canceled = True
208
209
210
212 assert self.peer is None, "Protocol already built"
213 self.peer = addr
214 return self
215
218
220 self.log("Connection made for %s", self.url)
221 self.sendCommand(self.HTTP_METHOD, self.url.location)
222 self.sendHeader('Host', self.url.host)
223 self.sendHeader('User-Agent', USER_AGENT)
224 self.sendHeader('Connection', "close")
225
226 if self.ifModifiedSince:
227 datestr = http.datetimeToString(self.ifModifiedSince)
228 self.sendHeader('If-Modified-Since', datestr)
229
230 if self.ifUnmodifiedSince:
231 datestr = http.datetimeToString(self.ifUnmodifiedSince)
232 self.sendHeader('If-Unmodified-Since', datestr)
233
234 if self.start or self.size:
235 start = self.start or 0
236 end = (self.size and (start + self.size - 1)) or None
237 rangeSpecs = "bytes=%s-%s" % (start, end or "")
238 self.sendHeader('Range', rangeSpecs)
239
240 self.endHeaders()
241
242 self._resetIdleCheck()
243
250
252 self._keepActive()
253 status = int(status_str)
254 self.status = status
255
256 if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
257 return
258
259 if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
260 self._serverError(common.RANGE_NOT_SATISFIABLE,
261 "HTTP range not satisfiable")
262 if status == http.NOT_MODIFIED:
263 self._conditionFail(common.STREAM_NOT_MODIFIED,
264 "Stream not modified")
265 elif status == http.PRECONDITION_FAILED:
266 self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
267 elif status == http.NOT_FOUND:
268 self._streamNotAvailable(common.STREAM_NOTFOUND,
269 "Resource Not Found")
270 elif status == http.FORBIDDEN:
271 self._streamNotAvailable(common.STREAM_FORBIDDEN,
272 "Resource Forbidden")
273 if status in (http.MOVED_PERMANENTLY, http.FOUND):
274 self._serverError(common.NOT_IMPLEMENTED,
275 "HTTP redirection not supported")
276 else:
277 self._serverError(common.NOT_IMPLEMENTED,
278 "Unsupported HTTP response: %s (%s)"
279 % (message, status))
280
284
299
319
321 if self.info is not None:
322 if self._remaining == 0:
323 self.log("Request done, got %d bytes starting at %d from %s, "
324 "last modified on %s", self.info.size,
325 self.info.start, self.url.toString(),
326 ts2str(self.info.mtime))
327 self._streamDone()
328 return
329 if self.info:
330 self.log("Incomplete request, missing %d bytes from the expected "
331 "%d bytes starting at %d from %s", self._remaining,
332 self.info.size, self.info.start, self.url.toString())
333 else:
334 self.log("Incomplete request %s", self.url.toString())
335
339
340
341
343 self._updateCount += 1
344
348
350 if self._idlecheck:
351 self._idlecheck.cancel()
352 self._idlecheck = None
353 self._updateCount = 0
354
361
365
372
377
382
387
391
395
400
401
402 if __name__ == "__main__":
403 import sys
404
406 k, v = a.split('=', 1)
407 if v == 'None':
408 d[k] = None
409 try:
410 d[k] = int(v)
411 except:
412 d[k] = v
413
414
415 kwargs = {}
416 for a in sys.argv[1:]:
417 addarg(kwargs, a)
418
419 url = kwargs.pop('url')
420
422
426
430
434
436 print "Finished"
437 reactor.stop()
438
439 - def onInfo(self, getter, info):
445
446 - def onData(self, getter, data):
449
450
451 consumer = DummyConsumer()
452 requester = StreamRequester(5000, 5000)
453 requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs)
454 reactor.run()
455