xrootd
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27 
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClMessage.hh"
32 #include "XProtocol/XProtocol.hh"
33 #include "XrdCl/XrdClLog.hh"
34 #include "XrdCl/XrdClConstants.hh"
35 
36 #include <sys/uio.h>
37 
38 #include <list>
39 #include <memory>
40 
41 namespace XrdCl
42 {
43  class PostMaster;
44  class SIDManager;
45  class URL;
46  class LocalFileHandler;
47 
48  //----------------------------------------------------------------------------
49  // Single entry in the redirect-trace-back
50  //----------------------------------------------------------------------------
52  {
53  RedirectEntry( const URL &from, const URL &to ) : from( from ), to( to )
54  {
55 
56  }
57 
61 
62  std::string ToString( bool prevok = true )
63  {
64  const std::string tostr = to.GetLocation();
65  const std::string fromstr = from.GetLocation();
66 
67  if( prevok )
68  {
69  if( tostr == fromstr )
70  return "Retrying: " + tostr;
71  return "Redirected from: " + fromstr + " to: " + tostr;
72  }
73  return "Failed at: " + fromstr + ", retrying at: " + tostr;
74  }
75  };
76 
77  //----------------------------------------------------------------------------
79  //----------------------------------------------------------------------------
81  public OutgoingMsgHandler
82  {
83  friend class HandleRspJob;
84 
85  public:
86  //------------------------------------------------------------------------
95  //------------------------------------------------------------------------
97  ResponseHandler *respHandler,
98  const URL *url,
99  SIDManager *sidMgr,
100  LocalFileHandler *lFileHandler):
101  pRequest( msg ),
102  pResponse( 0 ),
103  pResponseHandler( respHandler ),
104  pUrl( *url ),
105  pSidMgr( sidMgr ),
106  pLFileHandler( lFileHandler ),
107  pExpiration( 0 ),
108  pRedirectAsAnswer( false ),
109  pHosts( 0 ),
110  pHasLoadBalancer( false ),
111  pHasSessionId( false ),
112  pChunkList( 0 ),
113  pRedirectCounter( 0 ),
114 
115  pAsyncOffset( 0 ),
116  pAsyncReadSize( 0 ),
117  pAsyncReadBuffer( 0 ),
118  pAsyncMsgSize( 0 ),
119 
120  pReadRawStarted( false ),
122 
123  pReadVRawMsgOffset( 0 ),
124  pReadVRawChunkHeaderDone( false ),
126  pReadVRawSizeError( false ),
127  pReadVRawChunkIndex( 0 ),
128  pReadVRawMsgDiscard( false ),
129 
130  pOtherRawStarted( false ),
131 
132  pFollowMetalink( false ),
133 
134  pStateful( false ),
135 
136  pAggregatedWaitTime( 0 ),
137 
138  pMsgInFly( false )
139 
140  {
142  if( msg->GetSessionId() )
143  pHasSessionId = true;
144  memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
145 
146  Log *log = DefaultEnv::GetLog();
147  log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
148  pUrl.GetHostId().c_str(), this,
149  pRequest->GetDescription().c_str() );
150  }
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
156  {
158 
159  if( !pHasSessionId )
160  delete pRequest;
161  delete pResponse;
162  std::vector<Message *>::iterator it;
163  for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
164  delete *it;
165 
166  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
167  pResponse = reinterpret_cast<Message*>( 0xDEADBEEF );
168  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
169  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
170  pSidMgr = reinterpret_cast<SIDManager*>( 0xDEADBEEF );
171  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
172  pHosts = reinterpret_cast<HostList*>( 0xDEADBEEF );
173  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
174 
175  Log *log = DefaultEnv::GetLog();
176  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
177  pUrl.GetHostId().c_str(), this );
178  }
179 
180  //------------------------------------------------------------------------
186  //------------------------------------------------------------------------
187  virtual uint16_t Examine( Message *msg );
188 
189  //------------------------------------------------------------------------
193  //------------------------------------------------------------------------
194  virtual uint16_t GetSid() const;
195 
196  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
201  virtual void Process( Message *msg );
202 
203  //------------------------------------------------------------------------
213  //------------------------------------------------------------------------
214  virtual Status ReadMessageBody( Message *msg,
215  int socket,
216  uint32_t &bytesRead );
217 
218  //------------------------------------------------------------------------
224  //------------------------------------------------------------------------
225  virtual uint8_t OnStreamEvent( StreamEvent event,
226  uint16_t streamNum,
227  Status status );
228 
229  //------------------------------------------------------------------------
231  //------------------------------------------------------------------------
232  virtual void OnStatusReady( const Message *message,
233  Status status );
234 
235  //------------------------------------------------------------------------
237  //------------------------------------------------------------------------
238  virtual bool IsRaw() const;
239 
240  //------------------------------------------------------------------------
249  //------------------------------------------------------------------------
250  Status WriteMessageBody( int socket,
251  uint32_t &bytesRead );
252 
253  //------------------------------------------------------------------------
258  //------------------------------------------------------------------------
259  ChunkList* GetMessageBody( uint32_t *&asyncOffset )
260  {
261  asyncOffset = &pAsyncOffset;
262  return pChunkList;
263  }
264 
265  //------------------------------------------------------------------------
269  //------------------------------------------------------------------------
270  void WaitDone( time_t now );
271 
272  //------------------------------------------------------------------------
274  //------------------------------------------------------------------------
275  void SetExpiration( time_t expiration )
276  {
277  pExpiration = expiration;
278  }
279 
280  //------------------------------------------------------------------------
283  //------------------------------------------------------------------------
284  void SetRedirectAsAnswer( bool redirectAsAnswer )
285  {
286  pRedirectAsAnswer = redirectAsAnswer;
287  }
288 
289  //------------------------------------------------------------------------
291  //------------------------------------------------------------------------
292  const Message *GetRequest() const
293  {
294  return pRequest;
295  }
296 
297  //------------------------------------------------------------------------
299  //------------------------------------------------------------------------
300  void SetLoadBalancer( const HostInfo &loadBalancer )
301  {
302  if( !loadBalancer.url.IsValid() )
303  return;
304  pLoadBalancer = loadBalancer;
305  pHasLoadBalancer = true;
306  }
307 
308  //------------------------------------------------------------------------
310  //------------------------------------------------------------------------
311  void SetHostList( HostList *hostList )
312  {
313  delete pHosts;
314  pHosts = hostList;
315  }
316 
317  //------------------------------------------------------------------------
319  //------------------------------------------------------------------------
320  void SetChunkList( ChunkList *chunkList )
321  {
322  pChunkList = chunkList;
323  if( chunkList )
324  pChunkStatus.resize( chunkList->size() );
325  else
326  pChunkStatus.clear();
327  }
328 
329  //------------------------------------------------------------------------
331  //------------------------------------------------------------------------
332  void SetRedirectCounter( uint16_t redirectCounter )
333  {
334  pRedirectCounter = redirectCounter;
335  }
336 
337  void SetFollowMetalink( bool followMetalink )
338  {
339  pFollowMetalink = followMetalink;
340  }
341 
342  void SetStateful( bool stateful )
343  {
344  pStateful = stateful;
345  }
346 
347  private:
348  //------------------------------------------------------------------------
350  //------------------------------------------------------------------------
351  Status ReadRawRead( Message *msg,
352  int socket,
353  uint32_t &bytesRead );
354 
355  //------------------------------------------------------------------------
357  //------------------------------------------------------------------------
359  int socket,
360  uint32_t &bytesRead );
361 
362  //------------------------------------------------------------------------
364  //------------------------------------------------------------------------
366  int socket,
367  uint32_t &bytesRead );
368 
369  //------------------------------------------------------------------------
372  //------------------------------------------------------------------------
373  Status ReadAsync( int socket, uint32_t &btesRead );
374 
375  //------------------------------------------------------------------------
377  //------------------------------------------------------------------------
378  void HandleError( Status status, Message *msg = 0 );
379 
380  //------------------------------------------------------------------------
382  //------------------------------------------------------------------------
383  Status RetryAtServer( const URL &url );
384 
385  //------------------------------------------------------------------------
387  //------------------------------------------------------------------------
388  void HandleResponse();
389 
390  //------------------------------------------------------------------------
392  //------------------------------------------------------------------------
394 
395  //------------------------------------------------------------------------
398  //------------------------------------------------------------------------
399  Status ParseResponse( AnyObject *&response );
400 
401  //------------------------------------------------------------------------
404  //------------------------------------------------------------------------
405  Status RewriteRequestRedirect( const URL &newUrl );
406 
407  //------------------------------------------------------------------------
409  //------------------------------------------------------------------------
411 
412  //------------------------------------------------------------------------
414  //------------------------------------------------------------------------
415  Status PostProcessReadV( VectorReadInfo *vReadInfo );
416 
417  //------------------------------------------------------------------------
419  //------------------------------------------------------------------------
421 
422  //------------------------------------------------------------------------
424  //------------------------------------------------------------------------
425  void UpdateTriedCGI(uint32_t errNo=0);
426 
427  //------------------------------------------------------------------------
429  //------------------------------------------------------------------------
430  void SwitchOnRefreshFlag();
431 
432  //------------------------------------------------------------------------
435  //------------------------------------------------------------------------
436  void HandleRspOrQueue();
437 
438  //------------------------------------------------------------------------
440  //------------------------------------------------------------------------
441  void HandleLocalRedirect( URL *url );
442 
443  //------------------------------------------------------------------------
448  //------------------------------------------------------------------------
449  bool IsRetryable( Message *request );
450 
451  //------------------------------------------------------------------------
458  //------------------------------------------------------------------------
459  bool OmitWait( Message *request, const URL &url );
460 
461  //------------------------------------------------------------------------
463  //------------------------------------------------------------------------
464  void DumpRedirectTraceBack();
465 
466  //------------------------------------------------------------------------
467  // Helper struct for async reading of chunks
468  //------------------------------------------------------------------------
469  struct ChunkStatus
470  {
471  ChunkStatus(): sizeError( false ), done( false ) {}
472  bool sizeError;
473  bool done;
474  };
475 
476  typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
477 
480  std::vector<Message *> pPartialResps;
488  time_t pExpiration;
494  std::string pRedirectUrl;
496  std::vector<ChunkStatus> pChunkStatus;
498 
499  uint32_t pAsyncOffset;
500  uint32_t pAsyncReadSize;
502  uint32_t pAsyncMsgSize;
503 
506 
514 
516 
518 
519  bool pStateful;
521 
522  std::unique_ptr<RedirectEntry> pRdirEntry;
524 
525  bool pMsgInFly;
526  };
527 }
528 
529 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
bool pOtherRawStarted
Definition: XrdClXRootDMsgHandler.hh:515
std::vector< Message * > pPartialResps
Definition: XrdClXRootDMsgHandler.hh:480
Definition: XrdClAnyObject.hh:32
URL to
Definition: XrdClXRootDMsgHandler.hh:59
void UpdateTriedCGI(uint32_t errNo=0)
Update the "tried=" part of the CGI of the current message.
virtual uint16_t GetSid() const
const uint64_t ExDbgMsg
Definition: XrdClConstants.hh:41
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:769
PostMaster * pPostMaster
Definition: XrdClXRootDMsgHandler.hh:483
Status RewriteRequestWait()
Some requests need to be rewritten also after getting kXR_wait - sigh.
Definition: XProtocol.hh:658
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
ChunkStatus()
Definition: XrdClXRootDMsgHandler.hh:471
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
Definition: XrdClXRootDMsgHandler.hh:332
bool pReadRawStarted
Definition: XrdClXRootDMsgHandler.hh:504
~XRootDMsgHandler()
Destructor.
Definition: XrdClXRootDMsgHandler.hh:155
void SetFollowMetalink(bool followMetalink)
Definition: XrdClXRootDMsgHandler.hh:337
RedirectTraceBack pRedirectTraceBack
Definition: XrdClXRootDMsgHandler.hh:523
URL from
Definition: XrdClXRootDMsgHandler.hh:58
LocalFileHandler * pLFileHandler
Definition: XrdClXRootDMsgHandler.hh:485
std::string ToString(bool prevok=true)
Definition: XrdClXRootDMsgHandler.hh:62
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:74
std::vector< ChunkStatus > pChunkStatus
Definition: XrdClXRootDMsgHandler.hh:496
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClXRootDMsgHandler.hh:469
uint32_t pReadVRawMsgOffset
Definition: XrdClXRootDMsgHandler.hh:507
friend class HandleRspJob
Definition: XrdClXRootDMsgHandler.hh:83
Status RetryAtServer(const URL &url)
Retry the request at another server.
bool pReadVRawChunkHeaderDone
Definition: XrdClXRootDMsgHandler.hh:508
Status ParseResponse(AnyObject *&response)
StreamEvent
Events that may have occurred to the stream.
Definition: XrdClPostMasterInterfaces.hh:91
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:76
bool pFollowMetalink
Definition: XrdClXRootDMsgHandler.hh:517
bool done
Definition: XrdClXRootDMsgHandler.hh:473
Procedure execution status.
Definition: XrdClStatus.hh:109
Status RewriteRequestRedirect(const URL &newUrl)
URL pUrl
Definition: XrdClXRootDMsgHandler.hh:482
virtual void Process(Message *msg)
Message * pRequest
Definition: XrdClXRootDMsgHandler.hh:478
Status UnPackReadVResponse(Message *msg)
Unpack a single readv response.
time_t pExpiration
Definition: XrdClXRootDMsgHandler.hh:488
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
Definition: XrdClXRootDMsgHandler.hh:275
uint16_t pRedirectCounter
Definition: XrdClXRootDMsgHandler.hh:497
static Log * GetLog()
Get default log.
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, SIDManager *sidMgr, LocalFileHandler *lFileHandler)
Definition: XrdClXRootDMsgHandler.hh:96
bool pHasSessionId
Definition: XrdClXRootDMsgHandler.hh:493
ChunkList * pChunkList
Definition: XrdClXRootDMsgHandler.hh:495
Definition: XrdClXRootDResponses.hh:822
char * pAsyncReadBuffer
Definition: XrdClXRootDMsgHandler.hh:501
Status ReadRawReadV(Message *msg, int socket, uint32_t &bytesRead)
Handle a kXR_readv in raw mode.
void HandleError(Status status, Message *msg=0)
Recover error.
std::vector< HostInfo > HostList
Definition: XrdClXRootDResponses.hh:834
virtual uint16_t Examine(Message *msg)
SIDManager * pSidMgr
Definition: XrdClXRootDMsgHandler.hh:484
Status ReadRawOther(Message *msg, int socket, uint32_t &bytesRead)
Handle anything other than kXR_read and kXR_readv in raw mode.
virtual Status ReadMessageBody(Message *msg, int socket, uint32_t &bytesRead)
std::list< std::unique_ptr< RedirectEntry > > RedirectTraceBack
Definition: XrdClXRootDMsgHandler.hh:476
uint64_t GetSessionId() const
Get the session ID the message is meant for.
Definition: XrdClMessage.hh:90
Status WriteMessageBody(int socket, uint32_t &bytesRead)
bool pReadVRawMsgDiscard
Definition: XrdClXRootDMsgHandler.hh:513
Message * pResponse
Definition: XrdClXRootDMsgHandler.hh:479
bool IsValid() const
Is the url valid.
virtual uint8_t OnStreamEvent(StreamEvent event, uint16_t streamNum, Status status)
XRootDStatus status
Definition: XrdClXRootDMsgHandler.hh:60
Request status.
Definition: XrdClXRootDResponses.hh:212
Definition: XrdClAnyObject.hh:25
Handle XRootD stream IDs.
Definition: XrdClSIDManager.hh:33
Status PostProcessReadV(VectorReadInfo *vReadInfo)
Post process vector read.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:68
int pAggregatedWaitTime
Definition: XrdClXRootDMsgHandler.hh:520
ChunkList * GetMessageBody(uint32_t *&asyncOffset)
Definition: XrdClXRootDMsgHandler.hh:259
void SwitchOnRefreshFlag()
Switch on the refresh flag for some requests.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
Definition: XrdClXRootDMsgHandler.hh:320
void SetRedirectAsAnswer(bool redirectAsAnswer)
Definition: XrdClXRootDMsgHandler.hh:284
std::string pRedirectUrl
Definition: XrdClXRootDMsgHandler.hh:494
URL url
URL of the host.
Definition: XrdClXRootDResponses.hh:831
bool sizeError
Definition: XrdClXRootDMsgHandler.hh:472
bool pRedirectAsAnswer
Definition: XrdClXRootDMsgHandler.hh:489
bool pStateful
Definition: XrdClXRootDMsgHandler.hh:519
void HandleLocalRedirect(URL *url)
Handle a redirect to a local file.
Vector read info.
Definition: XrdClXRootDResponses.hh:774
Status ReadAsync(int socket, uint32_t &btesRead)
Handle an async response.
Definition: XrdClXRootDResponses.hh:839
void SetStateful(bool stateful)
Definition: XrdClXRootDMsgHandler.hh:342
Definition: XrdClLocalFileHandler.hh:32
uint32_t pAsyncMsgSize
Definition: XrdClXRootDMsgHandler.hh:502
const Message * GetRequest() const
Get the request pointer.
Definition: XrdClXRootDMsgHandler.hh:292
virtual bool IsRaw() const
Are we a raw writer or not?
uint32_t pAsyncReadSize
Definition: XrdClXRootDMsgHandler.hh:500
URL representation.
Definition: XrdClURL.hh:30
RedirectEntry(const URL &from, const URL &to)
Definition: XrdClXRootDMsgHandler.hh:53
uint32_t pAsyncOffset
Definition: XrdClXRootDMsgHandler.hh:499
bool OmitWait(Message *request, const URL &url)
Status ReadRawRead(Message *msg, int socket, uint32_t &bytesRead)
Handle a kXR_read in raw mode.
bool pReadVRawSizeError
Definition: XrdClXRootDMsgHandler.hh:510
std::unique_ptr< RedirectEntry > pRdirEntry
Definition: XrdClXRootDMsgHandler.hh:522
HostList * pHosts
Definition: XrdClXRootDMsgHandler.hh:490
int32_t pReadVRawChunkIndex
Definition: XrdClXRootDMsgHandler.hh:511
void WaitDone(time_t now)
XRootDStatus * ProcessStatus()
Extract the status information from the stuff that we got.
Status pLastError
Definition: XrdClXRootDMsgHandler.hh:487
uint32_t pReadRawCurrentOffset
Definition: XrdClXRootDMsgHandler.hh:505
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:167
A hub for dispatching and receiving messages.
Definition: XrdClPostMaster.hh:42
bool pReadVRawChunkHeaderStarted
Definition: XrdClXRootDMsgHandler.hh:509
HostInfo pLoadBalancer
Definition: XrdClXRootDMsgHandler.hh:492
static PostMaster * GetPostMaster()
Get default post master.
Handle/Process/Forward XRootD messages.
Definition: XrdClXRootDMsgHandler.hh:80
virtual void OnStatusReady(const Message *message, Status status)
The requested action has been performed and the status is available.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
void HandleResponse()
Unpack the message and call the response handler.
bool pHasLoadBalancer
Definition: XrdClXRootDMsgHandler.hh:491
ResponseHandler * pResponseHandler
Definition: XrdClXRootDMsgHandler.hh:481
Definition: XrdClXRootDMsgHandler.hh:51
Status pStatus
Definition: XrdClXRootDMsgHandler.hh:486
bool pMsgInFly
Definition: XrdClXRootDMsgHandler.hh:525
void DumpRedirectTraceBack()
Dump the redirect-trace-back into the log file.
bool IsRetryable(Message *request)
Handle diagnostics.
Definition: XrdClLog.hh:101
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
Definition: XrdClXRootDMsgHandler.hh:300
readahead_list pReadVRawChunkHeader
Definition: XrdClXRootDMsgHandler.hh:512
void SetHostList(HostList *hostList)
Set host list.
Definition: XrdClXRootDMsgHandler.hh:311