xrootd
XrdClOperations.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
28 
29 #include <memory>
30 #include <stdexcept>
31 #include <sstream>
32 #include <tuple>
33 #include <future>
36 #include "XrdClArg.hh"
37 #include "XrdSys/XrdSysPthread.hh"
38 
39 namespace XrdCl
40 {
41 
42  template<bool HasHndl> class Operation;
43 
44  class Pipeline;
45 
46  //----------------------------------------------------------------------------
49  //----------------------------------------------------------------------------
51  {
52  template<bool> friend class Operation;
53 
54  public:
55 
56  //------------------------------------------------------------------------
62  //------------------------------------------------------------------------
63  PipelineHandler( ResponseHandler *handler, bool own );
64 
65  //------------------------------------------------------------------------
67  //------------------------------------------------------------------------
69 
70  //------------------------------------------------------------------------
72  //------------------------------------------------------------------------
73  void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
74  HostList *hostList );
75 
76  //------------------------------------------------------------------------
78  //------------------------------------------------------------------------
79  void HandleResponse( XRootDStatus *status, AnyObject *response );
80 
81  //------------------------------------------------------------------------
83  //------------------------------------------------------------------------
85 
86  //------------------------------------------------------------------------
90  //------------------------------------------------------------------------
91  void AddOperation( Operation<true> *operation );
92 
93  //------------------------------------------------------------------------
100  //------------------------------------------------------------------------
101  void Assign( std::promise<XRootDStatus> prms,
102  std::function<void(const XRootDStatus&)> final );
103 
104  private:
105 
106  //------------------------------------------------------------------------
108  //------------------------------------------------------------------------
109  void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
110  HostList *hostList = nullptr );
111 
112  inline void dealloc( XRootDStatus *status, AnyObject *response,
113  HostList *hostList )
114  {
115  delete status;
116  delete response;
117  delete hostList;
118  }
119 
120  //------------------------------------------------------------------------
122  //------------------------------------------------------------------------
124 
125  //------------------------------------------------------------------------
127  //------------------------------------------------------------------------
129 
130  //------------------------------------------------------------------------
132  //------------------------------------------------------------------------
133  std::unique_ptr<Operation<true>> nextOperation;
134 
135  //------------------------------------------------------------------------
137  //------------------------------------------------------------------------
138  std::promise<XRootDStatus> prms;
139 
140  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
144  std::function<void(const XRootDStatus&)> final;
145  };
146 
147  //----------------------------------------------------------------------------
153  //----------------------------------------------------------------------------
154  template<bool HasHndl>
155  class Operation
156  {
157  // Declare friendship between templates
158  template<bool>
159  friend class Operation;
160 
161  friend std::future<XRootDStatus> Async( Pipeline );
162 
163  friend class Pipeline;
164  friend class PipelineHandler;
165 
166  public:
167 
168  //------------------------------------------------------------------------
170  //------------------------------------------------------------------------
171  Operation() : valid( true )
172  {
173  }
174 
175  //------------------------------------------------------------------------
177  //------------------------------------------------------------------------
178  template<bool from>
180  handler( std::move( op.handler ) ), valid( true )
181  {
182  if( !op.valid ) throw std::invalid_argument( "Cannot construct "
183  "Operation from an invalid Operation!" );
184  op.valid = false;
185  }
186 
187  //------------------------------------------------------------------------
189  //------------------------------------------------------------------------
190  virtual ~Operation()
191  {
192  }
193 
194  //------------------------------------------------------------------------
196  //------------------------------------------------------------------------
197  virtual std::string ToString() = 0;
198 
199  //------------------------------------------------------------------------
203  //------------------------------------------------------------------------
204  virtual Operation<HasHndl>* Move() = 0;
205 
206  //------------------------------------------------------------------------
211  //------------------------------------------------------------------------
212  virtual Operation<true>* ToHandled() = 0;
213 
214  protected:
215 
216  //------------------------------------------------------------------------
226  //------------------------------------------------------------------------
227  void Run( std::promise<XRootDStatus> prms,
228  std::function<void(const XRootDStatus&)> final )
229  {
230  static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
231  handler->Assign( std::move( prms ), std::move( final ) );
232  XRootDStatus st = RunImpl();
233  if( st.IsOK() ) handler.release();
234  else
235  ForceHandler( st );
236  }
237 
238  //------------------------------------------------------------------------
245  //------------------------------------------------------------------------
246  virtual XRootDStatus RunImpl() = 0;
247 
248  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
256  void ForceHandler( const XRootDStatus &status )
257  {
258  handler->HandleResponse( new XRootDStatus( status ), nullptr );
259  // HandleResponse already freed the memory so we have to
260  // release the unique pointer
261  handler.release();
262  }
263 
264  //------------------------------------------------------------------------
268  //------------------------------------------------------------------------
270  {
271  if( handler )
272  {
273  handler->AddOperation( op );
274  }
275  }
276 
277  //------------------------------------------------------------------------
279  //------------------------------------------------------------------------
280  std::unique_ptr<PipelineHandler> handler;
281 
282  //------------------------------------------------------------------------
284  //------------------------------------------------------------------------
285  bool valid;
286  };
287 
288  //----------------------------------------------------------------------------
294  //----------------------------------------------------------------------------
295  class Pipeline
296  {
297  template<bool> friend class ParallelOperation;
298  friend std::future<XRootDStatus> Async( Pipeline );
299 
300  public:
301 
302  //------------------------------------------------------------------------
304  //------------------------------------------------------------------------
306  operation( op->Move() )
307  {
308 
309  }
310 
311  //------------------------------------------------------------------------
313  //------------------------------------------------------------------------
315  operation( op.Move() )
316  {
317 
318  }
319 
320  //------------------------------------------------------------------------
322  //------------------------------------------------------------------------
324  operation( op.Move() )
325  {
326 
327  }
328 
330  operation( op->ToHandled() )
331  {
332 
333  }
334 
335  //------------------------------------------------------------------------
337  //------------------------------------------------------------------------
339  operation( op.ToHandled() )
340  {
341 
342  }
343 
344  //------------------------------------------------------------------------
346  //------------------------------------------------------------------------
348  operation( op.ToHandled() )
349  {
350 
351  }
352 
353  Pipeline( Pipeline &&pipe ) :
354  operation( std::move( pipe.operation ) )
355  {
356 
357  }
358 
359  //------------------------------------------------------------------------
361  //------------------------------------------------------------------------
363  {
364  operation = std::move( pipe.operation );
365  return *this;
366  }
367 
368  //------------------------------------------------------------------------
372  //------------------------------------------------------------------------
373  operator Operation<true>&()
374  {
375  if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
376  return *operation.get();
377  }
378 
379  //------------------------------------------------------------------------
383  //------------------------------------------------------------------------
384  operator bool()
385  {
386  return bool( operation );
387  }
388 
389  private:
390 
391  //------------------------------------------------------------------------
396  //------------------------------------------------------------------------
398  {
399  return operation.get();
400  }
401 
402  //------------------------------------------------------------------------
408  //------------------------------------------------------------------------
409  void Run( std::function<void(const XRootDStatus&)> final = nullptr )
410  {
411  if( ftr.valid() )
412  throw std::logic_error( "Pipeline is already running" );
413 
414  // a promise that the pipe will have a result
415  std::promise<XRootDStatus> prms;
416  ftr = prms.get_future();
417  operation->Run( std::move( prms ), std::move( final ) );
418  }
419 
420  //------------------------------------------------------------------------
422  //------------------------------------------------------------------------
423  std::unique_ptr<Operation<true>> operation;
424 
425  //------------------------------------------------------------------------
427  //------------------------------------------------------------------------
428  std::future<XRootDStatus> ftr;
429 
430  };
431 
432  //----------------------------------------------------------------------------
438  //----------------------------------------------------------------------------
439  inline std::future<XRootDStatus> Async( Pipeline pipeline )
440  {
441  pipeline.Run();
442  return std::move( pipeline.ftr );
443  }
444 
445  //----------------------------------------------------------------------------
452  //----------------------------------------------------------------------------
453  inline XRootDStatus WaitFor( Pipeline pipeline )
454  {
455  return Async( std::move( pipeline ) ).get();
456  }
457 
458  //----------------------------------------------------------------------------
465  //----------------------------------------------------------------------------
466  template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
467  class ConcreteOperation: public Operation<HasHndl>
468  {
469  template<template<bool> class, bool, typename, typename ...>
470  friend class ConcreteOperation;
471 
472  public:
473 
474  //------------------------------------------------------------------------
478  //------------------------------------------------------------------------
479  ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) )
480  {
481  static_assert( !HasHndl, "It is only possible to construct operation without handler" );
482  }
483 
484  //------------------------------------------------------------------------
490  //------------------------------------------------------------------------
491  template<bool from>
493  Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) )
494  {
495 
496  }
497 
498  //------------------------------------------------------------------------
506  //------------------------------------------------------------------------
507  template<typename Hdlr>
508  Derived<true> operator>>( Hdlr &&hdlr )
509  {
510  // check if the resulting handler should be owned by us or by the user,
511  // if the user passed us directly a ResponseHandler it's owned by the
512  // user, otherwise we need to wrap the argument in a handler and in this
513  // case the resulting handler will be owned by us
514  constexpr bool own = !IsResponseHandler<Hdlr>::value;
515  return this->StreamImpl( HdlrFactory::Create( hdlr ), own );
516  }
517 
518  //------------------------------------------------------------------------
524  //------------------------------------------------------------------------
525  Derived<true> operator|( Operation<true> &op )
526  {
527  return PipeImpl( *this, op );
528  }
529 
530  //------------------------------------------------------------------------
536  //------------------------------------------------------------------------
537  Derived<true> operator|( Operation<true> &&op )
538  {
539  return PipeImpl( *this, op );
540  }
541 
542  //------------------------------------------------------------------------
548  //------------------------------------------------------------------------
549  Derived<true> operator|( Operation<false> &op )
550  {
551  return PipeImpl( *this, op );
552  }
553 
554  //------------------------------------------------------------------------
560  //------------------------------------------------------------------------
561  Derived<true> operator|( Operation<false> &&op )
562  {
563  return PipeImpl( *this, op );
564  }
565 
566  protected:
567 
568  //------------------------------------------------------------------------
572  //------------------------------------------------------------------------
574  {
575  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
576  return new Derived<HasHndl>( std::move( *me ) );
577  }
578 
579  //------------------------------------------------------------------------
583  //------------------------------------------------------------------------
585  {
586  this->handler.reset( new PipelineHandler() );
587  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
588  return new Derived<true>( std::move( *me ) );
589  }
590 
591  //------------------------------------------------------------------------
595  //------------------------------------------------------------------------
596  template<bool to>
597  Derived<to> Transform()
598  {
599  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
600  return Derived<to>( std::move( *me ) );
601  }
602 
603  //------------------------------------------------------------------------
609  //------------------------------------------------------------------------
610  inline Derived<true> StreamImpl( ResponseHandler *handler, bool own )
611  {
612  static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
613  this->handler.reset( new PipelineHandler( handler, own ) );
614  return Transform<true>();
615  }
616 
617  //------------------------------------------------------------------------
624  //------------------------------------------------------------------------
625  inline static
626  Derived<true> PipeImpl( ConcreteOperation<Derived, true, HdlrFactory,
627  Args...> &me, Operation<true> &op )
628  {
629  me.AddOperation( op.Move() );
630  return me.template Transform<true>();
631  }
632 
633  //------------------------------------------------------------------------
640  //------------------------------------------------------------------------
641  inline static
642  Derived<true> PipeImpl( ConcreteOperation<Derived, true, HdlrFactory,
643  Args...> &me, Operation<false> &op )
644  {
645  me.AddOperation( op.ToHandled() );
646  return me.template Transform<true>();
647  }
648 
649  //------------------------------------------------------------------------
656  //------------------------------------------------------------------------
657  inline static
658  Derived<true> PipeImpl( ConcreteOperation<Derived, false, HdlrFactory,
659  Args...> &me, Operation<true> &op )
660  {
661  me.handler.reset( new PipelineHandler() );
662  me.AddOperation( op.Move() );
663  return me.template Transform<true>();
664  }
665 
666  //------------------------------------------------------------------------
673  //------------------------------------------------------------------------
674  inline static
675  Derived<true> PipeImpl( ConcreteOperation<Derived, false, HdlrFactory,
676  Args...> &me, Operation<false> &op )
677  {
678  me.handler.reset( new PipelineHandler() );
679  me.AddOperation( op.ToHandled() );
680  return me.template Transform<true>();
681  }
682 
683  //------------------------------------------------------------------------
685  //------------------------------------------------------------------------
686  std::tuple<Args...> args;
687  };
688 }
689 
690 #endif // __XRD_CL_OPERATIONS_HH__
std::unique_ptr< Operation< true > > operation
First operation in the pipeline.
Definition: XrdClOperations.hh:423
Definition: XrdClAnyObject.hh:32
Operation(Operation< from > &&op)
Move constructor between template instances.
Definition: XrdClOperations.hh:179
bool valid
Flag indicating if it is a valid object.
Definition: XrdClOperations.hh:285
std::future< XRootDStatus > ftr
The future result of the pipeline.
Definition: XrdClOperations.hh:428
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
Definition: XrdClOperations.hh:492
Pipeline(Operation< false > &&op)
Constructor.
Definition: XrdClOperations.hh:347
Definition: XrdClOperationHandlers.hh:40
static Derived< true > PipeImpl(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me, Operation< true > &op)
Definition: XrdClOperations.hh:658
Derived< true > operator|(Operation< true > &&op)
Definition: XrdClOperations.hh:537
Pipeline & operator=(Pipeline &&pipe)
Constructor.
Definition: XrdClOperations.hh:362
Derived< true > operator|(Operation< true > &op)
Definition: XrdClOperations.hh:525
Derived< true > operator>>(Hdlr &&hdlr)
Definition: XrdClOperations.hh:508
Derived< true > StreamImpl(ResponseHandler *handler, bool own)
Definition: XrdClOperations.hh:610
void HandleResponseImpl(XRootDStatus *status, AnyObject *response, HostList *hostList=nullptr)
Callback function implementation;.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:119
void dealloc(XRootDStatus *status, AnyObject *response, HostList *hostList)
Definition: XrdClOperations.hh:112
ConcreteOperation(Args &&... args)
Definition: XrdClOperations.hh:479
void AddOperation(Operation< true > *op)
Definition: XrdClOperations.hh:269
static Derived< true > PipeImpl(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me, Operation< false > &op)
Definition: XrdClOperations.hh:642
ResponseHandler * responseHandler
The handler of our operation.
Definition: XrdClOperations.hh:123
bool ownHandler
true, if we own the handler
Definition: XrdClOperations.hh:128
~PipelineHandler()
Destructor.
Derived< true > operator|(Operation< false > &op)
Definition: XrdClOperations.hh:549
virtual ~Operation()
Destructor.
Definition: XrdClOperations.hh:190
void Assign(std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
Pipeline(Operation< true > &&op)
Constructor.
Definition: XrdClOperations.hh:323
std::vector< HostInfo > HostList
Definition: XrdClXRootDResponses.hh:834
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
XRootDStatus WaitFor(Pipeline pipeline)
Definition: XrdClOperations.hh:453
static Derived< true > PipeImpl(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me, Operation< true > &op)
Definition: XrdClOperations.hh:626
friend class ConcreteOperation
Definition: XrdClOperations.hh:470
std::tuple< Args... > args
Operation arguments.
Definition: XrdClOperations.hh:686
Pipeline(Pipeline &&pipe)
Definition: XrdClOperations.hh:353
friend std::future< XRootDStatus > Async(Pipeline)
Definition: XrdClOperations.hh:439
PipelineHandler()
Default Constructor.
virtual std::string ToString()=0
Name of the operation.
std::unique_ptr< Operation< true > > nextOperation
Next operation in the pipeline.
Definition: XrdClOperations.hh:133
Pipeline(Operation< false > &op)
Constructor.
Definition: XrdClOperations.hh:338
Request status.
Definition: XrdClXRootDResponses.hh:212
Definition: XrdClAnyObject.hh:25
Definition: XrdClOperations.hh:42
Operation()
Constructor.
Definition: XrdClOperations.hh:171
static Derived< true > PipeImpl(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me, Operation< false > &op)
Definition: XrdClOperations.hh:675
Definition: XrdClOperations.hh:50
Operation< HasHndl > * Move()
Definition: XrdClOperations.hh:573
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
virtual XRootDStatus RunImpl()=0
Pipeline(Operation< true > &op)
Constructor.
Definition: XrdClOperations.hh:314
std::promise< XRootDStatus > prms
The promise that there will be a result (traveling along the pipeline)
Definition: XrdClOperations.hh:138
Handle an async response.
Definition: XrdClXRootDResponses.hh:839
Operation< true > * operator->()
Definition: XrdClOperations.hh:397
Derived< to > Transform()
Definition: XrdClOperations.hh:597
void AddOperation(Operation< true > *operation)
Pipeline(Operation< false > *op)
Definition: XrdClOperations.hh:329
void ForceHandler(const XRootDStatus &status)
Definition: XrdClOperations.hh:256
virtual Operation< true > * ToHandled()=0
friend class PipelineHandler
Definition: XrdClOperations.hh:164
Operation< true > * ToHandled()
Definition: XrdClOperations.hh:584
void Run(std::function< void(const XRootDStatus &)> final=nullptr)
Definition: XrdClOperations.hh:409
void Run(std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
Definition: XrdClOperations.hh:227
Pipeline(Operation< true > *op)
Constructor.
Definition: XrdClOperations.hh:305
Derived< true > operator|(Operation< false > &&op)
Definition: XrdClOperations.hh:561
std::future< XRootDStatus > Async(Pipeline pipeline)
Definition: XrdClOperations.hh:439
Definition: XrdClOperations.hh:295
Definition: XrdClParallelOperation.hh:50
virtual Operation< HasHndl > * Move()=0
friend std::future< XRootDStatus > Async(Pipeline)
Definition: XrdClOperations.hh:439
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:280
Definition: XrdClOperations.hh:467