XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26#define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27
31#include "XrdCl/XrdClMessage.hh"
33#include "XrdCl/XrdClLog.hh"
35
41
46
48#include "XrdOuc/XrdOucUtils.hh"
49
50#include <sys/uio.h>
51#include <arpa/inet.h> // for network unmarshaling stuff
52
53#include <array>
54#include <list>
55#include <memory>
56#include <atomic>
57#include <memory>
58
59namespace XrdCl
60{
61 class PostMaster;
62 class SIDManager;
63 class URL;
64 class LocalFileHandler;
65 class Socket;
66
67 //----------------------------------------------------------------------------
68 // Single entry in the redirect-trace-back
69 //----------------------------------------------------------------------------
71 {
72 enum Type
73 {
78 };
79
80 RedirectEntry( const URL &from, const URL &to, Type type ) :
81 from( from ), to( to ), type( type )
82 {
83
84 }
85
90
91 std::string ToString( bool prevok = true )
92 {
93 const std::string tostr = to.GetLocation();
94 const std::string fromstr = from.GetLocation();
95
96 if( prevok )
97 {
98 switch( type )
99 {
100 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
101 + tostr;
102
103 case EntryRedirectOnWait: return "Server responded with wait. "
104 "Falling back to virtual redirector: " + tostr;
105
106 case EntryRetry: return "Retrying: " + tostr;
107
108 case EntryWait: return "Waited at server request. Resending: "
109 + tostr;
110 }
111 }
112 return "Failed at: " + fromstr + ", retrying at: " + tostr;
113 }
114 };
115
116 //----------------------------------------------------------------------------
118 //----------------------------------------------------------------------------
120 {
121 friend class HandleRspJob;
122
123 public:
124 //------------------------------------------------------------------------
133 //------------------------------------------------------------------------
135 ResponseHandler *respHandler,
136 const URL *url,
137 std::shared_ptr<SIDManager> sidMgr,
138 LocalFileHandler *lFileHandler):
139 pRequest( msg ),
140 pResponseHandler( respHandler ),
141 pUrl( *url ),
142 pEffectiveDataServerUrl( 0 ),
143 pSidMgr( sidMgr ),
144 pLFileHandler( lFileHandler ),
145 pExpiration( 0 ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
150 pChunkList( 0 ),
151 pKBuff( 0 ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
154
155 pAsyncOffset( 0 ),
156 pAsyncChunkIndex( 0 ),
157
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
161
162 pOtherRawStarted( false ),
163
164 pFollowMetalink( false ),
165
166 pStateful( false ),
167
168 pAggregatedWaitTime( 0 ),
169
170 pMsgInFly( false ),
171
172 pTimeoutFence( false ),
173
174 pDirListStarted( false ),
175 pDirListWithStat( false ),
176
177 pCV( 0 ),
178
179 pSslErrCnt( 0 )
180 {
181 pPostMaster = DefaultEnv::GetPostMaster();
182 if( msg->GetSessionId() )
183 pHasSessionId = true;
184
185 Log *log = DefaultEnv::GetLog();
186 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
187 pUrl.GetHostId().c_str(), this,
188 pRequest->GetObfuscatedDescription().c_str() );
189
190 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191 if( ntohs( hdr->requestid ) == kXR_pgread )
192 {
193 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195 ntohl( pgrdreq->rlen ) ) );
196 }
197
198 if( ntohs( hdr->requestid ) == kXR_readv )
199 pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
200 else if( ntohs( hdr->requestid ) == kXR_read )
201 pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
202 else
203 pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
204 }
205
206 //------------------------------------------------------------------------
208 //------------------------------------------------------------------------
210 {
211 DumpRedirectTraceBack();
212
213 if( !pHasSessionId )
214 delete pRequest;
215 delete pEffectiveDataServerUrl;
216
217 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
218 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
219 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
220 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
221 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
222 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
223
224 Log *log = DefaultEnv::GetLog();
225 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
226 pUrl.GetHostId().c_str(), this );
227 }
228
229 //------------------------------------------------------------------------
235 //------------------------------------------------------------------------
236 virtual uint16_t Examine( std::shared_ptr<Message> &msg );
237
238 //------------------------------------------------------------------------
247 //------------------------------------------------------------------------
248 virtual uint16_t InspectStatusRsp();
249
250 //------------------------------------------------------------------------
254 //------------------------------------------------------------------------
255 virtual uint16_t GetSid() const;
256
257 //------------------------------------------------------------------------
261 //------------------------------------------------------------------------
262 virtual void Process();
263
264 //------------------------------------------------------------------------
274 //------------------------------------------------------------------------
276 Socket *socket,
277 uint32_t &bytesRead );
278
279 //------------------------------------------------------------------------
284 //------------------------------------------------------------------------
285 virtual uint8_t OnStreamEvent( StreamEvent event,
286 XRootDStatus status );
287
288 //------------------------------------------------------------------------
290 //------------------------------------------------------------------------
291 virtual void OnStatusReady( const Message *message,
292 XRootDStatus status );
293
294 //------------------------------------------------------------------------
296 //------------------------------------------------------------------------
297 virtual bool IsRaw() const;
298
299 //------------------------------------------------------------------------
308 //------------------------------------------------------------------------
310 uint32_t &bytesWritten );
311
312 //------------------------------------------------------------------------
316 //------------------------------------------------------------------------
317 void WaitDone( time_t now );
318
319 //------------------------------------------------------------------------
321 //------------------------------------------------------------------------
322 void SetExpiration( time_t expiration )
323 {
324 pExpiration = expiration;
325 }
326
327 //------------------------------------------------------------------------
329 //------------------------------------------------------------------------
331 {
332 return pExpiration;
333 }
334
335 //------------------------------------------------------------------------
338 //------------------------------------------------------------------------
339 void SetRedirectAsAnswer( bool redirectAsAnswer )
340 {
341 pRedirectAsAnswer = redirectAsAnswer;
342 }
343
344 //------------------------------------------------------------------------
347 //------------------------------------------------------------------------
348 void SetOksofarAsAnswer( bool oksofarAsAnswer )
349 {
350 pOksofarAsAnswer = oksofarAsAnswer;
351 }
352
353 //------------------------------------------------------------------------
355 //------------------------------------------------------------------------
356 const Message *GetRequest() const
357 {
358 return pRequest;
359 }
360
361 //------------------------------------------------------------------------
363 //------------------------------------------------------------------------
364 void SetLoadBalancer( const HostInfo &loadBalancer )
365 {
366 if( !loadBalancer.url.IsValid() )
367 return;
368 pLoadBalancer = loadBalancer;
369 pHasLoadBalancer = true;
370 }
371
372 //------------------------------------------------------------------------
374 //------------------------------------------------------------------------
375 void SetHostList( HostList *hostList )
376 {
377 pHosts.reset( hostList );
378 }
379
380 //------------------------------------------------------------------------
382 //------------------------------------------------------------------------
383 void SetChunkList( ChunkList *chunkList )
384 {
385 pChunkList = chunkList;
386 if( pBodyReader )
387 pBodyReader->SetChunkList( chunkList );
388 if( chunkList )
389 pChunkStatus.resize( chunkList->size() );
390 else
391 pChunkStatus.clear();
392 }
393
394 void SetCrc32cDigests( std::vector<uint32_t> && crc32cDigests )
395 {
396 pCrc32cDigests = std::move( crc32cDigests );
397 }
398
399 //------------------------------------------------------------------------
401 //------------------------------------------------------------------------
403 {
404 pKBuff = kbuff;
405 }
406
407 //------------------------------------------------------------------------
409 //------------------------------------------------------------------------
410 void SetRedirectCounter( uint16_t redirectCounter )
411 {
412 pRedirectCounter = redirectCounter;
413 }
414
415 void SetFollowMetalink( bool followMetalink )
416 {
417 pFollowMetalink = followMetalink;
418 }
419
420 void SetStateful( bool stateful )
421 {
422 pStateful = stateful;
423 }
424
425 //------------------------------------------------------------------------
429 //------------------------------------------------------------------------
430 void PartialReceived();
431
432 private:
433
434 //------------------------------------------------------------------------
436 //------------------------------------------------------------------------
437 void HandleError( XRootDStatus status );
438
439 //------------------------------------------------------------------------
441 //------------------------------------------------------------------------
442 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
443
444 //------------------------------------------------------------------------
446 //------------------------------------------------------------------------
447 void HandleResponse();
448
449 //------------------------------------------------------------------------
451 //------------------------------------------------------------------------
452 XRootDStatus *ProcessStatus();
453
454 //------------------------------------------------------------------------
457 //------------------------------------------------------------------------
458 Status ParseResponse( AnyObject *&response );
459
460 //------------------------------------------------------------------------
463 //------------------------------------------------------------------------
464 Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
465
466 //------------------------------------------------------------------------
469 //------------------------------------------------------------------------
470 Status RewriteRequestRedirect( const URL &newUrl );
471
472 //------------------------------------------------------------------------
474 //------------------------------------------------------------------------
475 Status RewriteRequestWait();
476
477 //------------------------------------------------------------------------
479 //------------------------------------------------------------------------
480 void UpdateTriedCGI(uint32_t errNo=0);
481
482 //------------------------------------------------------------------------
484 //------------------------------------------------------------------------
485 void SwitchOnRefreshFlag();
486
487 //------------------------------------------------------------------------
490 //------------------------------------------------------------------------
491 void HandleRspOrQueue();
492
493 //------------------------------------------------------------------------
495 //------------------------------------------------------------------------
496 void HandleLocalRedirect( URL *url );
497
498 //------------------------------------------------------------------------
503 //------------------------------------------------------------------------
504 bool IsRetriable();
505
506 //------------------------------------------------------------------------
513 //------------------------------------------------------------------------
514 bool OmitWait( Message &request, const URL &url );
515
516 //------------------------------------------------------------------------
522 //------------------------------------------------------------------------
523 bool RetriableErrorResponse( const Status &status );
524
525 //------------------------------------------------------------------------
527 //------------------------------------------------------------------------
528 void DumpRedirectTraceBack();
529
536 //------------------------------------------------------------------------
537 template<typename T>
538 Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
539
540 //------------------------------------------------------------------------
547 //------------------------------------------------------------------------
548 Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
549
550 //------------------------------------------------------------------------
558 //------------------------------------------------------------------------
559 Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
560 std::string &result );
561
562 //------------------------------------------------------------------------
563 // Helper struct for async reading of chunks
564 //------------------------------------------------------------------------
565 struct ChunkStatus
566 {
567 ChunkStatus(): sizeError( false ), done( false ) {}
568 bool sizeError;
569 bool done;
570 };
571
572 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
573
574 static const size_t CksumSize = sizeof( uint32_t );
575 static const size_t PageWithCksum = XrdSys::PageSize + CksumSize;
576 static const size_t MaxSslErrRetry = 3;
577
578 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
579 {
580 uint32_t pgcnt = 0;
581 uint32_t remainder = offset % XrdSys::PageSize;
582 if( remainder > 0 )
583 {
584 // account for the first unaligned page
585 ++pgcnt;
586 // the size of the 1st unaligned page
587 uint32_t _1stpg = XrdSys::PageSize - remainder;
588 if( _1stpg + CksumSize > dlen )
589 _1stpg = dlen - CksumSize;
590 dlen -= _1stpg + CksumSize;
591 }
592 pgcnt += dlen / PageWithCksum;
593 if( dlen % PageWithCksum )
594 ++ pgcnt;
595 return pgcnt;
596 }
597
598 Message *pRequest;
599 std::shared_ptr<Message> pResponse; //< the ownership is shared with MsgReader
600 std::vector<std::shared_ptr<Message>> pPartialResps; //< the ownership is shared with MsgReader
601 ResponseHandler *pResponseHandler;
602 URL pUrl;
603 URL *pEffectiveDataServerUrl;
604 PostMaster *pPostMaster;
605 std::shared_ptr<SIDManager> pSidMgr;
606 LocalFileHandler *pLFileHandler;
607 XRootDStatus pStatus;
608 Status pLastError;
609 time_t pExpiration;
610 bool pRedirectAsAnswer;
611 bool pOksofarAsAnswer;
612 std::unique_ptr<HostList> pHosts;
613 bool pHasLoadBalancer;
614 HostInfo pLoadBalancer;
615 bool pHasSessionId;
616 std::string pRedirectUrl;
617 ChunkList *pChunkList;
618 std::vector<uint32_t> pCrc32cDigests;
619 XrdSys::KernelBuffer *pKBuff;
620 std::vector<ChunkStatus> pChunkStatus;
621 uint16_t pRedirectCounter;
622 uint16_t pNotAuthorizedCounter;
623
624 uint32_t pAsyncOffset;
625 uint32_t pAsyncChunkIndex;
626
627 std::unique_ptr<AsyncPageReader> pPageReader;
628 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
629
630 Buffer pPgWrtCksumBuff;
631 uint32_t pPgWrtCurrentPageOffset;
632 uint32_t pPgWrtCurrentPageNb;
633
634 bool pOtherRawStarted;
635
636 bool pFollowMetalink;
637
638 bool pStateful;
639 int pAggregatedWaitTime;
640
641 std::unique_ptr<RedirectEntry> pRdirEntry;
642 RedirectTraceBack pRedirectTraceBack;
643
644 bool pMsgInFly;
645
646 //------------------------------------------------------------------------
647 // true if MsgHandler is both in inQueue and installed in respective
648 // Stream (this could happen if server gave oksofar response), otherwise
649 // false
650 //------------------------------------------------------------------------
651 std::atomic<bool> pTimeoutFence;
652
653 //------------------------------------------------------------------------
654 // if we are serving chunked data to the user's handler in case of
655 // kXR_dirlist we need to memorize if the response contains stat info or
656 // not (the information is only encoded in the first chunk)
657 //------------------------------------------------------------------------
658 bool pDirListStarted;
659 bool pDirListWithStat;
660
661 //------------------------------------------------------------------------
662 // synchronization is needed in case the MsgHandler has been configured
663 // to serve kXR_oksofar as a response to the user's handler
664 //------------------------------------------------------------------------
665 XrdSysCondVar pCV;
666
667 //------------------------------------------------------------------------
668 // Count of consecutive `errTlsSslError` errors
669 //------------------------------------------------------------------------
670 size_t pSslErrCnt;
671 };
672}
673
674#endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
A network socket.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:337
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
const Message * GetRequest() const
Get the request pointer.
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
time_t GetExpiration()
Get a timestamp after which we give up.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual void Process()
Process the message if it was "taken" by the examine action.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
const uint64_t ExDbgMsg
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
URL url
URL of the host.
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.