XRootD
Loading...
Searching...
No Matches
XrdClStream.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClStream.hh"
26#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClChannel.hh"
29#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClMonitor.hh"
39
40#include <sys/types.h>
41#include <algorithm>
42#include <sys/socket.h>
43#include <sys/time.h>
44
45namespace XrdCl
46{
47 //----------------------------------------------------------------------------
48 // Statics
49 //----------------------------------------------------------------------------
50 RAtomic_uint64_t Stream::sSessCntGen{0};
51
52 //----------------------------------------------------------------------------
53 // Incoming message helper
54 //----------------------------------------------------------------------------
56 {
57 InMessageHelper( Message *message = 0,
58 MsgHandler *hndlr = 0,
59 time_t expir = 0,
60 uint16_t actio = 0 ):
61 msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62 void Reset()
63 {
64 msg = 0; handler = 0; expires = 0; action = 0;
65 }
68 time_t expires;
69 uint16_t action;
70 };
71
72 //----------------------------------------------------------------------------
73 // Sub stream helper
74 //----------------------------------------------------------------------------
76 {
77 SubStreamData(): socket( 0 ), status( Socket::Disconnected )
78 {
79 outQueue = new OutQueue();
80 }
82 {
83 delete socket;
84 delete outQueue;
85 }
91 };
92
93 //----------------------------------------------------------------------------
94 // Constructor
95 //----------------------------------------------------------------------------
96 Stream::Stream( const URL *url, const URL &prefer ):
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
149
150 //----------------------------------------------------------------------------
151 // Destructor
152 //----------------------------------------------------------------------------
154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
167
168 //----------------------------------------------------------------------------
169 // Initializer
170 //----------------------------------------------------------------------------
172 {
173 if( !pTransport || !pPoller || !pChannelData )
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
182
183 //------------------------------------------------------------------------
184 // Make sure that the underlying socket handler gets write readiness
185 // events
186 //------------------------------------------------------------------------
188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
293
294 //----------------------------------------------------------------------------
295 // Queue the message for sending
296 //----------------------------------------------------------------------------
298 MsgHandler *handler,
299 bool stateful,
300 time_t expires )
301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (0x%x) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }
343
344 //----------------------------------------------------------------------------
345 // Force connection
346 //----------------------------------------------------------------------------
348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
359
360 //----------------------------------------------------------------------------
361 // Disconnect the stream
362 //----------------------------------------------------------------------------
363 void Stream::Disconnect( bool /*force*/ )
364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
373
374 //----------------------------------------------------------------------------
375 // Handle a clock event
376 //----------------------------------------------------------------------------
377 void Stream::Tick( time_t now )
378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
390 pIncomingQueue->ReportTimeout( now );
391 }
392}
393
394//------------------------------------------------------------------------------
395// Handle message timeouts and reconnection in the future
396//------------------------------------------------------------------------------
397namespace
398{
399 class StreamConnectorTask: public XrdCl::Task
400 {
401 public:
402 //------------------------------------------------------------------------
403 // Constructor
404 //------------------------------------------------------------------------
405 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
406 url( url )
407 {
408 std::string name = "StreamConnectorTask for ";
409 name += n;
410 SetName( name );
411 }
412
413 //------------------------------------------------------------------------
414 // Run the task
415 //------------------------------------------------------------------------
416 time_t Run( time_t )
417 {
419 return 0;
420 }
421
422 private:
423 XrdCl::URL url;
424 };
425}
426
427namespace XrdCl
428{
429 XRootDStatus Stream::RequestClose( Message &response )
430 {
431 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
432 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
433 Message *msg;
435 MessageUtils::CreateRequest( msg, req );
436 req->requestid = kXR_close;
437 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler = new NullResponseHandler();
441 MessageSendParams params;
442 params.timeout = 0;
443 params.followRedirects = false;
444 params.stateful = true;
446 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
447 }
448
449 //------------------------------------------------------------------------
450 // Check if message is a partial response
451 //------------------------------------------------------------------------
452 bool Stream::IsPartial( Message &msg )
453 {
454 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
455 if( rsphdr->status == kXR_oksofar )
456 return true;
457
458 if( rsphdr->status == kXR_status )
459 {
460 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
462 return true;
463 }
464
465 return false;
466 }
467
468 //----------------------------------------------------------------------------
469 // Call back when a message has been reconstructed
470 //----------------------------------------------------------------------------
471 void Stream::OnIncoming( uint16_t subStream,
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: 0x%x.",
520 pStreamName.c_str(), msg.get() );
521
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: 0x%x.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
540
541 //----------------------------------------------------------------------------
542 // Call when one of the sockets is ready to accept a new message
543 //----------------------------------------------------------------------------
544 std::pair<Message *, MsgHandler *>
545 Stream::OnReadyToWrite( uint16_t subStream )
546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562 scopedLock.UnLock();
563 if( h.handler )
564 h.handler->OnReadyToSend( h.msg );
565 return std::make_pair( h.msg, h.handler );
566 }
567
568 void Stream::DisableIfEmpty( uint16_t subStream )
569 {
570 XrdSysMutexHelper scopedLock( pMutex );
571 Log *log = DefaultEnv::GetLog();
572
573 if( pSubStreams[subStream]->outQueue->IsEmpty() )
574 {
575 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
576 pSubStreams[subStream]->socket->GetStreamName().c_str() );
577 pSubStreams[subStream]->socket->DisableUplink();
578 }
579 }
580
581 //----------------------------------------------------------------------------
582 // Call when a message is written to the socket
583 //----------------------------------------------------------------------------
584 void Stream::OnMessageSent( uint16_t subStream,
585 Message *msg,
586 uint32_t bytesSent )
587 {
588 pTransport->MessageSent( msg, subStream, bytesSent,
589 *pChannelData );
590 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591 pBytesSent += bytesSent;
592 if( h.handler )
593 {
595 bool rmMsg = false;
596 pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597 if( rmMsg )
598 {
599 Log *log = DefaultEnv::GetLog();
600 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601 pStreamName.c_str(), subStream );
602 }
603 }
604 pSubStreams[subStream]->outMsgHelper.Reset();
605 }
606
607 //----------------------------------------------------------------------------
608 // Call back when a substream has been connected
609 //----------------------------------------------------------------------------
610 void Stream::OnConnect( uint16_t subStream )
611 {
612 XrdSysMutexHelper scopedLock( pMutex );
613 pSubStreams[subStream]->status = Socket::Connected;
614
615 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
616 Log *log = DefaultEnv::GetLog();
617 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
618 subStream, ipstack.c_str() );
619
620 if( subStream == 0 )
621 {
622 pLastStreamError = 0;
623 pLastFatalError = XRootDStatus();
624 pConnectionCount = 0;
625 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
626 pSessionId = ++sSessCntGen;
627
628 //------------------------------------------------------------------------
629 // Create the streams if they don't exist yet
630 //------------------------------------------------------------------------
631 if( pSubStreams.size() == 1 && numSub > 1 )
632 {
633 for( uint16_t i = 1; i < numSub; ++i )
634 {
635 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
636 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
637 pChannelData, i, this );
638 pSubStreams.push_back( new SubStreamData() );
639 pSubStreams[i]->socket = s;
640 }
641 }
642
643 //------------------------------------------------------------------------
644 // Connect the extra streams, if we fail we move all the outgoing items
645 // to stream 0, we don't need to enable the uplink here, because it
646 // should be already enabled after the handshaking process is completed.
647 //------------------------------------------------------------------------
648 if( pSubStreams.size() > 1 )
649 {
650 log->Debug( PostMasterMsg, "[%s] Attempting to connect %d additional "
651 "streams.", pStreamName.c_str(), pSubStreams.size()-1 );
652 for( size_t i = 1; i < pSubStreams.size(); ++i )
653 {
654 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
656 if( !st.IsOK() )
657 {
658 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659 pSubStreams[i]->socket->Close();
660 }
661 else
662 {
663 pSubStreams[i]->status = Socket::Connecting;
664 }
665 }
666 }
667
668 //------------------------------------------------------------------------
669 // Inform monitoring
670 //------------------------------------------------------------------------
671 pBytesSent = 0;
672 pBytesReceived = 0;
673 gettimeofday( &pConnectionDone, 0 );
675 if( mon )
676 {
678 i.server = pUrl->GetHostId();
679 i.sTOD = pConnectionStarted;
680 i.eTOD = pConnectionDone;
681 i.streams = pSubStreams.size();
682
683 AnyObject qryResult;
684 std::string *qryResponse = 0;
685 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
686 qryResult.Get( qryResponse );
687 i.auth = *qryResponse;
688 delete qryResponse;
689 mon->Event( Monitor::EvConnect, &i );
690 }
691
692 //------------------------------------------------------------------------
693 // For every connected control-stream call the global on-connect handler
694 //------------------------------------------------------------------------
696 }
697 else if( pOnDataConnJob )
698 {
699 //------------------------------------------------------------------------
700 // For every connected data-stream call the on-connect handler
701 //------------------------------------------------------------------------
702 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
703 }
704 }
705
706 //----------------------------------------------------------------------------
707 // On connect error
708 //----------------------------------------------------------------------------
709 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
710 {
711 XrdSysMutexHelper scopedLock( pMutex );
712 Log *log = DefaultEnv::GetLog();
713 pSubStreams[subStream]->socket->Close();
714 time_t now = ::time(0);
715
716 //--------------------------------------------------------------------------
717 // For every connection error call the global connection error handler
718 //--------------------------------------------------------------------------
720
721 //--------------------------------------------------------------------------
722 // If we connected subStream == 0 and cannot connect >0 then we just give
723 // up and move the outgoing messages to another queue
724 //--------------------------------------------------------------------------
725 if( subStream > 0 )
726 {
727 pSubStreams[subStream]->status = Socket::Disconnected;
728 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
729 if( pSubStreams[0]->status == Socket::Connected )
730 {
731 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732 if( !st.IsOK() )
733 OnFatalError( 0, st, scopedLock );
734 return;
735 }
736
737 if( pSubStreams[0]->status == Socket::Connecting )
738 return;
739
740 OnFatalError( subStream, status, scopedLock );
741 return;
742 }
743
744 //--------------------------------------------------------------------------
745 // Check if we still have time to try and do something in the current window
746 //--------------------------------------------------------------------------
747 time_t elapsed = now-pConnectionInitTime;
748 log->Error( PostMasterMsg, "[%s] elapsed = %d, pConnectionWindow = %d "
749 "seconds.", pStreamName.c_str(), elapsed, pConnectionWindow );
750
751 //------------------------------------------------------------------------
752 // If we have some IP addresses left we try them
753 //------------------------------------------------------------------------
754 if( !pAddresses.empty() )
755 {
756 XRootDStatus st;
757 do
758 {
759 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760 pAddresses.pop_back();
761 pConnectionInitTime = ::time( 0 );
762 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763 }
764 while( !pAddresses.empty() && !st.IsOK() );
765
766 if( !st.IsOK() )
767 OnFatalError( subStream, st, scopedLock );
768
769 return;
770 }
771 //------------------------------------------------------------------------
772 // If we still can retry with the same host name, we sleep until the end
773 // of the connection window and try
774 //------------------------------------------------------------------------
775 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776 && !status.IsFatal() )
777 {
778 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %d "
779 "seconds.", pStreamName.c_str(), pConnectionWindow-elapsed );
780
781 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783 return;
784 }
785 //--------------------------------------------------------------------------
786 // We are out of the connection window, the only thing we can do here
787 // is re-resolving the host name and retrying if we still can
788 //--------------------------------------------------------------------------
789 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790 {
791 pAddresses.clear();
792 pSubStreams[0]->status = Socket::Disconnected;
793 PathID path( 0, 0 );
794 XRootDStatus st = EnableLink( path );
795 if( !st.IsOK() )
796 OnFatalError( subStream, st, scopedLock );
797 return;
798 }
799
800 //--------------------------------------------------------------------------
801 // Else, we fail
802 //--------------------------------------------------------------------------
803 OnFatalError( subStream, status, scopedLock );
804 }
805
806 //----------------------------------------------------------------------------
807 // Call back when an error has occurred
808 //----------------------------------------------------------------------------
809 void Stream::OnError( uint16_t subStream, XRootDStatus status )
810 {
811 XrdSysMutexHelper scopedLock( pMutex );
812 Log *log = DefaultEnv::GetLog();
813 pSubStreams[subStream]->socket->Close();
814 pSubStreams[subStream]->status = Socket::Disconnected;
815
816 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817 pStreamName.c_str(), subStream, status.ToString().c_str() );
818
819 //--------------------------------------------------------------------------
820 // Reinsert the stuff that we have failed to sent
821 //--------------------------------------------------------------------------
822 if( pSubStreams[subStream]->outMsgHelper.msg )
823 {
824 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826 h.stateful );
827 pSubStreams[subStream]->outMsgHelper.Reset();
828 }
829
830 //--------------------------------------------------------------------------
831 // Reinsert the receiving handler and reset any partially read partial
832 //--------------------------------------------------------------------------
833 if( pSubStreams[subStream]->inMsgHelper.handler )
834 {
835 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838 if( xrdHandler ) xrdHandler->PartialReceived();
839 h.Reset();
840 }
841
842 //--------------------------------------------------------------------------
843 // We are dealing with an error of a peripheral stream. If we don't have
844 // anything to send don't bother recovering. Otherwise move the requests
845 // to stream 0 if possible.
846 //--------------------------------------------------------------------------
847 if( subStream > 0 )
848 {
849 if( pSubStreams[subStream]->outQueue->IsEmpty() )
850 return;
851
852 if( pSubStreams[0]->status != Socket::Disconnected )
853 {
854 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855 if( pSubStreams[0]->status == Socket::Connected )
856 {
857 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858 if( !st.IsOK() )
859 OnFatalError( 0, st, scopedLock );
860 return;
861 }
862 }
863 OnFatalError( subStream, status, scopedLock );
864 return;
865 }
866
867 //--------------------------------------------------------------------------
868 // If we lost the stream 0 we have lost the session, we re-enable the
869 // stream if we still have things in one of the outgoing queues, otherwise
870 // there is not point to recover at this point.
871 //--------------------------------------------------------------------------
872 if( subStream == 0 )
873 {
874 MonitorDisconnection( status );
875
876 SubStreamList::iterator it;
877 size_t outstanding = 0;
878 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879 outstanding += (*it)->outQueue->GetSizeStateless();
880
881 if( outstanding )
882 {
883 PathID path( 0, 0 );
884 XRootDStatus st = EnableLink( path );
885 if( !st.IsOK() )
886 {
887 OnFatalError( 0, st, scopedLock );
888 return;
889 }
890 }
891
892 //------------------------------------------------------------------------
893 // We're done here, unlock the stream mutex to avoid deadlocks and
894 // report the disconnection event to the handlers
895 //------------------------------------------------------------------------
896 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897 "message handlers.", pStreamName.c_str() );
898 OutQueue q;
899 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900 q.GrabStateful( *(*it)->outQueue );
901 scopedLock.UnLock();
902
903 q.Report( status );
904 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906 return;
907 }
908 }
909
910 //------------------------------------------------------------------------
911 // Force error
912 //------------------------------------------------------------------------
913 void Stream::ForceError( XRootDStatus status, bool hush )
914 {
915 XrdSysMutexHelper scopedLock( pMutex );
916 Log *log = DefaultEnv::GetLog();
917 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
918 {
919 if( pSubStreams[substream]->status != Socket::Connected ) continue;
920 pSubStreams[substream]->socket->Close();
921 pSubStreams[substream]->status = Socket::Disconnected;
922
923 if( !hush )
924 log->Error( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
925 pStreamName.c_str(), status.ToString().c_str() );
926
927 //--------------------------------------------------------------------
928 // Reinsert the stuff that we have failed to sent
929 //--------------------------------------------------------------------
930 if( pSubStreams[substream]->outMsgHelper.msg )
931 {
932 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
933 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
934 h.stateful );
935 pSubStreams[substream]->outMsgHelper.Reset();
936 }
937
938 //--------------------------------------------------------------------
939 // Reinsert the receiving handler and reset any partially read partial
940 //--------------------------------------------------------------------
941 if( pSubStreams[substream]->inMsgHelper.handler )
942 {
943 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
944 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
945 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
946 if( xrdHandler ) xrdHandler->PartialReceived();
947 h.Reset();
948 }
949 }
950
951 pConnectionCount = 0;
952
953 //------------------------------------------------------------------------
954 // We're done here, unlock the stream mutex to avoid deadlocks and
955 // report the disconnection event to the handlers
956 //------------------------------------------------------------------------
957 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
958 "message handlers.", pStreamName.c_str() );
959
960 SubStreamList::iterator it;
961 OutQueue q;
962 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
963 q.GrabItems( *(*it)->outQueue );
964 scopedLock.UnLock();
965
966 q.Report( status );
967
968 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
969 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
970 }
971
972 //----------------------------------------------------------------------------
973 // On fatal error
974 //----------------------------------------------------------------------------
975 void Stream::OnFatalError( uint16_t subStream,
976 XRootDStatus status,
977 XrdSysMutexHelper &lock )
978 {
979 Log *log = DefaultEnv::GetLog();
980 pSubStreams[subStream]->status = Socket::Disconnected;
981 log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
982 pStreamName.c_str(), status.ToString().c_str() );
983
984 //--------------------------------------------------------------------------
985 // Don't set the stream error windows for authentication errors as the user
986 // may refresh his credential at any time
987 //--------------------------------------------------------------------------
988 if( status.code != errAuthFailed )
989 {
990 pConnectionCount = 0;
991 pLastStreamError = ::time(0);
992 pLastFatalError = status;
993 }
994
995 SubStreamList::iterator it;
996 OutQueue q;
997 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
998 q.GrabItems( *(*it)->outQueue );
999 lock.UnLock();
1000
1001 status.status = stFatal;
1002 q.Report( status );
1003 pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1004 pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1005
1006 }
1007
1008 //----------------------------------------------------------------------------
1009 // Inform monitoring about disconnection
1010 //----------------------------------------------------------------------------
1011 void Stream::MonitorDisconnection( XRootDStatus status )
1012 {
1013 Monitor *mon = DefaultEnv::GetMonitor();
1014 if( mon )
1015 {
1016 Monitor::DisconnectInfo i;
1017 i.server = pUrl->GetHostId();
1018 i.rBytes = pBytesReceived;
1019 i.sBytes = pBytesSent;
1020 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1021 i.status = status;
1022 mon->Event( Monitor::EvDisconnect, &i );
1023 }
1024 }
1025
1026 //----------------------------------------------------------------------------
1027 // On read timeout
1028 //----------------------------------------------------------------------------
// Handles a read timeout reported for sub-stream `substream`.
// Returns true when `substream` is not 0 or when neither check below fires;
// returns false after the TTL-elapsed branch is taken or after OnError() has
// been invoked because the transport considers the stream broken.
1029 bool Stream::OnReadTimeout( uint16_t substream )
1030 {
1031 //--------------------------------------------------------------------------
1032 // We only take the main stream into account
1033 //--------------------------------------------------------------------------
1034 if( substream != 0 )
1035 return true;
1036
1037 //--------------------------------------------------------------------------
1038 // Check if there are no outgoing messages and if the stream TTL has elapsed.
1039 // It is assumed that the underlying transport makes sure that there are no
1040 // pending requests that are not answered, i.e. all possible virtual streams
1041 // are de-allocated
1042 //--------------------------------------------------------------------------
1043 Log *log = DefaultEnv::GetLog();
1044 SubStreamList::iterator it;
1045 time_t now = time(0);
1046
1047 XrdSysMutexHelper scopedLock( pMutex );
1048 uint32_t outgoingMessages = 0;
1049 time_t lastActivity = 0;
// Sum the out-queue sizes and keep the most recent GetLastActivity() value
// seen across all sub-streams
1050 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1051 {
1052 outgoingMessages += (*it)->outQueue->GetSize();
1053 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1054 if( lastActivity < sockLastActivity )
1055 lastActivity = sockLastActivity;
1056 }
1057
1058 if( !outgoingMessages )
1059 {
1060 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1061 *pChannelData );
1062 if( disconnect )
1063 {
1064 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1065 pStreamName.c_str() );
1066 scopedLock.UnLock();
1067 //----------------------------------------------------------------------
1068 // Important note!
1069 //
1070 // This destroys the Stream object itself, the underlying
1071 // AsyncSocketHandler object (that called this method) and the Channel
1072 // object that aggregates this Stream.
1073 //----------------------------------------------------------------------
// NOTE(review): listing line 1074 is absent from this extract. The note above
// describes a statement that destroys the Stream, so the disconnect call
// appears to be missing here - restore it from the upstream file.
1075 return false;
1076 }
1077 }
1078
1079 //--------------------------------------------------------------------------
1080 // Check if the stream is broken
1081 //--------------------------------------------------------------------------
1082 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1083 *pChannelData );
1084 if( !st.IsOK() )
1085 {
// The mutex is released before OnError() is called
1086 scopedLock.UnLock();
1087 OnError( substream, st );
1088 return false;
1089 }
1090 return true;
1091 }
1092
1093 //----------------------------------------------------------------------------
1094 // On write timeout
1095 //----------------------------------------------------------------------------
1096 bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1097 {
1098 return true;
1099 }
1100
1101 //----------------------------------------------------------------------------
1102 // Register channel event handler
1103 //----------------------------------------------------------------------------
// NOTE(review): the signature line (listing line 1104) is missing from this
// extract; presumably `void Stream::RegisterEventHandler( ChannelEventHandler
// *handler )` - confirm against the upstream file.
// Forwards `handler` to pChannelEvHandlers.AddHandler().
1105 {
1106 pChannelEvHandlers.AddHandler( handler );
1107 }
1108
1109 //----------------------------------------------------------------------------
1110 // Remove a channel event handler
1111 //----------------------------------------------------------------------------
// NOTE(review): the signature line (listing line 1112) is missing from this
// extract; presumably `void Stream::RemoveEventHandler( ChannelEventHandler
// *handler )` - confirm against the upstream file.
// Forwards `handler` to pChannelEvHandlers.RemoveHandler().
1113 {
1114 pChannelEvHandlers.RemoveHandler( handler );
1115 }
1116
1117 //----------------------------------------------------------------------------
1118 // Install a incoming message handler
1119 //----------------------------------------------------------------------------
1120 MsgHandler*
1121 Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1122 {
1123 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1124 if( !mh.handler )
1125 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1126 mh.expires,
1127 mh.action );
1128
1129 if( !mh.handler )
1130 return nullptr;
1131
1132 if( mh.action & MsgHandler::Raw )
1133 return mh.handler;
1134 return nullptr;
1135 }
1136
1137 //----------------------------------------------------------------------------
1141 //----------------------------------------------------------------------------
// Asks the handler installed on sub-stream `stream` to inspect a status
// response and translates the returned action flags into a single value:
// Raw (with `incHandler` set to the handler), Corrupted, More or None,
// checked in that order.
1142 uint16_t Stream::InspectStatusRsp( uint16_t stream,
1143 MsgHandler *&incHandler )
1144 {
1145 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1146 if( !mh.handler )
// NOTE(review): listing line 1147 (the statement governed by the `if` above,
// presumably an early return) is missing from this extract. As shown, the
// `if` would bind to the declaration below - restore from the upstream file.
1148
// Merge the handler's verdict into the action flags kept for this sub-stream
1149 uint16_t action = mh.handler->InspectStatusRsp();
1150 mh.action |= action;
1151
1152 if( action & MsgHandler::RemoveHandler )
1153 pIncomingQueue->RemoveMessageHandler( mh.handler );
1154
1155 if( action & MsgHandler::Raw )
1156 {
1157 incHandler = mh.handler;
1158 return MsgHandler::Raw;
1159 }
1160
1161 if( action & MsgHandler::Corrupted )
1162 return MsgHandler::Corrupted;
1163
1164 if( action & MsgHandler::More )
1165 return MsgHandler::More;
1166
1167 return MsgHandler::None;
1168 }
1169
1170 //----------------------------------------------------------------------------
1171 // Check if channel can be collapsed using given URL
1172 //----------------------------------------------------------------------------
1173 bool Stream::CanCollapse( const URL &url )
1174 {
1175 Log *log = DefaultEnv::GetLog();
1176
1177 //--------------------------------------------------------------------------
1178 // Resolve all the addresses of the host we're supposed to connect to
1179 //--------------------------------------------------------------------------
1180 std::vector<XrdNetAddr> prefaddrs;
1181 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1182 if( !st.IsOK() )
1183 {
1184 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1185 , pStreamName.c_str(), url.GetHostName().c_str() );
1186 return false;
1187 }
1188
1189 //--------------------------------------------------------------------------
1190 // Resolve all the addresses of the alias
1191 //--------------------------------------------------------------------------
1192 std::vector<XrdNetAddr> aliasaddrs;
1193 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1194 if( !st.IsOK() )
1195 {
1196 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1197 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1198 return false;
1199 }
1200
1201 //--------------------------------------------------------------------------
1202 // Now check if the preferred host is part of the alias
1203 //--------------------------------------------------------------------------
1204 auto itr = prefaddrs.begin();
1205 for( ; itr != prefaddrs.end() ; ++itr )
1206 {
1207 auto itr2 = aliasaddrs.begin();
1208 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1209 if( itr->Same( &*itr2 ) ) return true;
1210 }
1211
1212 return false;
1213 }
1214
1215 //------------------------------------------------------------------------
1216 // Query the stream
1217 //------------------------------------------------------------------------
// Answers a stream-level query by storing a newly allocated std::string in
// `result`, built from a getter on sub-stream 0's socket. Each string is
// handed to AnyObject::Set() with `false` as the second argument.
// NOTE(review): presumably own=false means `result` does not take ownership
// of the string - confirm who frees it.
1218 Status Stream::Query( uint16_t query, AnyObject &result )
1219 {
1220 switch( query )
1221 {
// NOTE(review): the `case` label (listing line 1222) is missing from this
// extract; this branch returns the socket's GetIpAddr() value.
1223 {
1224 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1225 return Status();
1226 }
1227
// NOTE(review): the `case` label (listing line 1228) is missing from this
// extract; this branch returns the socket's GetIpStack() value.
1229 {
1230 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1231 return Status();
1232 }
1233
// NOTE(review): the `case` label (listing line 1234) is missing from this
// extract; this branch returns the socket's GetHostName() value.
1235 {
1236 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1237 return Status();
1238 }
1239
1240 default:
// NOTE(review): the body of `default` (listing line 1241) is missing from
// this extract - restore it from the upstream file.
1242 }
1243 }
1244
1245}
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
struct ServerResponseBody_Status bdy
kXR_char fhandle[4]
Definition XProtocol.hh:229
@ kXR_close
Definition XProtocol.hh:115
union ServerResponse::@12 body
ServerResponseHeader hdr
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
virtual time_t GetExpiration()=0
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
Random utilities.
Definition XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
@ kXR_PartialResult
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.