52 #include "BESSyntaxUserError.h"
53 #include "BESInternalError.h"
55 #include "TheBESKeys.h"
60 using namespace libdap;
63 static const unsigned long long BYTES_PER_MEG = 1048576ULL;
67 static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
85 BESCache3::get_instance(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key)
88 d_instance =
new BESCache3(keys, cache_dir_key, prefix_key, size_key);
108 d_instance =
new DAPCache3(cache_dir, prefix, size);
120 throw InternalErr(__FILE__, __LINE__,
"Tried to get the DAPCache3 instance, but it hasn't been created yet");
125 static inline string get_errno() {
126 char *s_err = strerror(errno);
130 return "Unknown error.";
134 static inline struct flock *lock(
int type) {
135 static struct flock lock;
137 lock.l_whence = SEEK_SET;
140 lock.l_pid = getpid();
145 inline void DAPCache3::m_record_descriptor(
const string &file,
int fd) {
146 DBG(cerr <<
"DAP Cache: recording descriptor: " << file <<
", " << fd << endl);
147 d_locks.insert(std::pair<string, int>(file, fd));
150 inline int DAPCache3::m_get_descriptor(
const string &file) {
151 FilesAndLockDescriptors::iterator i = d_locks.find(file);
153 DBG(cerr <<
"DAP Cache: getting descriptor: " << file <<
", " << fd << endl);
163 static void unlock(
int fd)
165 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
166 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to unlock the file" + get_errno());
170 throw InternalErr(__FILE__, __LINE__,
"Could not close the (just) unlocked file.");
185 static bool getSharedLock(
const string &file_name,
int &ref_fd)
187 DBG(cerr <<
"getSharedLock: " << file_name <<endl);
190 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
196 throw InternalErr(__FILE__, __LINE__, get_errno());
200 struct flock *l = lock(F_RDLCK);
201 if (fcntl(fd, F_SETLKW, l) == -1) {
204 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
208 DBG(cerr <<
"getSharedLock exit: " << file_name <<endl);
227 static bool getExclusiveLock(
string file_name,
int &ref_fd)
229 DBG(cerr <<
"getExclusiveLock: " << file_name <<endl);
232 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
238 throw InternalErr(__FILE__, __LINE__, get_errno());
242 struct flock *l = lock(F_WRLCK);
243 if (fcntl(fd, F_SETLKW, l) == -1) {
246 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
250 DBG(cerr <<
"getExclusiveLock exit: " << file_name <<endl);
268 static bool getExclusiveLockNB(
string file_name,
int &ref_fd)
270 DBG(cerr <<
"getExclusiveLock_nonblocking: " << file_name <<endl);
273 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
279 throw InternalErr(__FILE__, __LINE__, get_errno());
283 struct flock *l = lock(F_WRLCK);
284 if (fcntl(fd, F_SETLK, l) == -1) {
287 DBG(cerr <<
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
294 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
300 DBG(cerr <<
"getExclusiveLock_nonblocking exit (true): " << file_name <<endl);
320 static bool createLockedFile(
string file_name,
int &ref_fd)
322 DBG(cerr <<
"createLockedFile: " << file_name <<endl);
325 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
331 throw InternalErr(__FILE__, __LINE__, get_errno());
335 struct flock *l = lock(F_WRLCK);
336 if (fcntl(fd, F_SETLKW, l) == -1) {
339 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
343 DBG(cerr <<
"createLockedFile exit: " << file_name <<endl);
351 void DAPCache3::m_check_ctor_params()
353 if (d_cache_dir.empty()) {
354 string err =
"The cache directory was not specified, must be non-empty";
360 int statret = stat(d_cache_dir.c_str(), &buf);
361 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
363 int status = mkdir(d_cache_dir.c_str(), 0775);
365 string err =
"The cache directory " + d_cache_dir +
" does not exist or could not be created.";
370 if (d_prefix.empty()) {
371 string err =
"The cache file prefix was not specified, must not be empty";
375 if (d_max_cache_size_in_bytes <= 0) {
376 string err =
"The cache size was not specified, must be greater than zero";
384 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES) {
385 std::ostringstream msg;
386 msg <<
"The specified cache size was larger than the max cache size of: " << MAX_CACHE_SIZE_IN_MEGABYTES
387 <<
" (was " << d_max_cache_size_in_bytes <<
").";
391 DBG(cerr <<
"DAP Cache: directory " << d_cache_dir <<
", prefix " << d_prefix
392 <<
", max size " << d_max_cache_size_in_bytes << endl );
396 void DAPCache3::m_initialize_cache_info()
404 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES)
405 *(BESLog::TheLog()) <<
"Cache size too big in configuration file, set to max limit." << endl ;
409 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
410 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
411 d_target_size = d_max_cache_size_in_bytes * 0.8;
413 m_check_ctor_params();
415 d_cache_info = d_cache_dir +
"/dap.cache.info";
419 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
421 unsigned long long size = 0;
422 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
423 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file in startup!");
429 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
430 throw InternalErr(__FILE__, __LINE__, get_errno());
434 DBG(cerr <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
451 BESCache3::BESCache3(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key) :
452 d_max_cache_size_in_bytes(0)
455 keys->get_value(cache_dir_key, d_cache_dir, found);
457 throw BESSyntaxUserError(
"The cache directory key " + cache_dir_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
460 keys->get_value(prefix_key, d_prefix, found);
462 throw BESSyntaxUserError(
"The prefix key " + prefix_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
465 string cache_size_str;
466 keys->get_value(size_key, cache_size_str, found);
468 throw BESSyntaxUserError(
"The size key " + size_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
470 std::istringstream is(cache_size_str);
471 is >> d_max_cache_size_in_bytes;
473 m_initialize_cache_info();
489 DAPCache3::DAPCache3(
const string &cache_dir,
const string &prefix,
unsigned long long size) :
490 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size)
492 m_initialize_cache_info();
515 if (target.at(0) ==
'/') {
516 target = src.substr(1, target.length() - 1);
518 string::size_type slash = 0;
519 while ((slash = target.find(
'/')) != string::npos) {
520 target.replace(slash, 1, 1, DAPCache3::DAP_CACHE_CHAR);
522 string::size_type last_dot = target.rfind(
'.');
523 if (last_dot != string::npos) {
524 target = target.substr(0, last_dot);
528 return d_cache_dir +
"/" + d_prefix + DAPCache3::DAP_CACHE_CHAR + target;
552 bool status = getSharedLock(target, fd);
554 DBG(cerr <<
"DAP Cache: read_lock: " << target <<
"(" << status <<
")" << endl);
557 m_record_descriptor(target, fd);
580 bool status = createLockedFile(target, fd);
582 DBG(cerr <<
"DAP Cache: create_and_lock: " << target <<
"(" << status <<
")" << endl);
585 m_record_descriptor(target, fd);
609 lock.l_type = F_RDLCK;
610 lock.l_whence = SEEK_SET;
613 lock.l_pid = getpid();
615 if (fcntl(fd, F_SETLKW, &lock) == -1) {
616 throw InternalErr(__FILE__, __LINE__, get_errno());
630 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
632 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
633 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file" + get_errno());
642 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
644 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
645 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file" + get_errno());
656 DBG(cerr <<
"DAP Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
658 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
659 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to unlock the cache-control file" + get_errno());
676 DBG(cerr <<
"DAP Cache: unlock file: " << file_name << endl);
678 unlock(m_get_descriptor(file_name));
688 DBG(cerr <<
"DAP Cache: unlock fd: " << fd << endl);
692 DBG(cerr <<
"DAP Cache: unlock " << fd <<
" Success" << endl);
710 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
711 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
714 unsigned long long current_size;
715 if (read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
716 throw InternalErr(__FILE__, __LINE__,
"Could not get read size info from the cache info file!");
719 int statret = stat(target.c_str(), &buf);
721 current_size += buf.st_size;
723 throw InternalErr(__FILE__, __LINE__,
"Could not read the size of the new file: " + target +
" : " + get_errno());
725 DBG(cerr <<
"DAP Cache: cache size updated to: " << current_size << endl);
727 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
728 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
730 if(write(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
731 throw InternalErr(__FILE__, __LINE__,
"Could not write size info from the cache info file!");
748 return current_size > d_max_cache_size_in_bytes;
763 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
764 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
766 unsigned long long current_size;
767 if(read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
768 throw InternalErr(__FILE__, __LINE__,
"Could not get read size info from the cache info file!");
786 unsigned long long DAPCache3::m_collect_cache_dir_info(
CacheFiles &contents)
788 DIR *dip = opendir(d_cache_dir.c_str());
790 throw InternalErr(__FILE__, __LINE__,
"Unable to open cache directory " + d_cache_dir);
793 vector<string> files;
796 while ((dit = readdir(dip)) != NULL) {
797 string dirEntry = dit->d_name;
798 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
799 files.push_back(d_cache_dir +
"/" + dirEntry);
805 unsigned long long current_size = 0;
807 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
808 if (stat(file->c_str(), &buf) == 0) {
809 current_size += buf.st_size;
812 entry.
size = buf.st_size;
813 entry.
time = buf.st_atime;
817 throw InternalErr(__FILE__, __LINE__,
"Zero-byte file found in cache. " + *file);
819 contents.push_back(entry);
824 contents.sort(entry_op);
842 DBG(cerr <<
"purge - starting the purge" << endl);
848 unsigned long long computed_size = m_collect_cache_dir_info(contents);
850 if (BESISDEBUG(
"cache_contents" )) {
851 DBG(endl <<
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
852 CacheFiles::iterator ti = contents.begin();
853 CacheFiles::iterator te = contents.end();
854 for (; ti != te; ti++) {
855 DBG((*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
859 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
866 CacheFiles::iterator i = contents.begin();
867 while (i != contents.end() && computed_size > d_target_size) {
872 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
873 DBG(cerr <<
"purge: " << i->name <<
" removed." << endl );
875 if (unlink(i->name.c_str()) != 0)
876 throw InternalErr(__FILE__, __LINE__,
"Unable to purge the file " + i->name +
" from the cache: " + get_errno());
879 computed_size -= i->size;
884 DBG(cerr <<
"purge: " << i->name <<
" is in use." << endl );
889 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
894 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
895 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
897 if(write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
898 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
900 if (BESISDEBUG(
"cache_contents" )) {
902 computed_size = m_collect_cache_dir_info(contents);
903 DBG(endl <<
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
904 CacheFiles::iterator ti = contents.begin();
905 CacheFiles::iterator te = contents.end();
906 for (; ti != te; ti++) {
907 DBG((*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
932 DBG(cerr <<
"purge_file - starting the purge" << endl);
939 if (getExclusiveLock(file, cfile_fd)) {
941 unsigned long long size = 0;
943 if (stat(file.c_str(), &buf) == 0) {
947 DBG(cerr <<
"purge_file: " << file <<
" removed." << endl );
949 if (unlink(file.c_str()) != 0)
951 "Unable to purge the file " + file +
" from the cache: " + get_errno());
957 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
958 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
960 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
961 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
981 strm << DapIndent::LMarg <<
"DAPCache3::dump - (" << (
void *)
this <<
")" << endl;
983 strm << DapIndent::LMarg <<
"cache dir: " << d_cache_dir << endl;
984 strm << DapIndent::LMarg <<
"prefix: " << d_prefix << endl;
985 strm << DapIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
986 DapIndent::UnIndent();
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void dump(ostream &strm) const
Dumps information about this object.
virtual void purge_file(const string &file)
Purge a single file from the cache.
Implementation of a caching mechanism for compressed data. This cache uses simple advisory locking fo...
virtual bool cache_too_big(unsigned long long current_size) const
Look at the cache size; is it too large? Look at the cache size and see if it is too big...
virtual unsigned long long get_cache_size()
Get the cache size. Read the size information from the cache info file and return it...
static DAPCache3 * get_instance()
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
std::list< cache_entry > CacheFiles
A class for software fault reporting.
virtual void unlock_cache()
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access. If the file does not exist, make it, open it for read-write access and get an exclusive lock on it. The locking operation blocks, although that should never happen.
virtual void unlock_and_close(const string &target)
virtual void lock_cache_read()
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock. If the file has an exclusive write lock on it...
string get_cache_file_name(const string &src, bool mangle=true)
virtual void lock_cache_write()