Package flumotion :: Package component :: Package misc :: Package httpserver :: Module cachedprovider
[hide private]

Source Code for Module flumotion.component.misc.httpserver.cachedprovider

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  import errno 
  23  import os 
  24  import stat 
  25  import tempfile 
  26  import threading 
  27  import time 
  28   
  29  from twisted.internet import defer, reactor, abstract 
  30   
  31  from flumotion.common import log, format, common, python 
  32  from flumotion.component.misc.httpserver import cachestats 
  33  from flumotion.component.misc.httpserver import fileprovider 
  34  from flumotion.component.misc.httpserver import localpath 
  35  from flumotion.component.misc.httpserver.fileprovider import FileClosedError 
  36  from flumotion.component.misc.httpserver.fileprovider import FileError 
  37  from flumotion.component.misc.httpserver.fileprovider import NotFoundError 
  38   
  39   
SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4
# Default maximum cache size, in megabytes (converted to bytes in __init__)
DEFAULT_CACHE_SIZE = 1000
# Cleanup watermarks, as fractions of the total cache size
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
# Chunk size used by the copy thread when copying a source file to the cache
FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
# Suffix of cache files still being copied
TEMP_FILE_POSTFIX = ".tmp"
MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path
# Maximum number of path -> identifier entries kept by getIdentifier()
ID_CACHE_MAX_SIZE = 1024


LOG_CATEGORY = "fileprovider-localcached"

# Maps OS error numbers to the fileprovider errors raised to callers
_errorLookup = {errno.ENOENT: NotFoundError,
                errno.EISDIR: fileprovider.CannotOpenError,
                errno.EACCES: fileprovider.AccessError}
  55   
  56   
class FileProviderLocalCachedPlug(fileprovider.FileProviderPlug, log.Loggable):
    """
    WARNING: Currently does not work properly in combination with rate-control.

    I'm caching the files taken from a mounted
    network file system to a shared local directory.
    Multiple instances can share the same cache directory,
    but it's recommended to use slightly different values
    for the property cleanup-high-watermark.
    I'm using the directory access time to know when
    the cache usage changed and keep an estimation
    of the cache usage for statistics.

    I'm creating a unique thread to do the file copying block by block,
    for all files to be copied to the cache.
    Using a thread instead of a reactor.callLater 'loop' allows for
    higher copy throughput and does not slow down the main loop when
    lots of files are copied at the same time.
    Simulations with real request logs show that using a thread
    gives better results than the equivalent asynchronous implementation.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, args):
        """
        @param args: plug arguments; reads the 'properties' dictionary
                     (path, cache-dir, cache-size, cleanup-enabled,
                     cleanup-high-watermark, cleanup-low-watermark).
        """
        props = args['properties']
        self._sourceDir = props.get('path')
        self._cacheDir = props.get('cache-dir', "/tmp")
        cacheSizeInMB = int(props.get('cache-size', DEFAULT_CACHE_SIZE))
        self._cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
        self._cleanupEnabled = props.get('cleanup-enabled', True)
        # Watermarks are clamped to the [0.0, 1.0] range
        highWatermark = props.get('cleanup-high-watermark',
                                  DEFAULT_CLEANUP_HIGH_WATERMARK)
        highWatermark = max(0.0, min(1.0, float(highWatermark)))
        lowWatermark = props.get('cleanup-low-watermark',
                                 DEFAULT_CLEANUP_LOW_WATERMARK)
        lowWatermark = max(0.0, min(1.0, float(lowWatermark)))
        self._identifiers = {} # {path: identifier}
        self._sessions = {} # {CopySession: None}
        self._index = {} # {path: CopySession}

        self.info("Cached file provider initialized")
        self.debug("Source directory: '%s'", self._sourceDir)
        self.debug("Cache directory: '%s'", self._cacheDir)
        self.debug("Cache size: %d bytes", self._cacheSize)
        self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)

        common.ensureDir(self._sourceDir, "source")
        common.ensureDir(self._cacheDir, "cache")

        self._cacheUsage = None # in bytes
        self._cacheUsageLastUpdate = None
        self._lastCacheTime = None
        self._cacheMaxUsage = self._cacheSize * highWatermark # in bytes
        self._cacheMinUsage = self._cacheSize * lowWatermark # in bytes
        self.stats = cachestats.CacheStatistics()

        # Initialize cache usage
        self.updateCacheUsage()

        # Startup copy thread
        self._thread = CopyThread(self)

    def start(self, component):
        # Start the background copy thread.
        self._thread.start()

    def stop(self, component):
        # Stop and join the background copy thread.
        self._thread.stop()

    def startStatsUpdates(self, updater):
        #FIXME: This is temporary. Should be done with plug UI.
        # Used for the UI to know which plug is used
        updater.update("provider-name", "fileprovider-localcached")
        self.stats.startUpdates(updater)

    def stopStatsUpdates(self):
        self.stats.stopUpdates()

    def getRootPath(self):
        # Returns a LocalPath for the source directory, or None when
        # no 'path' property was configured.
        if self._sourceDir is None:
            return None
        return LocalPath(self, self._sourceDir)


    ## Protected Methods ##

    def getLogName(self, path, id=None):
        """
        Returns a log name for a path, shortened to a maximum size
        specified by the global variable MAX_LOGNAME_SIZE.
        The log name will be the filename part of the path postfixed
        by the id in brackets if id is not None.
        """
        filename = os.path.basename(path)
        basename, postfix = os.path.splitext(filename)
        if id is not None:
            postfix += "[%s]" % id
        prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
        if len(basename) > prefixMaxLen:
            basename = basename[:prefixMaxLen-1] + "*"
        return basename + postfix

    def getIdentifier(self, path):
        """
        Returns an identifier for a path.
        The identifier is a digest of the path encoded in hex string.
        The hash function used is SHA1.
        It caches the identifiers in a dictionary indexed by path and with
        a maximum number of entries specified by the constant
        ID_CACHE_MAX_SIZE.
        """
        ident = self._identifiers.get(path, None)
        if ident is None:
            hash = python.sha1()
            hash.update(path)
            ident = hash.digest().encode("hex").strip('\n')
            # Prevent the cache from growing endlessly
            if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
                self._identifiers.clear()
            self._identifiers[path] = ident
        return ident

    def getCachePath(self, path):
        # Full path of the finished cached copy of path.
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident)

    def getTempPath(self, path):
        # Full path of the in-progress cached copy of path.
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident + TEMP_FILE_POSTFIX)

    def updateCacheUsageStatistics(self):
        self.stats.onEstimateCacheUsage(self._cacheUsage, self._cacheSize)

    def updateCacheUsage(self):
        """
        @returns: the cache usage, in bytes
        """
        # Only calculate cache usage if the cache directory
        # modification time changed since the last time we looked at it.
        cacheTime = os.path.getmtime(self._cacheDir)
        if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)):
            self._lastCacheTime = cacheTime
            # NOTE(review): chdir changes the process-global working
            # directory while a copy thread runs — confirm nothing else
            # depends on the cwd.
            os.chdir(self._cacheDir)

            # There's a possibility here that we got the filename from
            # os.listdir, but before we get to os.stat, the file is gone. We'll
            # get a OSError with a ENOENT errno and we should ignore that file,
            # since we're just estimating the amount of space taken by the
            # cache
            sizes = []
            for f in os.listdir('.'):
                try:
                    sizes.append(os.path.getsize(f))
                except OSError, e:
                    if e.errno == errno.ENOENT:
                        pass
                    else:
                        raise

            self._cacheUsage = sum(sizes)
            self.updateCacheUsageStatistics()
            self._cacheUsageLastUpdate = time.time()
        return self._cacheUsage

    def allocateCacheSpace(self, size):
        """
        Try to reserve cache space.

        If there is not enough space and the cache cleanup is enabled,
        it will delete files from the cache starting with the ones
        with oldest access time until the cache usage drops below
        the fraction specified by the property cleanup-low-watermark.

        Returns a 'tag' that should be used to 'free' the cache space
        using releaseCacheSpace.
        This tag is needed to better estimate the cache usage,
        if the cache usage has been updated since cache space
        has been allocated, freeing up the space should not change
        the cache usage estimation.

        @param size: size to reserve, in bytes
        @type size: int

        @returns: an allocation tag or None if the allocation failed.
        @rtype: tuple
        """
        usage = self.updateCacheUsage()
        if (usage + size) < self._cacheMaxUsage:
            self._cacheUsage += size
            self.updateCacheUsageStatistics()
            return (self._cacheUsageLastUpdate, size)

        self.debug('cache usage will be %sbytes, need more cache',
                   format.formatStorage(usage + size))

        if not self._cleanupEnabled:
            self.debug('not allowed to clean up cache, so cannot cache')
            # No space available and cleanup disabled: allocation failed.
            return None

        # Update cleanup statistics
        self.stats.onCleanup()
        # List the cached files with file state
        # NOTE(review): same process-global chdir caveat as in
        # updateCacheUsage().
        os.chdir(self._cacheDir)

        files = []
        for f in os.listdir('.'):
            # There's a possibility of getting an error on os.stat here. See
            # similar comment in updateCacheUsage()
            try:
                files.append((f, os.stat(f)))
            except OSError, e:
                if e.errno == errno.ENOENT:
                    pass
                else:
                    raise

        # Calculate the cached file total size
        usage = sum([d[1].st_size for d in files])
        # Delete the cached file starting by the oldest accessed ones
        files.sort(key=lambda d: d[1].st_atime)
        for path, info in files:
            try:
                os.remove(path)
                usage -= info.st_size
            except OSError, e:
                if e.errno == errno.ENOENT:
                    # Already been deleted by another process,
                    # but subtract the size anyway
                    usage -= info.st_size
                else:
                    self.warning("Error cleaning cached file: %s", str(e))
            if usage <= self._cacheMinUsage:
                # We reach the cleanup limit
                self.debug('cleaned up, cache use is now %sbytes',
                           format.formatStorage(usage))
                break

        # Update the cache usage
        self._cacheUsage = usage
        self._cacheUsageLastUpdate = time.time()
        if (usage + size) < self._cacheSize:
            # There is enough space to allocate, allocation succeeded
            self._cacheUsage += size
            self.updateCacheUsageStatistics()
            return (self._cacheUsageLastUpdate, size)
        # There is not enough space, allocation failed
        self.updateCacheUsageStatistics()
        return None

    def releaseCacheSpace(self, tag):
        # Only adjust the estimation if no full recount happened since
        # the allocation the tag comes from.
        lastUpdate, size = tag
        if lastUpdate == self._cacheUsageLastUpdate:
            self._cacheUsage -= size
            self.updateCacheUsageStatistics()

    def getCopySession(self, path):
        # Returns the active CopySession for path, or None.
        return self._index.get(path, None)

    def createCopySession(self, path, file, info):
        # First outdate existing session for the path
        self.outdateCopySession(path)
        # Then create a new one
        session = CopySession(self, path, file, info)
        self._index[path] = session
        return session

    def outdateCopySession(self, path):
        session = self._index.get(path, None)
        if session is not None:
            session.outdate()

    def removeCopySession(self, session):
        path = session.sourcePath
        if path in self._index:
            del self._index[path]
        self.disableSession(session)

    def activateSession(self, session):
        # Register the session with the copy thread (idempotent).
        self.debug("Starting Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            return
        self._sessions[session] = None
        self._activateCopyLoop()

    def disableSession(self, session):
        # Unregister the session; pause the copy thread when idle.
        self.debug("Stopping Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            del self._sessions[session]
            if not self._sessions:
                self._disableCopyLoop()

    def _activateCopyLoop(self):
        self._thread.wakeup()

    def _disableCopyLoop(self):
        self._thread.sleep()
356 357
358 -class LocalPath(localpath.LocalPath, log.Loggable):
359 360 logCategory = LOG_CATEGORY 361
362 - def __init__(self, plug, path):
363 localpath.LocalPath.__init__(self, path) 364 self.logName = plug.getLogName(path) 365 self.plug = plug
366
367 - def child(self, name):
368 childpath = self._getChildPath(name) 369 return LocalPath(self.plug, childpath)
370
371 - def open(self):
372 if not os.path.exists(self.path): 373 # Delete the cached file and outdate the copying session 374 self.plug.outdateCopySession(self.path) 375 self._removeCachedFile(self.path) 376 raise NotFoundError("Path '%s' not found" % self.path) 377 return CachedFile(self.plug, self.path, self.mimeType)
378 379 380 ## Private Methods ## 381
382 - def _removeCachedFile(self, sourcePath):
383 cachePath = self.plug.getCachePath(sourcePath) 384 try: 385 os.remove(cachePath) 386 self.debug("Deleted cached file '%s'", cachePath) 387 except OSError, e: 388 if e.errno != errno.ENOENT: 389 self.warning("Error deleting file: %s", str(e))
390 391
392 -class CopyThread(threading.Thread, log.Loggable):
393 394 logCategory = LOG_CATEGORY 395
396 - def __init__(self, plug):
397 threading.Thread.__init__(self) 398 self.plug = plug 399 self._running = True 400 self._event = threading.Event()
401
402 - def stop(self):
403 self._running = False 404 self._event.set() 405 self.join()
406
407 - def wakeup(self):
408 self._event.set()
409
410 - def sleep(self):
411 self._event.clear()
412
413 - def run(self):
414 while self._running: 415 sessions = self.plug._sessions.keys() 416 for session in sessions: 417 try: 418 session.doServe() 419 except Exception, e: 420 log.warning("Error during async file serving: %s", 421 log.getExceptionMessage(e)) 422 try: 423 session.doCopy() 424 except Exception, e: 425 log.warning("Error during file copy: %s", 426 log.getExceptionMessage(e)) 427 self._event.wait()
428 429
class CopySessionCancelled(Exception):
    """Raised to fail pending reads when their copy session is cancelled."""
432 433
class CopySession(log.Loggable):
    """
    I'm serving a file at the same time I'm copying it
    from the network file system to the cache.
    If the client asks for data not yet copied, the source file
    read operation is delegated to the copy thread as an asynchronous
    operation because file seeking/reading is not thread safe.

    The copy session has to open the temporary file two times,
    one for read-only and one for write-only,
    because closing a read/write file changes the modification time.
    We want the modification time to be set to a known value
    when the copy is finished even keeping read access to the file.

    The session manages a reference counter to know how many
    TempFileDelegate instances are using the session to delegate
    read operations.
    This is done for two reasons:
      - To avoid circular references by having the session manage
        a list of delegate instances.
      - If not cancelled, sessions should not be deleted
        when no delegates reference them anymore. So weakref cannot be used.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
        """
        @param plug: the FileProviderLocalCachedPlug owning this session
        @param sourcePath: path of the source file being copied
        @param sourceFile: open source file object
        @param sourceInfo: stat result for the source file
        """
        self.plug = plug
        self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
        self.copying = None # Not yet started
        self.sourcePath = sourcePath
        self.tempPath = plug.getTempPath(sourcePath)
        self.cachePath = plug.getCachePath(sourcePath)
        # The size and modification time is not supposed to change over time
        self.mtime = sourceInfo[stat.ST_MTIME]
        self.size = sourceInfo[stat.ST_SIZE]
        self._sourceFile = sourceFile
        self._cancelled = False # True when a session has been outdated
        self._wTempFile = None
        self._rTempFile = None
        self._allocTag = None # Tag used to identify cache allocations
        self._waitCancel = None
        # List of the pending read from source file
        self._pending = [] # [(position, size, defer),]
        self._refCount = 0
        self._copied = 0 # None when the file is fully copied
        self._correction = 0 # Used to take into account copied data for stats
        self._startCopying()

    def outdate(self):
        # The source file changed/disappeared; this session is obsolete.
        self.log("Copy session outdated")
        self._cancelSession()

    def read(self, position, size, stats):
        """
        Read size bytes at position, preferring the partially-copied
        temporary file. Returns the data directly, or a Deferred when the
        read must be delegated to the copy thread.
        """
        # If the temporary file is open for reading
        if self._rTempFile:
            # And the needed data is already downloaded
            # Safe to read because it's not used by the copy thread
            if (self._copied is None) or ((position + size) <= self._copied):
                try:
                    self._rTempFile.seek(position)
                    data = self._rTempFile.read(size)
                    # Adjust the cache/source values to take copy into account
                    size = len(data)
                    # It's safe to use and modify self._correction even if
                    # it's used by the copy thread because the copy thread
                    # only adds and the main thread only subtracts.
                    # The only thing that could happen is a less accurate
                    # correction...
                    diff = min(self._correction, size)
                    self._correction -= diff
                    stats.onBytesRead(0, size, diff)
                    return data
                except Exception, e:
                    self.warning("Failed to read from temporary file: %s",
                                 log.getExceptionMessage(e))
                    self._cancelSession()
        # If the source file is not open anymore, we can't continue
        if self._sourceFile is None:
            raise FileError("File caching error, cannot proceed")
        # Otherwise read the data directly from the source
        try:
            # It's safe to not use Lock, because simple type operations
            # are thread safe, and even if the copying state changes
            # from True to False, _onCopyFinished will be called
            # later in the same thread and will process pending reads.
            if self.copying:
                # If we are currently copying the source file,
                # we defer the file read to the copying thread
                # because we can't read a file from two threads.
                d = defer.Deferred()

                def updateStats(data):
                    stats.onBytesRead(len(data), 0, 0)
                    return data

                d.addCallback(updateStats)
                self._pending.append((position, size, d))
                return d
            # Not copying, it's safe to read directly
            self._sourceFile.seek(position)
            data = self._sourceFile.read(size)
            stats.onBytesRead(len(data), 0, 0)
            return data
        except IOError, e:
            cls = _errorLookup.get(e.errno, FileError)
            raise cls("Failed to read source file: %s" % str(e))

    def incRef(self):
        # A TempFileDelegate started using this session.
        self._refCount += 1

    def decRef(self):
        # A TempFileDelegate stopped using this session.
        self._refCount -= 1
        # If there is only one client and the session has been cancelled,
        # stop copying and serve the source file directly
        if (self._refCount == 1) and self._cancelled:
            # Cancel the copy and close the writing temporary file.
            self._cancelCopy(False, True)
        # We close if not still copying source file
        if (self._refCount == 0) and (self._wTempFile is None):
            self.close()

    def close(self):
        # Idempotent; releases all files and unregisters from the plug.
        if self.plug is not None:
            self.log("Closing copy session")
            # Cancel the copy, close the source file and the writing temp file.
            self._cancelCopy(True, True)
            self._closeReadTempFile()
            self.plug.removeCopySession(self)
            self.plug = None

    def doServe(self):
        """
        Called from the copy thread: serve one pending delegated read.
        Returns True while more pending reads remain.
        """
        if not (self.copying and self._pending):
            # Nothing to do anymore.
            return False
        # We have pending source file read operations
        position, size, d = self._pending.pop(0)
        self._sourceFile.seek(position)
        data = self._sourceFile.read(size)
        # Call the deferred in the main thread
        reactor.callFromThread(d.callback, data)
        return len(self._pending) > 0

    def doCopy(self):
        """
        Called in the copy thread context: copy one buffer from the
        source file to the temporary file. Returns True while there is
        more to copy.
        """
        if not self.copying:
            # Nothing to do anymore.
            return False
        # Copy a buffer from the source file to the temporary writing file
        cont = True
        try:
            # It's safe to use self._copied, because it's only set
            # by the copy thread during copy.
            self._sourceFile.seek(self._copied)
            self._wTempFile.seek(self._copied)
            data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
            self._wTempFile.write(data)
            self._wTempFile.flush()
        except IOError, e:
            self.warning("Failed to copy source file: %s",
                         log.getExceptionMessage(e))
            # Abort copy and cancel the session
            self.copying = False
            reactor.callFromThread(self.plug.disableSession, self)
            reactor.callFromThread(self._cancelSession)
            # Do not continue
            cont = False
        else:
            size = len(data)
            self._copied += size
            self._correction += size
            if size < FILE_COPY_BUFFER_SIZE:
                # Short read means end of file: stop copying
                self.copying = False
                reactor.callFromThread(self.plug.disableSession, self)
                reactor.callFromThread(self._onCopyFinished)
                cont = False
        # Check for cancellation
        if self._waitCancel:
            # Copy has been cancelled
            self.copying = False
            reactor.callFromThread(self.plug.disableSession, self)
            reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
            return False
        return cont


    ## Private Methods ##

    def _allocCacheSpace(self):
        # Retrieve a cache allocation tag, used to track the cache free space
        tag = self.plug.allocateCacheSpace(self.size)
        if tag is None:
            return False
        self._allocTag = tag
        return True

    def _releaseCacheSpace(self):
        if not (self._cancelled or self._allocTag is None):
            self.plug.releaseCacheSpace(self._allocTag)
        self._allocTag = None

    def _cancelSession(self):
        if not self._cancelled:
            self.log("Canceling copy session")
            # Not a valid copy session anymore
            self._cancelled = True
            # If there is no more than 1 client using the session,
            # stop copying and serve the source file directly
            if self._refCount <= 1:
                # Cancel and close the temp write file.
                self._cancelCopy(False, True)

    def _startCopying(self):
        self.log("Start copy session")
        # First ensure there is not already a temporary file
        self._removeTempFile()
        # Reserve cache space, may trigger a cache cleanup
        if not self._allocCacheSpace():
            # No free space, proxying source file directly
            self._cancelSession()
            return
        self.plug.stats.onCopyStarted()
        # Then open a transient temporary file
        try:
            # NOTE(review): mkstemp creates the file in the default temp
            # directory; the os.rename below may then cross file systems
            # if cache-dir lives on another mount — confirm.
            fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
            self.log("Created transient file '%s'", transientPath)
            self._wTempFile = os.fdopen(fd, "wb")
            self.log("Opened temporary file for writing [fd %d]",
                     self._wTempFile.fileno())
            self._rTempFile = file(transientPath, "rb")
            self.log("Opened temporary file for reading [fd %d]",
                     self._rTempFile.fileno())
        except IOError, e:
            self.warning("Failed to open temporary file: %s",
                         log.getExceptionMessage(e))
            self._cancelSession()
            return
        # Truncate it to the source size
        try:
            self.log("Truncating temporary file to size %d", self.size)
            self._wTempFile.truncate(self.size)
        except IOError, e:
            self.warning("Failed to truncate temporary file: %s",
                         log.getExceptionMessage(e))
            self._cancelSession()
            return
        # And move it to the real temporary file path
        try:
            self.log("Renaming transient file to '%s'", self.tempPath)
            # NOTE(review): os.rename raises OSError, not IOError, so the
            # handler below may never trigger — confirm intended behavior.
            os.rename(transientPath, self.tempPath)
        except IOError, e:
            self.warning("Failed to rename transient temporary file: %s",
                         log.getExceptionMessage(e))
        # And start copying
        self.debug("Start caching '%s' [fd %d]",
                   self.sourcePath, self._sourceFile.fileno())
        # Activate the copy
        self.copying = True
        self.plug.activateSession(self)

    def _cancelCopy(self, closeSource, closeTempWrite):
        # Cancel the copy; the file close operations may be deferred
        # until the copy thread acknowledges the cancellation.
        if self.copying:
            self.log("Canceling file copy")
            if self._waitCancel:
                # Already waiting for cancellation.
                return
            self.debug("Cancel caching '%s' [fd %d]",
                       self.sourcePath, self._sourceFile.fileno())
            # Disable the copy, we do not modify copying directly
            # to let the copying thread terminate current operations.
            # The file close operations are deferred.
            self._waitCancel = (closeSource, closeTempWrite)
            return
        # No pending copy, we can close the files
        if closeSource:
            self._closeSourceFile()
        if closeTempWrite:
            self._closeWriteTempFile()

    def _onCopyCancelled(self, closeSource, closeTempWrite):
        self.log("Copy session cancelled")
        # Called when the copy thread really stopped to read/write
        self._waitCancel = None
        self.plug.stats.onCopyCancelled(self.size, self._copied)
        # Resolve all pending source read operations
        for position, size, d in self._pending:
            if self._sourceFile is None:
                d.errback(CopySessionCancelled())
            else:
                try:
                    self._sourceFile.seek(position)
                    data = self._sourceFile.read(size)
                    d.callback(data)
                except Exception, e:
                    self.warning("Failed to read from source file: %s",
                                 log.getExceptionMessage(e))
                    d.errback(e)
        self._pending = []
        # then we can safely close files
        if closeSource:
            self._closeSourceFile()
        if closeTempWrite:
            self._closeWriteTempFile()

    def _onCopyFinished(self):
        # Called when the copy thread really stopped to read/write
        self.debug("Finished caching '%s' [fd %d]",
                   self.sourcePath, self._sourceFile.fileno())
        self.plug.stats.onCopyFinished(self.size)
        # Set the copy as finished to prevent the temporary file
        # from being deleted when closed
        self._copied = None
        # Closing source and write files
        self._closeSourceFile()
        self._closeWriteTempFile()
        # Setting the modification time on the temporary file
        try:
            mtime = self.mtime
            atime = int(time.time())
            self.log("Setting temporary file modification time to %d", mtime)
            # FIXME: Should use futimes, but it's not wrapped by python
            os.utime(self.tempPath, (atime, mtime))
        except OSError, e:
            if e.errno == errno.ENOENT:
                # The file may have been deleted by another process
                self._releaseCacheSpace()
            else:
                self.warning("Failed to update modification time of temporary "
                             "file: %s", log.getExceptionMessage(e))
            self._cancelSession()
        try:
            self.log("Renaming temporary file to '%s'", self.cachePath)
            os.rename(self.tempPath, self.cachePath)
        except OSError, e:
            if e.errno == errno.ENOENT:
                self._releaseCacheSpace()
            else:
                self.warning("Failed to rename temporary file: %s",
                             log.getExceptionMessage(e))
            self._cancelSession()
        # Complete all pending source read operations with the temporary file.
        for position, size, d in self._pending:
            try:
                self._rTempFile.seek(position)
                data = self._rTempFile.read(size)
                d.callback(data)
            except Exception, e:
                self.warning("Failed to read from temporary file: %s",
                             log.getExceptionMessage(e))
                d.errback(e)
        self._pending = []
        if self._refCount == 0:
            # We were waiting for the file to be copied to close it.
            self.close()

    def _removeTempFile(self):
        try:
            os.remove(self.tempPath)
            self.log("Deleted temporary file '%s'", self.tempPath)
            # Inform the plug that cache space has been released
            self._releaseCacheSpace()
        except OSError, e:
            if e.errno == errno.ENOENT:
                if self._wTempFile is not None:
                    # Already deleted but inform the plug anyway
                    self._releaseCacheSpace()
            else:
                self.warning("Error deleting temporary file: %s",
                             log.getExceptionMessage(e))

    def _closeSourceFile(self):
        if self._sourceFile is not None:
            self.log("Closing source file [fd %d]", self._sourceFile.fileno())
            try:
                try:
                    self._sourceFile.close()
                finally:
                    # Always drop the reference, even if close() fails
                    self._sourceFile = None
            except IOError, e:
                self.warning("Failed to close source file: %s",
                             log.getExceptionMessage(e))

    def _closeReadTempFile(self):
        if self._rTempFile is not None:
            self.log("Closing temporary file for reading [fd %d]",
                     self._rTempFile.fileno())
            try:
                try:
                    self._rTempFile.close()
                finally:
                    # Always drop the reference, even if close() fails
                    self._rTempFile = None
            except IOError, e:
                self.warning("Failed to close temporary file for reading: %s",
                             log.getExceptionMessage(e))

    def _closeWriteTempFile(self):
        if self._wTempFile is not None:
            # If the copy is not finished, remove the temporary file
            if not self._cancelled and self._copied is not None:
                self._removeTempFile()
            self.log("Closing temporary file for writing [fd %d]",
                     self._wTempFile.fileno())
            try:
                try:
                    self._wTempFile.close()
                finally:
                    # Always drop the reference, even if close() fails
                    self._wTempFile = None
            except Exception, e:
                self.warning("Failed to close temporary file for writing: %s",
                             log.getExceptionMessage(e))
class TempFileDelegate(log.Loggable):
    """Reads a file through its CopySession while the copy is in progress."""

    logCategory = LOG_CATEGORY

    def __init__(self, plug, session):
        self.logName = plug.getLogName(session.sourcePath)
        self.mtime = session.mtime
        self.size = session.size
        self._session = session
        self._reading = False
        self._position = 0
        # Keep the session alive while we delegate reads to it.
        session.incRef()

    def tell(self):
        return self._position

    def seek(self, offset):
        self._position = offset

    def read(self, size, stats):
        assert not self._reading, "Simultaneous read not supported"
        outcome = self._session.read(self._position, size, stats)
        if not isinstance(outcome, defer.Deferred):
            # Synchronous result: advance immediately.
            self._position += len(outcome)
            return outcome
        # Asynchronous result: block further reads until it fires.
        self._reading = True
        return outcome.addCallback(self._cbGotData)

    def close(self):
        session, self._session = self._session, None
        if session is not None:
            session.decRef()


    ## Private Methods ##

    def _cbGotData(self, data):
        # A deferred read completed: allow new reads, advance position.
        self._reading = False
        self._position += len(data)
        return data
886 887
888 -class DirectFileDelegate(log.Loggable):
889 890 logCategory = LOG_CATEGORY 891 892 # Default values 893 _file = None 894
895 - def __init__(self, plug, path, file, info):
896 self.logName = plug.getLogName(path, file.fileno()) 897 self._file = file 898 # The size and modification time is not supposed to change over time 899 self.mtime = info[stat.ST_MTIME] 900 self.size = info[stat.ST_SIZE]
901
902 - def tell(self):
903 try: 904 return self._file.tell() 905 except IOError, e: 906 cls = _errorLookup.get(e.errno, FileError) 907 raise cls("Failed to tell position in file: %s" % str(e))
908
909 - def seek(self, offset):
910 try: 911 self._file.seek(offset, SEEK_SET) 912 except IOError, e: 913 cls = _errorLookup.get(e.errno, FileError) 914 raise cls("Failed to seek in cached file: %s" % str(e))
915
916 - def read(self, size):
917 try: 918 return self._file.read(size) 919 except IOError, e: 920 cls = _errorLookup.get(e.errno, FileError) 921 raise cls("Failed to read data from file: %s" % str(e))
922
923 - def close(self):
924 if self._file is not None: 925 try: 926 try: 927 self._file.close() 928 finally: 929 self._file = None 930 except IOError, e: 931 cls = _errorLookup.get(e.errno, FileError) 932 raise cls("Failed to close file: %s" % str(e))
933 934
class CachedFileDelegate(DirectFileDelegate):
    """DirectFileDelegate that accounts every read as a cache hit."""

    def read(self, size, stats):
        chunk = DirectFileDelegate.read(self, size)
        # Everything served here comes from the local cache.
        stats.onBytesRead(0, len(chunk), 0)
        return chunk

    def close(self):
        if self._file is None:
            return
        self.log("Closing cached file [fd %d]", self._file.fileno())
        DirectFileDelegate.close(self)
946 947
948 -class CachedFile(fileprovider.File, log.Loggable):
949 950 logCategory = LOG_CATEGORY 951 952 # Overriding parent class properties to become attribute 953 mimeType = None 954 955 # Default values 956 _delegate = None 957
958 - def __init__(self, plug, path, mimeType):
959 self.logName = plug.getLogName(path) 960 self.plug = plug 961 self._path = path 962 self.mimeType = mimeType 963 self.stats = cachestats.RequestStatistics(plug.stats) 964 self._delegate = self._selectDelegate()
965
966 - def __str__(self):
967 return "<CachedFile '%s'>" % self._path
968
969 - def getmtime(self):
970 if self._delegate is None: 971 raise FileClosedError("File closed") 972 return self._delegate.mtime
973
974 - def getsize(self):
975 if self._delegate is None: 976 raise FileClosedError("File closed") 977 return self._delegate.size
978
979 - def tell(self):
980 if self._delegate is None: 981 raise FileClosedError("File closed") 982 return self._delegate.tell()
983
984 - def seek(self, offset):
985 if self._delegate is None: 986 raise FileClosedError("File closed") 987 return self._delegate.seek(offset)
988
989 - def read(self, size):
990 if self._delegate is None: 991 raise FileClosedError("File closed") 992 try: 993 d = self._delegate.read(size, self.stats) 994 if isinstance(d, defer.Deferred): 995 return d 996 return defer.succeed(d) 997 except IOError, e: 998 cls = _errorLookup.get(e.errno, FileError) 999 return defer.fail(cls("Failed to read cached data: %s", str(e))) 1000 except: 1001 return defer.fail()
1002
1003 - def close(self):
1004 if self._delegate: 1005 self.stats.onClosed() 1006 self._delegate.close() 1007 self._delegate = None
1008
1009 - def __del__(self):
1010 self.close()
1011
1012 - def getLogFields(self):
1013 return self.stats.getLogFields()
1014 1015 1016 ## Private Methods ## 1017
1018 - def _open(self, path):
1019 """ 1020 @rtype: (file, statinfo) 1021 """ 1022 try: 1023 file = open(path, 'rb') 1024 fd = file.fileno() 1025 except IOError, e: 1026 cls = _errorLookup.get(e.errno, FileError) 1027 raise cls("Failed to open file '%s': %s" % (path, str(e))) 1028 try: 1029 info = os.fstat(fd) 1030 except OSError, e: 1031 cls = _errorLookup.get(e.errno, FileError) 1032 raise cls("Failed to stat file '%s': %s" % (path, str(e))) 1033 return file, info
1034
1035 - def _closeSourceFile(self, sourceFile):
1036 self.log("Closing source file [fd %d]", sourceFile.fileno()) 1037 try: 1038 sourceFile.close() 1039 except Exception, e: 1040 self.warning("Failed to close source file: %s", 1041 log.getExceptionMessage(e))
1042
1043 - def _selectDelegate(self):
1044 sourcePath = self._path 1045 cachedPath = self.plug.getCachePath(sourcePath) 1046 # Opening source file 1047 try: 1048 sourceFile, sourceInfo = self._open(sourcePath) 1049 self.log("Opened source file [fd %d]", sourceFile.fileno()) 1050 except NotFoundError: 1051 self.debug("Source file not found") 1052 self.plug.outdateCopySession(sourcePath) 1053 self._removeCachedFile(cachedPath) 1054 raise 1055 # Update the log name 1056 self.logName = self.plug.getLogName(self._path, sourceFile.fileno()) 1057 # Opening cached file 1058 try: 1059 cachedFile, cachedInfo = self._open(cachedPath) 1060 self.log("Opened cached file [fd %d]", cachedFile.fileno()) 1061 except NotFoundError: 1062 self.debug("Did not find cached file '%s'", cachedPath) 1063 return self._tryTempFile(sourcePath, sourceFile, sourceInfo) 1064 except FileError, e: 1065 self.debug("Failed to open cached file: %s", str(e)) 1066 self._removeCachedFile(cachedPath) 1067 return self._tryTempFile(sourcePath, sourceFile, sourceInfo) 1068 # Found a cached file, now check the modification time 1069 self.debug("Found cached file '%s'", cachedPath) 1070 sourceTime = sourceInfo[stat.ST_MTIME] 1071 cacheTime = cachedInfo[stat.ST_MTIME] 1072 if sourceTime != cacheTime: 1073 # Source file changed, remove file and start caching again 1074 self.debug("Cached file out-of-date (%d != %d)", 1075 sourceTime, cacheTime) 1076 self.stats.onCacheOutdated() 1077 self.plug.outdateCopySession(sourcePath) 1078 self._removeCachedFile(cachedPath) 1079 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 1080 self._closeSourceFile(sourceFile) 1081 # We have a valid cached file, just delegate to it. 1082 self.debug("Serving cached file '%s'", cachedPath) 1083 delegate = CachedFileDelegate(self.plug, cachedPath, 1084 cachedFile, cachedInfo) 1085 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT) 1086 return delegate
1087
1088 - def _removeCachedFile(self, cachePath):
1089 try: 1090 os.remove(cachePath) 1091 self.debug("Deleted cached file '%s'", cachePath) 1092 except OSError, e: 1093 if e.errno != errno.ENOENT: 1094 self.warning("Error deleting cached file: %s", str(e))
1095
1096 - def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
1097 session = self.plug.getCopySession(sourcePath) 1098 if session is None: 1099 self.debug("No copy sessions found") 1100 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 1101 self.debug("Copy session found") 1102 if sourceInfo[stat.ST_MTIME] != session.mtime: 1103 self.debug("Copy session out-of-date (%d != %d)", 1104 sourceInfo[stat.ST_MTIME], session.mtime) 1105 self.stats.onCacheOutdated() 1106 session.outdate() 1107 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 1108 self._closeSourceFile(sourceFile) 1109 # We have a valid session, just delegate to it. 1110 self.debug("Serving temporary file '%s'", session.tempPath) 1111 delegate = TempFileDelegate(self.plug, session) 1112 self.stats.onStarted(delegate.size, cachestats.TEMP_HIT) 1113 return delegate
1114
1115 - def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
1116 session = self.plug.createCopySession(sourcePath, sourceFile, 1117 sourceInfo) 1118 self.debug("Serving temporary file '%s'", session.tempPath) 1119 delegate = TempFileDelegate(self.plug, session) 1120 self.stats.onStarted(delegate.size, cachestats.CACHE_MISS) 1121 return delegate
1122