00001
00002 #include <memory>
00003 #include <string>
00004 #include <vector>
00005
00006 #include "XrdSys/XrdSysPthread.hh"
00007
00008 #include "XrdHttp/XrdHttpExtHandler.hh"
00009 #include "XrdHttp/XrdHttpUtils.hh"
00010
00011 class XrdOucErrInfo;
00012 class XrdOucStream;
00013 class XrdSfsFile;
00014 class XrdSfsFileSystem;
00015 typedef void CURL;
00016
00017 namespace TPC {
00018 class State;
00019
00020 enum LogMask {
00021 Debug = 0x01,
00022 Info = 0x02,
00023 Warning = 0x04,
00024 Error = 0x08,
00025 All = 0xff
00026 };
00027
00028 class TPCHandler : public XrdHttpExtHandler {
00029 public:
00030 TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv);
00031 virtual ~TPCHandler();
00032
00033 virtual bool MatchesPath(const char *verb, const char *path);
00034 virtual int ProcessReq(XrdHttpExtReq &req);
00035
00036 virtual int Init(const char *cfgfile) {return 0;}
00037
00038 private:
00039
00040 struct TPCLogRecord {
00041
00042 TPCLogRecord() : status( -1 ),
00043 tpc_status(-1),
00044 streams( 1 ),
00045 bytes_transferred( -1 )
00046 {
00047 }
00048
00049 std::string log_prefix;
00050 std::string local;
00051 std::string remote;
00052 std::string name;
00053 int status;
00054 int tpc_status;
00055 unsigned streams;
00056 off_t bytes_transferred;
00057 };
00058
00059 int ProcessOptionsReq(XrdHttpExtReq &req);
00060
00061 static std::string GetAuthz(XrdHttpExtReq &req);
00062
00063
00064 int RedirectTransfer(CURL *curl, const std::string &redirect_resource, XrdHttpExtReq &req,
00065 XrdOucErrInfo &error, TPCLogRecord &);
00066
00067 int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode,
00068 int openMode, const XrdSecEntity &sec,
00069 const std::string &authz);
00070
00071 #ifdef XRD_CHUNK_RESP
00072 int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
00073 bool &success, TPCLogRecord &);
00074
00075
00076
00077
00078
00079 int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state);
00080 int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
00081 off_t bytes_transferred);
00082
00083
00084 int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
00085 TPCLogRecord &rec);
00086
00087
00088 int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state,
00089 size_t streams, TPCLogRecord &rec);
00090 int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state,
00091 size_t streams, std::vector<TPC::State*> streams_handles,
00092 TPCLogRecord &rec);
00093 #else
00094 int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
00095 const char *log_prefix);
00096 #endif
00097
00098 int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req);
00099 int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req);
00100
00101 bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt,
00102 std::string &path2, bool &path2_alt);
00103 bool Configure(const char *configfn, XrdOucEnv *myEnv);
00104 bool ConfigureLogger(XrdOucStream &Config);
00105
00106
00107 void logTransferEvent(LogMask lvl, const TPCLogRecord &record,
00108 const std::string &event, const std::string &message="");
00109
00110 static int m_marker_period;
00111 static size_t m_block_size;
00112 bool m_desthttps;
00113 std::string m_cadir;
00114 static XrdSysMutex m_monid_mutex;
00115 static uint64_t m_monid;
00116 XrdSysError m_log;
00117 XrdSfsFileSystem *m_sfs;
00118
00119
00120
00121
00122 #ifdef USE_PIPELINING
00123 static const int m_pipelining_multiplier = 16;
00124 #else
00125 static const int m_pipelining_multiplier = 1;
00126 #endif
00127 };
00128 }