XRootD
Loading...
Searching...
No Matches
XrdCl::ClassicCopyJob Class Reference

#include <XrdClClassicCopyJob.hh>

+ Inheritance diagram for XrdCl::ClassicCopyJob:
+ Collaboration diagram for XrdCl::ClassicCopyJob:

Public Member Functions

 ClassicCopyJob (uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
 
const XRootDStatusGetResult () const
 
virtual XRootDStatus Run (CopyProgressHandler *progress=0)
 
- Public Member Functions inherited from XrdCl::CopyJob
 CopyJob (uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
 Constructor.
 
virtual ~CopyJob ()
 Virtual destructor.
 
PropertyListGetProperties ()
 Get the job properties.
 
PropertyListGetResults ()
 Get the job results.
 
const URLGetSource () const
 Get source.
 
const URLGetTarget () const
 Get target.
 
void Init ()
 

Additional Inherited Members

- Protected Attributes inherited from XrdCl::CopyJob
uint16_t pJobId
 
PropertyListpProperties
 
PropertyListpResults
 
URL pSource
 
URL pTarget
 

Detailed Description

Definition at line 27 of file XrdClClassicCopyJob.hh.

Constructor & Destructor Documentation

◆ ClassicCopyJob()

XrdCl::ClassicCopyJob::ClassicCopyJob ( uint16_t jobId,
PropertyList * jobProperties,
PropertyList * jobResults )

Definition at line 2416 of file XrdClClassicCopyJob.cc.

2418 :
2419 CopyJob( jobId, jobProperties, jobResults )
2420 {
2421 Log *log = DefaultEnv::GetLog();
2422 log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2423 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2424 }
const URL & GetSource() const
Get source.
CopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
const URL & GetTarget() const
Get target.
static Log * GetLog()
Get default log.
const uint64_t UtilityMsg
XrdSysError Log
Definition XrdConfig.cc:112

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::CopyJob::GetSource(), XrdCl::CopyJob::GetTarget(), and XrdCl::UtilityMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ GetResult()

const XRootDStatus & XrdCl::ClassicCopyJob::GetResult ( ) const
inline

Definition at line 48 of file XrdClClassicCopyJob.hh.

49 {
50 return result;
51 }

◆ Run()

XRootDStatus XrdCl::ClassicCopyJob::Run ( CopyProgressHandler * progress = 0)
virtual

Run the copy job

Parameters
progressthe handler to be notified about the copy progress
Returns
status of the copy operation

Implements XrdCl::CopyJob.

Definition at line 2429 of file XrdClClassicCopyJob.cc.

2430 {
2431 Log *log = DefaultEnv::GetLog();
2432
2433 std::string checkSumMode;
2434 std::string checkSumType;
2435 std::string checkSumPreset;
2436 std::string zipSource;
2437 uint16_t parallelChunks;
2438 uint32_t chunkSize;
2439 uint64_t blockSize;
2440 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441 rmOnBadCksum, continue_, zipappend, doserver;
2442 int32_t nbXcpSources;
2443 long long xRate;
2444 long long xRateThreshold;
2445 uint16_t cpTimeout;
2446 std::vector<std::string> addcksums;
2447
2448 pProperties->Get( "checkSumMode", checkSumMode );
2449 pProperties->Get( "checkSumType", checkSumType );
2450 pProperties->Get( "checkSumPreset", checkSumPreset );
2451 pProperties->Get( "parallelChunks", parallelChunks );
2452 pProperties->Get( "chunkSize", chunkSize );
2453 pProperties->Get( "posc", posc );
2454 pProperties->Get( "force", force );
2455 pProperties->Get( "coerce", coerce );
2456 pProperties->Get( "makeDir", makeDir );
2457 pProperties->Get( "dynamicSource", dynamicSource );
2458 pProperties->Get( "zipArchive", zip );
2459 pProperties->Get( "xcp", xcp );
2460 pProperties->Get( "xcpBlockSize", blockSize );
2461 pProperties->Get( "preserveXAttr", preserveXAttr );
2462 pProperties->Get( "xrate", xRate );
2463 pProperties->Get( "xrateThreshold", xRateThreshold );
2464 pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2465 pProperties->Get( "continue", continue_ );
2466 pProperties->Get( "cpTimeout", cpTimeout );
2467 pProperties->Get( "zipAppend", zipappend );
2468 pProperties->Get( "addcksums", addcksums );
2469 pProperties->Get( "doServer", doserver );
2470
2471 if( zip )
2472 pProperties->Get( "zipSource", zipSource );
2473
2474 if( xcp )
2475 pProperties->Get( "nbXcpSources", nbXcpSources );
2476
2477 if( force && continue_ )
2478 return SetResult( stError, errInvalidArgs, EINVAL,
2479 "Invalid argument combination: continue + force." );
2480
2481 if( zipappend && ( continue_ || force ) )
2482 return SetResult( stError, errInvalidArgs, EINVAL,
2483 "Invalid argument combination: ( continue | force ) + zip-append." );
2484
2485 //--------------------------------------------------------------------------
2486 // Start the cp t/o timer if necessary
2487 //--------------------------------------------------------------------------
2488 std::unique_ptr<timer_sec_t> cptimer;
2489 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2490
2491 //--------------------------------------------------------------------------
2492 // Remove on bad checksum implies that POSC semantics has to be enabled
2493 //--------------------------------------------------------------------------
2494 if( rmOnBadCksum ) posc = true;
2495
2496 //--------------------------------------------------------------------------
2497 // Resolve the 'auto' checksum type.
2498 //--------------------------------------------------------------------------
2499 if( checkSumType == "auto" )
2500 {
2501 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2502 if( checkSumType.empty() )
2503 return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2504 else
2505 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2506 }
2507
2508 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2509 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2510
2511 //--------------------------------------------------------------------------
2512 // Initialize the source and the destination
2513 //--------------------------------------------------------------------------
2514 std::unique_ptr<Source> src;
2515 if( xcp )
2516 src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2517 else if( zip ) // TODO make zip work for xcp
2518 src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2519 checkSumType, addcksums , doserver) );
2520 else if( GetSource().GetProtocol() == "stdio" )
2521 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2522 else
2523 {
2524 if( dynamicSource )
2525 src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2526 else
2527 src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2528 }
2529
2530 XRootDStatus st = src->Initialize();
2531 if( !st.IsOK() ) return SourceError( st );
2532 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2533
2534 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2535 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2536
2537 std::unique_ptr<Destination> dest;
2538 URL newDestUrl( GetTarget() );
2539
2540 if( GetTarget().GetProtocol() == "stdio" )
2541 dest.reset( new StdOutDestination( checkSumType ) );
2542 else if( zipappend )
2543 {
2544 std::string fn = GetSource().GetPath();
2545 size_t pos = fn.rfind( '/' );
2546 if( pos != std::string::npos )
2547 fn = fn.substr( pos + 1 );
2548 int64_t size = src->GetSize();
2549 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2550 }
2551 //--------------------------------------------------------------------------
2552 // For xrootd destination build the oss.asize hint
2553 //--------------------------------------------------------------------------
2554 else
2555 {
2556 if( src->GetSize() >= 0 )
2557 {
2558 URL::ParamsMap params = newDestUrl.GetParams();
2559 std::ostringstream o; o << src->GetSize();
2560 params["oss.asize"] = o.str();
2561 newDestUrl.SetParams( params );
2562 // makeDir = true; // Backward compatibility for xroot destinations!!!
2563 }
2564 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2565 }
2566
2567 dest->SetForce( force );
2568 dest->SetPOSC( posc );
2569 dest->SetCoerce( coerce );
2570 dest->SetMakeDir( makeDir );
2571 dest->SetContinue( continue_ );
2572 st = dest->Initialize();
2573 if( !st.IsOK() ) return DestinationError( st );
2574
2575 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2576 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2577
2578 //--------------------------------------------------------------------------
2579 // Copy the chunks
2580 //--------------------------------------------------------------------------
2581 if( continue_ )
2582 {
2583 size -= dest->GetSize();
2584 XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2585 if( !st.IsOK() ) return SetResult( st );
2586 }
2587
2588 PageInfo pageInfo;
2589 uint64_t total_processed = 0;
2590 uint64_t processed = 0;
2591 auto start = time_nsec();
2592 uint16_t threshold_interval = parallelChunks;
2593 bool threshold_draining = false;
2594 timer_nsec_t threshold_timer;
2595 while( 1 )
2596 {
2597 st = src->GetChunk( pageInfo );
2598 if( !st.IsOK() )
2599 return SourceError( st);
2600
2601 if( st.IsOK() && st.code == suDone )
2602 break;
2603
2604 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2605 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2606
2607 if( xRate )
2608 {
2609 auto elapsed = ( time_nsec() - start ).count();
2610 double transferred = total_processed + pageInfo.GetLength();
2611 double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2612 //----------------------------------------------------------------------
2613 // check if our transfer rate didn't exceeded the limit
2614 // (we are too fast)
2615 //----------------------------------------------------------------------
2616 if( elapsed && // make sure elapsed time is greater than 0
2617 transferred > expected )
2618 {
2619 auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2620 sleep_nsec( nsec );
2621 }
2622 }
2623
2624 if( xRateThreshold )
2625 {
2626 auto elapsed = threshold_timer.elapsed();
2627 double transferred = processed + pageInfo.GetLength();
2628 double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2629 //----------------------------------------------------------------------
2630 // check if our transfer rate dropped below the threshold
2631 // (we are too slow)
2632 //----------------------------------------------------------------------
2633 if( elapsed && // make sure elapsed time is greater than 0
2634 transferred < expected &&
2635 threshold_interval == 0 ) // we check every # parallelChunks
2636 {
2637 if( !threshold_draining )
2638 {
2639 log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2640 " trying different source!" );
2641 XRootDStatus st = src->TryOtherServer();
2642 if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2643 "The transfer rate dropped below "
2644 "requested threshold!" );
2645 threshold_draining = true; // before the next measurement we need to drain
2646 // all the chunks that will come from the old server
2647 }
2648 else // now that all the chunks from the old server have
2649 { // been received we can start another measurement
2650 processed = 0;
2651 threshold_timer.reset();
2652 threshold_interval = parallelChunks;
2653 threshold_draining = false;
2654 }
2655 }
2656
2657 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2658 }
2659
2660 total_processed += pageInfo.GetLength();
2661 processed += pageInfo.GetLength();
2662
2663 st = dest->PutChunk( std::move( pageInfo ) );
2664 if( !st.IsOK() )
2665 {
2666 if( st.code == errRetry )
2667 {
2668 pResults->Set( "LastURL", dest->GetLastURL() );
2669 pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670 return SetResult( st );
2671 }
2672 return DestinationError( st );
2673 }
2674
2675 if( progress )
2676 {
2677 progress->JobProgress( pJobId, total_processed, size );
2678 if( progress->ShouldCancel( pJobId ) )
2679 return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2680 }
2681 }
2682
2683 st = dest->Flush();
2684 if( !st.IsOK() )
2685 return DestinationError( st );
2686
2687 //--------------------------------------------------------------------------
2688 // Copy extended attributes
2689 //--------------------------------------------------------------------------
2690 if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2691 {
2692 std::vector<xattr_t> xattrs;
2693 st = src->GetXAttr( xattrs );
2694 if( !st.IsOK() ) return SourceError( st );
2695 st = dest->SetXAttr( xattrs );
2696 if( !st.IsOK() ) return DestinationError( st );
2697 }
2698
2699 //--------------------------------------------------------------------------
2700 // The size of the source is known and not enough data has been transferred
2701 // to the destination
2702 //--------------------------------------------------------------------------
2703 if( src->GetSize() >= 0 && size != total_processed )
2704 {
2705 log->Error( UtilityMsg, "The declared source size is %ld bytes, but "
2706 "received %ld bytes.", size, total_processed );
2707 return SetResult( stError, errDataError );
2708 }
2709 pResults->Set( "size", total_processed );
2710
2711 //--------------------------------------------------------------------------
2712 // Finalize the destination
2713 //--------------------------------------------------------------------------
2714 st = dest->Finalize();
2715 if( !st.IsOK() )
2716 return DestinationError( st );
2717
2718 //--------------------------------------------------------------------------
2719 // Verify the checksums if needed
2720 //--------------------------------------------------------------------------
2721 if( checkSumMode != "none" )
2722 {
2723 log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2724 checkSumMode.c_str() );
2725 std::string sourceCheckSum;
2726 std::string targetCheckSum;
2727
2728 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2729 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2730
2731 //------------------------------------------------------------------------
2732 // Get the check sum at source
2733 //------------------------------------------------------------------------
2734 timeval oStart, oEnd;
2735 XRootDStatus st;
2736
2737 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2738 !checkSumPreset.empty() )
2739 {
2740 gettimeofday( &oStart, 0 );
2741 if( !checkSumPreset.empty() )
2742 {
2743 sourceCheckSum = checkSumType + ":";
2744 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2745 checkSumPreset );
2746 }
2747 else
2748 {
2749 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2750 }
2751 gettimeofday( &oEnd, 0 );
2752
2753 if( !st.IsOK() )
2754 return SourceError( st );
2755
2756 pResults->Set( "sourceCheckSum", sourceCheckSum );
2757 }
2758
2759 if( !addcksums.empty() )
2760 pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2761
2762 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2763 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2764
2765 //------------------------------------------------------------------------
2766 // Get the check sum at destination
2767 //------------------------------------------------------------------------
2768 timeval tStart, tEnd;
2769
2770 if( checkSumMode == "end2end" || checkSumMode == "target" )
2771 {
2772 gettimeofday( &tStart, 0 );
2773 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2774 if( !st.IsOK() )
2775 return DestinationError( st );
2776 gettimeofday( &tEnd, 0 );
2777 pResults->Set( "targetCheckSum", targetCheckSum );
2778 }
2779
2780 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2781 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2782
2783 //------------------------------------------------------------------------
2784 // Make sure the checksums are both lower case
2785 //------------------------------------------------------------------------
2786 auto sanitize_cksum = []( char c )
2787 {
2788 std::locale loc;
2789 if( std::isalpha( c ) ) return std::tolower( c, loc );
2790 return c;
2791 };
2792
2793 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794 sourceCheckSum.begin(), sanitize_cksum );
2795
2796 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797 targetCheckSum.begin(), sanitize_cksum );
2798
2799 //------------------------------------------------------------------------
2800 // Compare and inform monitoring
2801 //------------------------------------------------------------------------
2802 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2803 {
2804 bool match = false;
2805 if( sourceCheckSum == targetCheckSum )
2806 match = true;
2807
2808 Monitor *mon = DefaultEnv::GetMonitor();
2809 if( mon )
2810 {
2811 Monitor::CheckSumInfo i;
2812 i.transfer.origin = &GetSource();
2813 i.transfer.target = &GetTarget();
2814 i.cksum = sourceCheckSum;
2815 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2816 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2817 i.isOK = match;
2818 mon->Event( Monitor::EvCheckSum, &i );
2819 }
2820
2821 if( !match )
2822 {
2823 if( rmOnBadCksum )
2824 {
2825 FileSystem fs( newDestUrl );
2826 st = fs.Rm( newDestUrl.GetPath() );
2827 if( !st.IsOK() )
2828 log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2829 else
2830 log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2831 }
2832
2833 st = dest->Finalize();
2834 if( !st.IsOK() )
2835 log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2836
2837 return SetResult( stError, errCheckSumError, 0 );
2838 }
2839
2840 log->Info( UtilityMsg, "Checksum verification: succeeded." );
2841 }
2842 }
2843
2844 return SetResult();
2845 }
@ kXR_Cancelled
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pResults
PropertyList * pProperties
static Monitor * GetMonitor()
Get the monitor object.
@ EvCheckSum
CheckSumInfo: File checksummed.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t suDone
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.

References XrdCl::Monitor::CheckSumInfo::cksum, XrdCl::Status::code, XrdCl::Log::Debug(), XrdCl::errCheckSumError, XrdCl::errDataError, XrdCl::errInvalidArgs, XrdCl::errOperationExpired, XrdCl::errOperationInterrupted, XrdCl::Log::Error(), XrdCl::errRetry, XrdCl::errThresholdExceeded, XrdCl::Monitor::EvCheckSum, XrdCl::Monitor::Event(), XrdCl::PropertyList::Get(), XrdCl::Utils::GetElapsedMicroSecs(), XrdCl::PageInfo::GetLength(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::URL::GetParams(), XrdCl::URL::GetPath(), XrdCl::CopyJob::GetSource(), XrdCl::CopyJob::GetTarget(), XrdCl::Utils::HasXAttr(), XrdCl::Utils::InferChecksumType(), XrdCl::Log::Info(), XrdCl::Monitor::CheckSumInfo::isOK, XrdCl::Status::IsOK(), XrdCl::CopyProgressHandler::JobProgress(), kXR_Cancelled, XrdCl::Utils::NormalizeChecksum(), XrdCl::Monitor::TransferInfo::origin, XrdCl::Monitor::CheckSumInfo::oTime, XrdCl::CopyJob::pJobId, XrdCl::CopyJob::pProperties, XrdCl::CopyJob::pResults, XrdCl::FileSystem::Rm(), XrdCl::PropertyList::Set(), XrdCl::URL::SetParams(), XrdCl::CopyProgressHandler::ShouldCancel(), sleep_nsec(), XrdCl::stError, XrdCl::suDone, XrdCl::Monitor::TransferInfo::target, time_nsec(), to_nsec(), XrdCl::Status::ToString(), XrdCl::Monitor::CheckSumInfo::transfer, XrdCl::Monitor::CheckSumInfo::tTime, XrdCl::UtilityMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: