00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <unistd.h>
00034 #include <string>
00035 #include <sstream>
00036 #include <iostream>
00037
00038 using std::string;
00039 using std::ostringstream;
00040 using std::bad_alloc;
00041 using std::cout;
00042
00043 #include "BESInterface.h"
00044
00045 #include "BESStatusReturn.h"
00046 #include "TheBESKeys.h"
00047 #include "BESResponseHandler.h"
00048 #include "BESAggFactory.h"
00049 #include "BESAggregationServer.h"
00050 #include "BESReporterList.h"
00051
00052 #include "BESExceptionManager.h"
00053 #include "BESHandlerException.h"
00054 #include "BESMemoryException.h"
00055 #include "BESAggregationException.h"
00056
00057 #include "BESDataNames.h"
00058
00059 #include "BESDebug.h"
00060 #include "BESTransmitException.h"
00061
00062 #include "BESLog.h"
00063
00064 list < p_bes_init > BESInterface::_init_list;
00065 list < p_bes_end > BESInterface::_end_list;
00066
00067 BESInterface::BESInterface( ostream *output_stream )
00068 : _transmitter(0)
00069 {
00070 if( !output_stream )
00071 {
00072 string err = "output stream must be set in order to output responses" ;
00073 throw BESException( err, __FILE__, __LINE__ ) ;
00074 }
00075 _dhi.set_output_stream( output_stream ) ;
00076 }
00077
00078 BESInterface::~BESInterface()
00079 {
00080 }
00081
00109 int
00110 BESInterface::execute_request( const string &from )
00111 {
00112 _dhi.data[REQUEST_FROM] = from ;
00113
00114 pid_t thepid = getpid() ;
00115 ostringstream ss ;
00116 ss << thepid ;
00117 _dhi.data[SERVER_PID] = ss.str() ;
00118
00119 *(BESLog::TheLog()) << _dhi.data[SERVER_PID]
00120 << " from " << _dhi.data[REQUEST_FROM]
00121 << " [" << _dhi.data[DATA_REQUEST] << "]"
00122 << endl ;
00123
00124 int status = 0;
00125
00126 try {
00127 initialize();
00128 validate_data_request();
00129 build_data_request_plan();
00130 execute_data_request_plan();
00131 invoke_aggregation();
00132 }
00133 catch(BESException & ex) {
00134 status = exception_manager(ex);
00135 }
00136 catch(bad_alloc & b) {
00137 string serr = "BES out of memory";
00138 BESMemoryException ex(serr, __FILE__, __LINE__);
00139 status = exception_manager(ex);
00140 }
00141 catch(...) {
00142 string serr = "An undefined exception has been thrown";
00143 BESException ex(serr, __FILE__, __LINE__);
00144 status = exception_manager(ex);
00145 }
00146
00147 try {
00148 transmit_data();
00149 log_status();
00150 report_request();
00151 end_request();
00152 if (_dhi.error_info)
00153 delete _dhi.error_info;
00154 _dhi.error_info = 0;
00155 }
00156 catch(BESException & ex) {
00157 status = exception_manager(ex);
00158 }
00159 catch(bad_alloc & b) {
00160 string serr = "BES out of memory";
00161 BESMemoryException ex(serr, __FILE__, __LINE__);
00162 status = exception_manager(ex);
00163 }
00164 catch(...) {
00165 string serr = "An undefined exception has been thrown";
00166 BESException ex(serr, __FILE__, __LINE__);
00167 status = exception_manager(ex);
00168 }
00169
00170
00171
00172 if (_dhi.error_info) {
00173 _dhi.error_info->print(cout);
00174 }
00175
00176 return status;
00177 }
00178
00179 void
00180 BESInterface::add_init_callback(p_bes_init init)
00181 {
00182 _init_list.push_back(init);
00183 }
00184
00190 void
00191 BESInterface::initialize()
00192 {
00193 BESDEBUG("bes", "Initializing request: " << _dhi.data[DATA_REQUEST] << " ... ")
00194 bool do_continue = true;
00195 init_iter i = _init_list.begin();
00196
00197 for (; i != _init_list.end() && do_continue == true; i++) {
00198 p_bes_init p = *i;
00199 do_continue = p(_dhi);
00200 }
00201
00202 if (!do_continue) {
00203 BESDEBUG("bes", "FAILED" << endl)
00204 string se = "Initialization callback failed, exiting";
00205 throw BESException(se, __FILE__, __LINE__);
00206 } else {
00207 BESDEBUG("bes", "OK" << endl)
00208 }
00209 }
00210
00213 void
00214 BESInterface::validate_data_request()
00215 {
00216 }
00217
00226 void
00227 BESInterface::build_data_request_plan()
00228 {
00229 }
00230
00244 void
00245 BESInterface::execute_data_request_plan()
00246 {
00247 BESDEBUG("bes", "Executing request: " << _dhi.data[DATA_REQUEST] << " ... ")
00248 BESResponseHandler *rh = _dhi.response_handler;
00249 if (rh) {
00250 rh->execute(_dhi);
00251 } else {
00252 BESDEBUG("bes", "FAILED" << endl)
00253 string se = "The response handler \"" + _dhi.action
00254 + "\" does not exist";
00255 throw BESHandlerException(se, __FILE__, __LINE__);
00256 }
00257 BESDEBUG("bes", "OK" << endl)
00258 }
00259
00262 void
00263 BESInterface::invoke_aggregation()
00264 {
00265 if (_dhi.data[AGG_CMD] != "") {
00266 BESDEBUG("bes", "aggregating with: " << _dhi.data[AGG_CMD] << " ... ")
00267 BESAggregationServer *agg =
00268 BESAggFactory::TheFactory()->find_handler(_dhi.
00269 data[AGG_HANDLER]);
00270 if (agg) {
00271 agg->aggregate(_dhi);
00272 } else {
00273 BESDEBUG("bes", "FAILED" << endl)
00274 string se = "The aggregation handler " + _dhi.data[AGG_HANDLER]
00275 + "does not exist";
00276 throw BESAggregationException(se, __FILE__, __LINE__);
00277 }
00278 BESDEBUG("bes", "OK" << endl)
00279 }
00280 }
00281
00295 void
00296 BESInterface::transmit_data()
00297 {
00298 BESDEBUG("bes", "Transmitting request: " << _dhi.data[DATA_REQUEST] << endl)
00299 if (_transmitter) {
00300 if (_dhi.error_info) {
00301 BESDEBUG( "bes", " transmitting error info using transmitter ... " )
00302 _dhi.error_info->transmit(_transmitter, _dhi);
00303 } else if (_dhi.response_handler) {
00304 BESDEBUG( "bes", " transmitting response using transmitter ... " )
00305 _dhi.response_handler->transmit(_transmitter, _dhi);
00306 }
00307 } else {
00308 if (_dhi.error_info) {
00309 BESDEBUG( "bes", " transmitting error info using cout ... " )
00310 _dhi.error_info->print(cout);
00311 } else {
00312 BESDEBUG( "bes", " Unable to transmit the response ... FAILED " )
00313 string err = "Unable to transmit the response, no transmitter" ;
00314 throw BESTransmitException( err, __FILE__, __LINE__ ) ;
00315 }
00316 }
00317 BESDEBUG("bes", "OK" << endl)
00318 }
00319
00322 void
00323 BESInterface::log_status()
00324 {
00325 }
00326
00338 void
00339 BESInterface::report_request()
00340 {
00341 BESDEBUG("bes", "Reporting on request: " << _dhi.
00342 data[DATA_REQUEST] << " ... ")
00343 BESReporterList::TheList()->report(_dhi);
00344 BESDEBUG("bes", "OK" << endl)
00345 }
00346
00347 void
00348 BESInterface::add_end_callback(p_bes_end end)
00349 {
00350 _end_list.push_back(end);
00351 }
00352
00358 void
00359 BESInterface::end_request()
00360 {
00361 BESDEBUG("bes", "Ending request: " << _dhi.data[DATA_REQUEST] << " ... ")
00362 end_iter i = _end_list.begin();
00363 for (; i != _end_list.end(); i++) {
00364 p_bes_end p = *i;
00365 p(_dhi);
00366 }
00367 BESDEBUG("bes", "OK" << endl)
00368 }
00369
00372 void
00373 BESInterface::clean()
00374 {
00375 if (_dhi.response_handler)
00376 delete _dhi.response_handler;
00377 _dhi.response_handler = 0;
00378 }
00379
00392 int
00393 BESInterface::exception_manager(BESException & e)
00394 {
00395 return BESExceptionManager::TheEHM()->handle_exception(e, _dhi);
00396 }
00397
00406 void
00407 BESInterface::dump(ostream & strm) const
00408 {
00409 strm << BESIndent::LMarg << "BESInterface::dump - ("
00410 << (void *) this << ")" << endl;
00411 BESIndent::Indent();
00412
00413 if (_init_list.size()) {
00414 strm << BESIndent::LMarg << "termination functions:" << endl;
00415 BESIndent::Indent();
00416 init_iter i = _init_list.begin();
00417 for (; i != _init_list.end(); i++) {
00418 strm << BESIndent::LMarg << (void *) (*i) << endl;
00419 }
00420 BESIndent::UnIndent();
00421 } else {
00422 strm << BESIndent::LMarg << "termination functions: none" << endl;
00423 }
00424
00425 if (_end_list.size()) {
00426 strm << BESIndent::LMarg << "termination functions:" << endl;
00427 BESIndent::Indent();
00428 end_iter i = _end_list.begin();
00429 for (; i != _end_list.end(); i++) {
00430 strm << BESIndent::LMarg << (void *) (*i) << endl;
00431 }
00432 BESIndent::UnIndent();
00433 } else {
00434 strm << BESIndent::LMarg << "termination functions: none" << endl;
00435 }
00436
00437 strm << BESIndent::LMarg << "data handler interface:" << endl;
00438 BESIndent::Indent();
00439 _dhi.dump(strm);
00440 BESIndent::UnIndent();
00441
00442 if (_transmitter) {
00443 strm << BESIndent::LMarg << "transmitter:" << endl;
00444 BESIndent::Indent();
00445 _transmitter->dump(strm);
00446 BESIndent::UnIndent();
00447 } else {
00448 strm << BESIndent::LMarg << "transmitter: not set" << endl;
00449 }
00450 BESIndent::UnIndent();
00451 }