52 static const unsigned long long BYTES_PER_MEG = 1048576ULL;
56 static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
77 d_instance =
new BESCache3(keys, cache_dir_key, prefix_key, size_key);
89 d_instance =
new BESCache3(cache_dir, prefix, size);
101 throw BESInternalError(
"Tried to get the BESCache3 instance, but it hasn't been created yet", __FILE__, __LINE__);
106 static inline string get_errno() {
107 char *s_err = strerror(errno);
111 return "Unknown error.";
117 static inline struct flock *lock(
int type) {
118 static struct flock lock;
120 lock.l_whence = SEEK_SET;
123 lock.l_pid = getpid();
128 inline void BESCache3::m_record_descriptor(
const string &file,
int fd) {
129 BESDEBUG(
"cache",
"BES Cache: recording descriptor: " << file <<
", " << fd << endl);
130 d_locks.insert(std::pair<string, int>(file, fd));
133 inline int BESCache3::m_get_descriptor(
const string &file) {
134 FilesAndLockDescriptors::iterator i = d_locks.find(file);
136 BESDEBUG(
"cache",
"BES Cache: getting descriptor: " << file <<
", " << fd << endl);
146 static void unlock(
int fd)
148 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
149 throw BESInternalError(
"An error occurred trying to unlock the file" + get_errno(), __FILE__, __LINE__);
153 throw BESInternalError(
"Could not close the (just) unlocked file.", __FILE__, __LINE__);
168 static bool getSharedLock(
const string &file_name,
int &ref_fd)
170 BESDEBUG(
"cache_internal",
"getSharedLock: " << file_name <<endl);
173 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
183 struct flock *l = lock(F_RDLCK);
184 if (fcntl(fd, F_SETLKW, l) == -1) {
187 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
191 BESDEBUG(
"cache_internal",
"getSharedLock exit: " << file_name <<endl);
210 static bool getExclusiveLock(
string file_name,
int &ref_fd)
212 BESDEBUG(
"cache_internal",
"getExclusiveLock: " << file_name <<endl);
215 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
225 struct flock *l = lock(F_WRLCK);
226 if (fcntl(fd, F_SETLKW, l) == -1) {
229 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
233 BESDEBUG(
"cache_internal",
"getExclusiveLock exit: " << file_name <<endl);
251 static bool getExclusiveLockNB(
string file_name,
int &ref_fd)
253 BESDEBUG(
"cache_internal",
"getExclusiveLock_nonblocking: " << file_name <<endl);
256 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
266 struct flock *l = lock(F_WRLCK);
267 if (fcntl(fd, F_SETLK, l) == -1) {
270 BESDEBUG(
"cache_internal",
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
277 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
283 BESDEBUG(
"cache_internal",
"getExclusiveLock_nonblocking exit (true): " << file_name <<endl);
303 static bool createLockedFile(
string file_name,
int &ref_fd)
305 BESDEBUG(
"cache_internal",
"createLockedFile: " << file_name <<endl);
308 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
318 struct flock *l = lock(F_WRLCK);
319 if (fcntl(fd, F_SETLKW, l) == -1) {
322 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
326 BESDEBUG(
"cache_internal",
"createLockedFile exit: " << file_name <<endl);
334 void BESCache3::m_check_ctor_params()
336 if (d_cache_dir.empty()) {
337 string err =
"The cache directory was not specified, must be non-empty";
342 int statret = stat(d_cache_dir.c_str(), &buf);
343 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
344 string err =
"The cache directory " + d_cache_dir +
" does not exist";
348 if (d_prefix.empty()) {
349 string err =
"The cache file prefix was not specified, must not be empty";
353 if (d_max_cache_size_in_bytes <= 0) {
354 string err =
"The cache size was not specified, must be greater than zero";
360 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES) {
361 std::ostringstream msg;
362 msg <<
"The specified cache size was larger than the max cache size of: " << MAX_CACHE_SIZE_IN_MEGABYTES;
366 BESDEBUG(
"cache_internal",
"BES Cache: directory " << d_cache_dir
367 <<
", prefix " << d_prefix
368 <<
", max size " << d_max_cache_size_in_bytes << endl );
372 void BESCache3::m_initialize_cache_info()
374 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
375 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES)
376 *(
BESLog::TheLog()) <<
"Cache size too big in configuration file, set to max limit." << endl ;
378 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
379 d_target_size = d_max_cache_size_in_bytes * 0.8;
381 m_check_ctor_params();
383 d_cache_info = d_cache_dir +
"/bes.cache.info";
387 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
389 unsigned long long size = 0;
390 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
391 throw BESInternalError(
"Could not write size info to the cache info file in startup!", __FILE__, __LINE__);
397 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
402 BESDEBUG(
"cache_internal",
"d_cache_info_fd: " << d_cache_info_fd << endl);
419 BESCache3::BESCache3(
BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key) :
420 d_max_cache_size_in_bytes(0)
423 keys->
get_value(cache_dir_key, d_cache_dir, found);
425 throw BESSyntaxUserError(
"The cache directory key " + cache_dir_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
428 keys->
get_value(prefix_key, d_prefix, found);
430 throw BESSyntaxUserError(
"The prefix key " + prefix_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
433 string cache_size_str;
434 keys->
get_value(size_key, cache_size_str, found);
436 throw BESSyntaxUserError(
"The size key " + size_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
438 std::istringstream is(cache_size_str);
439 is >> d_max_cache_size_in_bytes;
441 m_initialize_cache_info();
450 BESCache3::BESCache3(
const string &cache_dir,
const string &prefix,
unsigned long size) :
451 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size)
453 m_initialize_cache_info();
468 if (target.at(0) ==
'/') {
469 target = src.substr(1, target.length() - 1);
471 string::size_type slash = 0;
472 while ((slash = target.find(
'/')) != string::npos) {
473 target.replace(slash, 1, 1, BESCache3::BES_CACHE_CHAR);
475 string::size_type last_dot = target.rfind(
'.');
476 if (last_dot != string::npos) {
477 target = target.substr(0, last_dot);
480 return d_cache_dir +
"/" + d_prefix + BESCache3::BES_CACHE_CHAR + target;
504 bool status = getSharedLock(target, fd);
506 BESDEBUG(
"cache_internal",
"BES Cache: read_lock: " << target <<
"(" << status <<
")" << endl);
509 m_record_descriptor(target, fd);
532 bool status = createLockedFile(target, fd);
534 BESDEBUG(
"cache_internal",
"BES Cache: create_and_lock: " << target <<
"(" << status <<
")" << endl);
537 m_record_descriptor(target, fd);
561 lock.l_type = F_RDLCK;
562 lock.l_whence = SEEK_SET;
565 lock.l_pid = getpid();
567 if (fcntl(fd, F_SETLKW, &lock) == -1) {
582 BESDEBUG(
"cache_internal",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
584 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
585 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
594 BESDEBUG(
"cache_internal",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
596 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
597 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
608 BESDEBUG(
"cache_internal",
"BES Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
610 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
611 throw BESInternalError(
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__, __LINE__);
628 BESDEBUG(
"cache_internal",
"BES Cache: unlock file: " << file_name << endl);
630 unlock(m_get_descriptor(file_name));
640 BESDEBUG(
"cache_internal",
"BES Cache: unlock fd: " << fd << endl);
644 BESDEBUG(
"cache_internal",
"BES Cache: unlock " << fd <<
" Success" << endl);
662 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
663 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
666 unsigned long long current_size;
667 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
668 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
671 int statret = stat(target.c_str(), &buf);
673 current_size += buf.st_size;
675 throw BESInternalError(
"Could not read the size of the new file: " + target +
" : " + get_errno(), __FILE__, __LINE__);
677 BESDEBUG(
"cache_internal",
"BES Cache: cache size updated to: " << current_size << endl);
679 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
680 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
682 if(write(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
683 throw BESInternalError(
"Could not write size info from the cache info file!", __FILE__, __LINE__);
700 return current_size > d_max_cache_size_in_bytes;
715 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
716 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
718 unsigned long long current_size;
719 if(read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
720 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
738 unsigned long long BESCache3::m_collect_cache_dir_info(
CacheFiles &contents)
740 DIR *dip = opendir(d_cache_dir.c_str());
742 throw BESInternalError(
"Unable to open cache directory " + d_cache_dir, __FILE__, __LINE__);
745 vector<string> files;
748 while ((dit = readdir(dip)) != NULL) {
749 string dirEntry = dit->d_name;
750 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
751 files.push_back(d_cache_dir +
"/" + dirEntry);
757 unsigned long long current_size = 0;
759 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
760 if (stat(file->c_str(), &buf) == 0) {
761 current_size += buf.st_size;
764 entry.
size = buf.st_size;
765 entry.
time = buf.st_atime;
769 throw BESInternalError(
"Zero-byte file found in cache. " + *file, __FILE__, __LINE__);
771 contents.push_back(entry);
776 contents.sort(entry_op);
794 BESDEBUG(
"cache_purge",
"purge - starting the purge" << endl);
800 unsigned long long computed_size = m_collect_cache_dir_info(contents);
803 BESDEBUG(
"cache_contents", endl <<
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
804 CacheFiles::iterator ti = contents.begin();
805 CacheFiles::iterator te = contents.end();
806 for (; ti != te; ti++) {
807 BESDEBUG(
"cache_contents", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
811 BESDEBUG(
"cache_purge",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
818 CacheFiles::iterator i = contents.begin();
819 while (i != contents.end() && computed_size > d_target_size) {
824 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
825 BESDEBUG(
"cache_purge",
"purge: " << i->name <<
" removed." << endl );
827 if (unlink(i->name.c_str()) != 0)
828 throw BESInternalError(
"Unable to purge the file " + i->name +
" from the cache: " + get_errno(), __FILE__, __LINE__);
831 computed_size -= i->size;
836 BESDEBUG(
"cache_purge",
"purge: " << i->name <<
" is in use." << endl );
841 BESDEBUG(
"cache_purge",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
846 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
847 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
849 if(write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
850 throw BESInternalError(
"Could not write size info to the cache info file!", __FILE__, __LINE__);
854 computed_size = m_collect_cache_dir_info(contents);
855 BESDEBUG(
"cache_contents", endl <<
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
856 CacheFiles::iterator ti = contents.begin();
857 CacheFiles::iterator te = contents.end();
858 for (; ti != te; ti++) {
859 BESDEBUG(
"cache_contents", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
880 strm <<
BESIndent::LMarg <<
"BESCache3::dump - (" << (
void *)
this <<
")" << endl;
884 strm <<
BESIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual void dump(ostream &strm) const
dumps information about this object
#define BESISDEBUG(x)
macro used to determine if the specified debug context is set
std::list< cache_entry > CacheFiles
exception thrown if internal error encountered
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big...
virtual void lock_cache_write()
Get an exclusive lock on the 'cache info' file.
virtual void unlock_and_close(const string &target)
Unlock the named file.
error thrown if there is a user syntax error in the request or any other user error ...
virtual void lock_cache_read()
Get a shared lock on the 'cache info' file.
mapping of key/value pairs defining different behaviors of an application.
Implementation of a caching mechanism for compressed data.
static ostream & LMarg(ostream &strm)
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access.
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
string get_cache_file_name(const string &src)
Build the name of the file that will hold the uncompressed data from 'src' in the cache.
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
virtual void unlock_cache()
Unlock the cache info file.
#define BESDEBUG(x, y)
macro used to send debug information to the debug stream
virtual unsigned long long get_cache_size()
Get the cache size.
static BESCache3 * get_instance()
Get an instance of the BESCache3 object.