XRootD
Loading...
Searching...
No Matches
XrdClFileStateHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClURL.hh"
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClStatus.hh"
35#include "XrdCl/XrdClMonitor.hh"
41#include "XrdCl/XrdClUtils.hh"
42
43#ifdef WITH_XRDEC
45#endif
46
47#include "XrdOuc/XrdOucCRC.hh"
49
53
54#include <sstream>
55#include <memory>
56#include <numeric>
57#include <sys/time.h>
58#include <uuid/uuid.h>
59#include <mutex>
60
61namespace
62{
63 //----------------------------------------------------------------------------
64 // Helper callback for handling PgRead responses
65 //----------------------------------------------------------------------------
66 class PgReadHandler : public XrdCl::ResponseHandler
67 {
68 friend class PgReadRetryHandler;
69
70 public:
71
72 //------------------------------------------------------------------------
73 // Constructor
74 //------------------------------------------------------------------------
75 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
76 XrdCl::ResponseHandler *userHandler,
77 uint64_t orgOffset ) :
78 stateHandler( stateHandler ),
79 userHandler( userHandler ),
80 orgOffset( orgOffset ),
81 maincall( true ),
82 retrycnt( 0 ),
83 nbrepair( 0 )
84 {
85 }
86
87 //------------------------------------------------------------------------
88 // Handle the response
89 //------------------------------------------------------------------------
91 XrdCl::AnyObject *response,
92 XrdCl::HostList *hostList )
93 {
94 using namespace XrdCl;
95
96 std::unique_lock<std::mutex> lck( mtx );
97
98 if( !maincall )
99 {
100 //--------------------------------------------------------------------
101 // We are serving PgRead retry request
102 //--------------------------------------------------------------------
103 --retrycnt;
104 if( !status->IsOK() )
105 st.reset( status );
106 else
107 {
108 delete status; // by convention other args are null (see PgReadRetryHandler)
109 ++nbrepair; // update number of repaired pages
110 }
111
112 if( retrycnt == 0 )
113 {
114 //------------------------------------------------------------------
115 // All retries came back
116 //------------------------------------------------------------------
117 if( st->IsOK() )
118 {
119 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
120 pginf.SetNbRepair( nbrepair );
121 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
122 }
123 else
124 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
125 lck.unlock();
126 delete this;
127 }
128
129 return;
130 }
131
132 //----------------------------------------------------------------------
133 // We are serving main PgRead request
134 //----------------------------------------------------------------------
135 if( !status->IsOK() )
136 {
137 //--------------------------------------------------------------------
138 // The main PgRead request has failed
139 //--------------------------------------------------------------------
140 userHandler->HandleResponseWithHosts( status, response, hostList );
141 lck.unlock();
142 delete this;
143 return;
144 }
145
146 maincall = false;
147
148 //----------------------------------------------------------------------
149 // Do the integrity check
150 //----------------------------------------------------------------------
151 PageInfo *pginf = 0;
152 response->Get( pginf );
153
154 uint64_t pgoff = pginf->GetOffset();
155 uint32_t bytesRead = pginf->GetLength();
156 std::vector<uint32_t> &cksums = pginf->GetCksums();
157 char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
158 size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
159 uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
160 if( pgsize > bytesRead ) pgsize = bytesRead;
161
162 for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
163 {
164 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
165 if( crcval != cksums[pgnb] )
166 {
167 Log *log = DefaultEnv::GetLog();
168 log->Info( FileMsg, "[0x%x@%s] Received corrupted page, will retry page #%d.",
169 this, stateHandler->pFileUrl->GetURL().c_str(), pgnb );
170
171 XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
172 if( !st.IsOK())
173 {
174 *status = st; // the reason for this failure
175 break;
176 }
177 ++retrycnt; // update the retry counter
178 }
179
180 bytesRead -= pgsize;
181 buffer += pgsize;
182 pgoff += pgsize;
183 pgsize = XrdSys::PageSize;
184 if( pgsize > bytesRead ) pgsize = bytesRead;
185 }
186
187
188 if( retrycnt == 0 )
189 {
190 //--------------------------------------------------------------------
191 // All went well!
192 //--------------------------------------------------------------------
193 userHandler->HandleResponseWithHosts( status, response, hostList );
194 lck.unlock();
195 delete this;
196 return;
197 }
198
199 //----------------------------------------------------------------------
200 // We have to wait for retries!
201 //----------------------------------------------------------------------
202 resp.reset( response );
203 hosts.reset( hostList );
204 st.reset( status );
205 }
206
207 void UpdateCksum( size_t pgnb, uint32_t crcval )
208 {
209 if( resp )
210 {
211 XrdCl::PageInfo *pginf = 0;
212 resp->Get( pginf );
213 pginf->GetCksums()[pgnb] = crcval;
214 }
215 }
216
217 private:
218
219 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
220 XrdCl::ResponseHandler *userHandler;
221 uint64_t orgOffset;
222
223 std::unique_ptr<XrdCl::AnyObject> resp;
224 std::unique_ptr<XrdCl::HostList> hosts;
225 std::unique_ptr<XrdCl::XRootDStatus> st;
226
227 std::mutex mtx;
228 bool maincall;
229 size_t retrycnt;
230 size_t nbrepair;
231
232 };
233
234 //----------------------------------------------------------------------------
235 // Helper callback for handling PgRead retries
236 //----------------------------------------------------------------------------
237 class PgReadRetryHandler : public XrdCl::ResponseHandler
238 {
239 public:
240
241 PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
242 pgnb( pgnb )
243 {
244
245 }
246
247 //------------------------------------------------------------------------
248 // Handle the response
249 //------------------------------------------------------------------------
251 XrdCl::AnyObject *response,
252 XrdCl::HostList *hostList )
253 {
254 using namespace XrdCl;
255
256 if( !status->IsOK() )
257 {
258 Log *log = DefaultEnv::GetLog();
259 log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
260 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
261 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
262 delete this;
263 return;
264 }
265
266 XrdCl::PageInfo *pginf = 0;
267 response->Get( pginf );
268 if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
269 {
270 Log *log = DefaultEnv::GetLog();
271 log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
272 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
273 // we retry a page at a time so the length cannot exceed 4KB
274 DeleteArgs( status, response, hostList );
275 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
276 delete this;
277 return;
278 }
279
280 uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
281 if( crcval != pginf->GetCksums().front() )
282 {
283 Log *log = DefaultEnv::GetLog();
284 log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
285 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
286 DeleteArgs( status, response, hostList );
287 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
288 delete this;
289 return;
290 }
291
292 Log *log = DefaultEnv::GetLog();
293 log->Info( FileMsg, "[0x%x@%s] Successfully recovered page #%d.",
294 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
295
296 DeleteArgs( 0, response, hostList );
297 pgReadHandler->UpdateCksum( pgnb, crcval );
298 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
299 delete this;
300 }
301
302 private:
303
304 inline void DeleteArgs( XrdCl::XRootDStatus *status,
305 XrdCl::AnyObject *response,
306 XrdCl::HostList *hostList )
307 {
308 delete status;
309 delete response;
310 delete hostList;
311 }
312
313 PgReadHandler *pgReadHandler;
314 size_t pgnb;
315 };
316
317 //----------------------------------------------------------------------------
318 // Handle PgRead substitution with ordinary Read
319 //----------------------------------------------------------------------------
321 {
322 public:
323
324 //------------------------------------------------------------------------
325 // Constructor
326 //------------------------------------------------------------------------
327 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
328 XrdCl::ResponseHandler *userHandler ) :
329 stateHandler( stateHandler ),
330 userHandler( userHandler )
331 {
332 }
333
334 //------------------------------------------------------------------------
335 // Handle the response
336 //------------------------------------------------------------------------
338 XrdCl::AnyObject *rdresp,
339 XrdCl::HostList *hostList )
340 {
341 if( !status->IsOK() )
342 {
343 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
344 delete this;
345 return;
346 }
347
348 using namespace XrdCl;
349
350 ChunkInfo *chunk = 0;
351 rdresp->Get( chunk );
352
353 std::vector<uint32_t> cksums;
354 if( stateHandler->pIsChannelEncrypted )
355 {
356 size_t nbpages = chunk->length / XrdSys::PageSize;
357 if( chunk->length % XrdSys::PageSize )
358 ++nbpages;
359 cksums.reserve( nbpages );
360
361 size_t size = chunk->length;
362 char *buffer = reinterpret_cast<char*>( chunk->buffer );
363
364 for( size_t pg = 0; pg < nbpages; ++pg )
365 {
366 size_t pgsize = XrdSys::PageSize;
367 if( pgsize > size ) pgsize = size;
368 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
369 cksums.push_back( crcval );
370 buffer += pgsize;
371 size -= pgsize;
372 }
373 }
374
375 PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
376 chunk->buffer, std::move( cksums ) );
377 delete rdresp;
378 AnyObject *response = new AnyObject();
379 response->Set( pages );
380 userHandler->HandleResponseWithHosts( status, response, hostList );
381
382 delete this;
383 }
384
385 private:
386
387 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
388 XrdCl::ResponseHandler *userHandler;
389 };
390
391 //----------------------------------------------------------------------------
392 // Object that does things to the FileStateHandler when kXR_open returns
393 // and then calls the user handler
394 //----------------------------------------------------------------------------
395 class OpenHandler: public XrdCl::ResponseHandler
396 {
397 public:
398 //------------------------------------------------------------------------
399 // Constructor
400 //------------------------------------------------------------------------
401 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
402 XrdCl::ResponseHandler *userHandler ):
403 pStateHandler( stateHandler ),
404 pUserHandler( userHandler )
405 {
406 }
407
408 //------------------------------------------------------------------------
409 // Handle the response
410 //------------------------------------------------------------------------
411 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
412 XrdCl::AnyObject *response,
413 XrdCl::HostList *hostList )
414 {
415 using namespace XrdCl;
416
417 //----------------------------------------------------------------------
418 // Extract the statistics info
419 //----------------------------------------------------------------------
420 OpenInfo *openInfo = 0;
421 if( status->IsOK() )
422 response->Get( openInfo );
423#ifdef WITH_XRDEC
424 else
425 //--------------------------------------------------------------------
426 // Handle EC redirect
427 //--------------------------------------------------------------------
428 if( status->code == errRedirect )
429 {
430 std::string ecurl = status->GetErrorMessage();
431 EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
432 if( ecHandler )
433 {
434 pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
435 ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
436 return;
437 }
438 }
439#endif
440 //----------------------------------------------------------------------
441 // Notify the state handler and the client and say bye bye
442 //----------------------------------------------------------------------
443 pStateHandler->OnOpen( status, openInfo, hostList );
444 delete response;
445 if( pUserHandler )
446 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
447 else
448 {
449 delete status;
450 delete hostList;
451 }
452 delete this;
453 }
454
455 private:
456 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
457 XrdCl::ResponseHandler *pUserHandler;
458 };
459
460 //----------------------------------------------------------------------------
461 // Object that does things to the FileStateHandler when kXR_close returns
462 // and then calls the user handler
463 //----------------------------------------------------------------------------
464 class CloseHandler: public XrdCl::ResponseHandler
465 {
466 public:
467 //------------------------------------------------------------------------
468 // Constructor
469 //------------------------------------------------------------------------
470 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
471 XrdCl::ResponseHandler *userHandler,
472 XrdCl::Message *message ):
473 pStateHandler( stateHandler ),
474 pUserHandler( userHandler ),
475 pMessage( message )
476 {
477 }
478
479 //------------------------------------------------------------------------
481 //------------------------------------------------------------------------
482 virtual ~CloseHandler()
483 {
484 delete pMessage;
485 }
486
487 //------------------------------------------------------------------------
488 // Handle the response
489 //------------------------------------------------------------------------
490 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
491 XrdCl::AnyObject *response,
492 XrdCl::HostList *hostList )
493 {
494 pStateHandler->OnClose( status );
495 if( pUserHandler )
496 pUserHandler->HandleResponseWithHosts( status, response, hostList );
497 else
498 {
499 delete response;
500 delete status;
501 delete hostList;
502 }
503
504 delete this;
505 }
506
507 private:
508 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
509 XrdCl::ResponseHandler *pUserHandler;
510 XrdCl::Message *pMessage;
511 };
512
513 //----------------------------------------------------------------------------
514 // Stateful message handler
515 //----------------------------------------------------------------------------
516 class StatefulHandler: public XrdCl::ResponseHandler
517 {
518 public:
519 //------------------------------------------------------------------------
520 // Constructor
521 //------------------------------------------------------------------------
522 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
523 XrdCl::ResponseHandler *userHandler,
524 XrdCl::Message *message,
525 const XrdCl::MessageSendParams &sendParams ):
526 pStateHandler( stateHandler ),
527 pUserHandler( userHandler ),
528 pMessage( message ),
529 pSendParams( sendParams )
530 {
531 }
532
533 //------------------------------------------------------------------------
534 // Destructor
535 //------------------------------------------------------------------------
536 virtual ~StatefulHandler()
537 {
538 delete pMessage;
539 delete pSendParams.chunkList;
540 delete pSendParams.kbuff;
541 }
542
543 //------------------------------------------------------------------------
544 // Handle the response
545 //------------------------------------------------------------------------
546 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
547 XrdCl::AnyObject *response,
548 XrdCl::HostList *hostList )
549 {
550 using namespace XrdCl;
551 std::unique_ptr<AnyObject> responsePtr( response );
552 pSendParams.hostList = hostList;
553
554 //----------------------------------------------------------------------
555 // Houston we have a problem...
556 //----------------------------------------------------------------------
557 if( !status->IsOK() )
558 {
559 XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
560 return;
561 }
562
563 //----------------------------------------------------------------------
564 // We're clear
565 //----------------------------------------------------------------------
566 responsePtr.release();
567 XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
568 if( pUserHandler )
569 pUserHandler->HandleResponseWithHosts( status, response, hostList );
570 else
571 {
572 delete status,
573 delete response;
574 delete hostList;
575 }
576 delete this;
577 }
578
579 //------------------------------------------------------------------------
581 //------------------------------------------------------------------------
582 XrdCl::ResponseHandler *GetUserHandler()
583 {
584 return pUserHandler;
585 }
586
587 private:
588 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
589 XrdCl::ResponseHandler *pUserHandler;
590 XrdCl::Message *pMessage;
591 XrdCl::MessageSendParams pSendParams;
592 };
593
594 //----------------------------------------------------------------------------
595 // Release-buffer Handler
596 //----------------------------------------------------------------------------
597 class ReleaseBufferHandler: public XrdCl::ResponseHandler
598 {
599 public:
600
601 //------------------------------------------------------------------------
602 // Constructor
603 //------------------------------------------------------------------------
604 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
605 buffer( std::move( buffer ) ),
606 handler( handler )
607 {
608 }
609
610 //------------------------------------------------------------------------
611 // Handle the response
612 //------------------------------------------------------------------------
613 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
614 XrdCl::AnyObject *response,
615 XrdCl::HostList *hostList )
616 {
617 if (handler)
618 handler->HandleResponseWithHosts( status, response, hostList );
619 }
620
621 //------------------------------------------------------------------------
622 // Get the underlying buffer
623 //------------------------------------------------------------------------
624 XrdCl::Buffer& GetBuffer()
625 {
626 return buffer;
627 }
628
629 private:
630 XrdCl::Buffer buffer;
631 XrdCl::ResponseHandler *handler;
632 };
633}
634
635namespace XrdCl
636{
637 //----------------------------------------------------------------------------
638 // Constructor
639 //----------------------------------------------------------------------------
// Member initialisers and body of a FileStateHandler constructor: the object
// starts in the Closed state with all pointers null, read/write recovery,
// redirect following and the virtual redirector enabled, and the plug-in
// reference bound to `plugin`.
// NOTE(review): the line carrying the constructor signature (listing line 640)
// is absent from this listing.
641 pFileState( Closed ),
642 pStatInfo( 0 ),
643 pFileUrl( 0 ),
644 pDataServer( 0 ),
645 pLoadBalancer( 0 ),
646 pStateRedirect( 0 ),
647 pWrtRecoveryRedir( 0 ),
648 pFileHandle( 0 ),
649 pOpenMode( 0 ),
650 pOpenFlags( 0 ),
651 pSessionId( 0 ),
652 pDoRecoverRead( true ),
653 pDoRecoverWrite( true ),
654 pFollowRedirects( true ),
655 pUseVirtRedirector( true ),
656 pIsChannelEncrypted( false ),
657 pAllowBundledClose( false ),
658 pPlugin( plugin )
659 {
// the file handle is a 4 byte array, released with delete [] in the destructor
660 pFileHandle = new uint8_t[4];
661 ResetMonitoringVars();
// NOTE(review): listing lines 662-663 are absent from this listing.
664 pLFileHandler = new LocalFileHandler();
665 }
666
667 //------------------------------------------------------------------------
672 //------------------------------------------------------------------------
// Constructor that lets the caller choose whether the virtual redirector is
// used.
//
// @param useVirtRedirector : value stored in pUseVirtRedirector
// @param plugin            : reference to the plug-in pointer, bound to pPlugin
673 FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
674 pFileState( Closed ),
675 pStatInfo( 0 ),
676 pFileUrl( 0 ),
677 pDataServer( 0 ),
678 pLoadBalancer( 0 ),
679 pStateRedirect( 0 ),
680 pWrtRecoveryRedir( 0 ),
681 pFileHandle( 0 ),
682 pOpenMode( 0 ),
683 pOpenFlags( 0 ),
684 pSessionId( 0 ),
685 pDoRecoverRead( true ),
686 pDoRecoverWrite( true ),
687 pFollowRedirects( true ),
688 pUseVirtRedirector( useVirtRedirector ),
// NOTE(review): unlike the other constructor, pIsChannelEncrypted is not
// initialised here - confirm it has a default member initialiser, otherwise
// it is read uninitialised.
689 pAllowBundledClose( false ),
690 pPlugin( plugin )
691 {
// the file handle is a 4 byte array, released with delete [] in the destructor
692 pFileHandle = new uint8_t[4];
693 ResetMonitoringVars();
// NOTE(review): listing lines 694-695 are absent from this listing.
696 pLFileHandler = new LocalFileHandler();
697 }
698
699 //----------------------------------------------------------------------------
700 // Destructor
701 //----------------------------------------------------------------------------
// Body of the destructor: drops the file-instance count of the data server,
// reports a close to monitoring if the file was not Closed, releases the
// metalink registration and frees the owned members. Every step that needs
// the library environment is skipped if the log object is already gone.
// NOTE(review): the line carrying the destructor signature (listing line 702)
// is absent from this listing.
703 {
704 //--------------------------------------------------------------------------
705 // This, in principle, should never ever happen. Except for the case
706 // when we're interfaced with ROOT that may call this destructor from
707 // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
708 // has been finalized by the linker. So, if we don't have the log object
709 // at this point we just give up the hope.
710 //--------------------------------------------------------------------------
711 if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
712 DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
713
// NOTE(review): listing lines 714-715 and 717-718 are absent from this listing.
716
719
720 if( pFileState != Closed && DefaultEnv::GetLog() )
721 {
722 XRootDStatus st;
723 MonitorClose( &st );
724 ResetMonitoringVars();
725 }
726
727 // check if the logger is still there, this is only for root, as root might
728 // have unload us already so in this case we don't want to do anything
729 if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
730 {
// NOTE(review): the declaration of `registry` (listing line 731) is absent
// from this listing.
732 registry.Release( *pFileUrl );
733 }
734
735 delete pStatInfo;
736 delete pFileUrl;
737 delete pDataServer;
738 delete pLoadBalancer;
739 delete [] pFileHandle;
740 delete pLFileHandler;
741 }
742
743 //----------------------------------------------------------------------------
744 // Open the file pointed to by the given URL
745 //----------------------------------------------------------------------------
// Issue a kXR_open request for the given URL.
//
// @param self    : the state handler, its mutex is held for the whole call
// @param url     : URL of the file to open
// @param flags   : open flags, kXR_async and kXR_retstat are added
// @param mode    : access mode for the request
// @param handler : user handler, wrapped in an OpenHandler
// @param timeout : timeout stored in the send parameters
// @return        : status of issuing the request, or the stored error status
//                  if the handler is in the Error state
746 XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
747 const std::string &url,
748 uint16_t flags,
749 uint16_t mode,
750 ResponseHandler *handler,
751 uint16_t timeout )
752 {
753 XrdSysMutexHelper scopedLock( self->pMutex );
754
755 //--------------------------------------------------------------------------
756 // Check if we can proceed
757 //--------------------------------------------------------------------------
758 if( self->pFileState == Error )
759 return self->pStatus;
760
761 if( self->pFileState == OpenInProgress )
// NOTE(review): the statement guarded by the condition above (listing line
// 762) is absent from this listing.
763
764 if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
765 self->pFileState == Recovering )
// NOTE(review): the statement guarded by the condition above (listing line
// 766) is absent from this listing.
767
768 self->pFileState = OpenInProgress;
769
770 //--------------------------------------------------------------------------
771 // Check if the parameters are valid
772 //--------------------------------------------------------------------------
773 Log *log = DefaultEnv::GetLog();
774
// drop the URL of a previous open, releasing its metalink registration first
775 if( self->pFileUrl )
776 {
777 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
778 {
// NOTE(review): the declaration of `registry` (listing line 779) is absent
// from this listing.
780 registry.Release( *self->pFileUrl );
781 }
782 delete self->pFileUrl;
783 self->pFileUrl = 0;
784 }
785
786 self->pFileUrl = new URL( url );
787
788 //--------------------------------------------------------------------------
789 // Add unique uuid to each open request so replays due to error/timeout
790 // recovery can be correctly handled.
791 //--------------------------------------------------------------------------
792 URL::ParamsMap cgi = self->pFileUrl->GetParams();
793 uuid_t uuid;
// 36 characters of textual uuid plus the terminating null
794 char requuid[37]= {0};
795 uuid_generate( uuid );
796 uuid_unparse( uuid, requuid );
797 cgi["xrdcl.requuid"] = requuid;
798 self->pFileUrl->SetParams( cgi );
799
800 if( !self->pFileUrl->IsValid() )
801 {
802 log->Error( FileMsg, "[0x%x@%s] Trying to open invalid url: %s",
803 self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
804 self->pStatus = XRootDStatus( stError, errInvalidArgs );
805 self->pFileState = Closed;
806 return self->pStatus;
807 }
808
809 //--------------------------------------------------------------------------
810 // Check if the recovery procedures should be enabled
811 //--------------------------------------------------------------------------
812 const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
813 URL::ParamsMap::const_iterator it;
814 it = urlParams.find( "xrdcl.recover-reads" );
815 if( (it != urlParams.end() && it->second == "false") ||
816 !self->pDoRecoverRead )
817 {
818 self->pDoRecoverRead = false;
819 log->Debug( FileMsg, "[0x%x@%s] Read recovery procedures are disabled",
820 self.get(), self->pFileUrl->GetURL().c_str() );
821 }
822
823 it = urlParams.find( "xrdcl.recover-writes" );
824 if( (it != urlParams.end() && it->second == "false") ||
825 !self->pDoRecoverWrite )
826 {
827 self->pDoRecoverWrite = false;
828 log->Debug( FileMsg, "[0x%x@%s] Write recovery procedures are disabled",
829 self.get(), self->pFileUrl->GetURL().c_str() );
830 }
831
832 //--------------------------------------------------------------------------
833 // Open the file
834 //--------------------------------------------------------------------------
835 log->Debug( FileMsg, "[0x%x@%s] Sending an open command", self.get(),
836 self->pFileUrl->GetURL().c_str() );
837
838 self->pOpenMode = mode;
839 self->pOpenFlags = flags;
840 OpenHandler *openHandler = new OpenHandler( self, handler );
841
842 Message *msg;
// NOTE(review): the declaration of `req` (listing line 843) is absent from
// this listing.
844 std::string path = self->pFileUrl->GetPathWithFilteredParams();
845 MessageUtils::CreateRequest( msg, req, path.length() );
846
847 req->requestid = kXR_open;
848 req->mode = mode;
849 req->options = flags | kXR_async | kXR_retstat;
850 req->dlen = path.length();
851 msg->Append( path.c_str(), path.length(), 24 );
852
// NOTE(review): listing lines 853 and 856 are absent from this listing.
854 MessageSendParams params; params.timeout = timeout;
855 params.followRedirects = self->pFollowRedirects;
857
858 XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
859
// the request never left, so the handler is released and the state rolled back
860 if( !st.IsOK() )
861 {
862 delete openHandler;
863 self->pStatus = st;
864 self->pFileState = Closed;
865 return st;
866 }
867 return st;
868 }
869
870 //----------------------------------------------------------------------------
871 // Close the file object
872 //----------------------------------------------------------------------------
// Issue a kXR_close request for the open file handle to the data server.
//
// @param self    : the state handler, its mutex is held for the whole call
// @param handler : user handler, wrapped in a CloseHandler
// @param timeout : timeout stored in the send parameters
// @return        : status of issuing the request, or the stored error status
//                  if the handler is in the Error state
873 XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
874 ResponseHandler *handler,
875 uint16_t timeout )
876 {
877 XrdSysMutexHelper scopedLock( self->pMutex );
878
879 //--------------------------------------------------------------------------
880 // Check if we can proceed
881 //--------------------------------------------------------------------------
882 if( self->pFileState == Error )
883 return self->pStatus;
884
// NOTE(review): the statements guarded by the four conditions below (listing
// lines 886, 889, 892 and 895) are absent from this listing.
885 if( self->pFileState == CloseInProgress )
887
888 if( self->pFileState == Closed )
890
891 if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
893
894 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
896
897 self->pFileState = CloseInProgress;
898
899 Log *log = DefaultEnv::GetLog();
900 log->Debug( FileMsg, "[0x%x@%s] Sending a close command for handle 0x%x to "
901 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
902 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
903
904 //--------------------------------------------------------------------------
905 // Close the file
906 //--------------------------------------------------------------------------
907 Message *msg;
// NOTE(review): the declaration of `req` (listing line 908) is absent from
// this listing.
909 MessageUtils::CreateRequest( msg, req );
910
911 req->requestid = kXR_close;
912 memcpy( req->fhandle, self->pFileHandle, 4 );
913
// NOTE(review): listing lines 914 and 921 are absent from this listing.
915 msg->SetSessionId( self->pSessionId );
916 CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
917 MessageSendParams params;
918 params.timeout = timeout;
919 params.followRedirects = false;
920 params.stateful = true;
922
923 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
924
925 if( !st.IsOK() )
926 {
927 // an invalid-session error means the connection to the server has been
928 // closed, which in turn means that the server closed the file already
// NOTE(review): the opening of the condition (listing lines 929-930) and the
// statement that uses `job` (listing line 936) are absent from this listing.
931 st.code == errPollerError || st.code == errSocketError )
932 {
933 self->pFileState = Closed;
934 ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
935 nullptr, nullptr );
937 return XRootDStatus();
938 }
939
// any other failure releases the handler and puts the file into Error state
940 delete closeHandler;
941 self->pStatus = st;
942 self->pFileState = Error;
943 return st;
944 }
945 return st;
946 }
947
948 //----------------------------------------------------------------------------
949 // Stat the file
950 //----------------------------------------------------------------------------
// Obtain stat information for the open file.
//
// @param self    : the state handler, its mutex is held for the whole call
// @param force   : if false a copy of the cached StatInfo is handed to the
//                  handler, if true a kXR_stat request is sent
// @param handler : user handler
// @param timeout : timeout stored in the send parameters
// @return        : the stored error status in the Error state, an OK status
//                  on the cached path, otherwise the result of SendOrQueue
951 XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
952 bool force,
953 ResponseHandler *handler,
954 uint16_t timeout )
955 {
956 XrdSysMutexHelper scopedLock( self->pMutex );
957
958 if( self->pFileState == Error ) return self->pStatus;
959
960 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement guarded by the condition above (listing line
// 961) is absent from this listing.
962
963 //--------------------------------------------------------------------------
964 // Return the cached info
965 //--------------------------------------------------------------------------
966 if( !force )
967 {
968 AnyObject *obj = new AnyObject();
969 obj->Set( new StatInfo( *self->pStatInfo ) );
// NOTE(review): obj is not released if handler is null - looks like a leak.
970 if (handler)
971 handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
972 return XRootDStatus();
973 }
974
975 Log *log = DefaultEnv::GetLog();
976 log->Debug( FileMsg, "[0x%x@%s] Sending a stat command for handle 0x%x to "
977 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
978 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
979
980 //--------------------------------------------------------------------------
981 // Issue a new stat request
982 // stating a file handle doesn't work (fixed in 3.2.0) so we need to
983 // stat the path
984 //--------------------------------------------------------------------------
985 Message *msg;
// NOTE(review): the declaration of `req` (listing line 986) is absent from
// this listing; `path` is not used by any visible statement.
987 std::string path = self->pFileUrl->GetPath();
988 MessageUtils::CreateRequest( msg, req );
989
990 req->requestid = kXR_stat;
991 memcpy( req->fhandle, self->pFileHandle, 4 );
992
993 MessageSendParams params;
994 params.timeout = timeout;
995 params.followRedirects = false;
996 params.stateful = true;
// NOTE(review): listing lines 997 and 999 are absent from this listing.
998
1000 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1001
1002 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1003 }
1004
1005 //----------------------------------------------------------------------------
1006 // Read a data chunk at a given offset - async
1007 //----------------------------------------------------------------------------
// Issue a kXR_read request for `size` bytes at `offset`; the destination
// `buffer` is registered as a single chunk in the send parameters and the
// outcome is delivered through `handler`.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1008 XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1009 uint64_t offset,
1010 uint32_t size,
1011 void *buffer,
1012 ResponseHandler *handler,
1013 uint16_t timeout )
1014 {
1015 XrdSysMutexHelper scopedLock( self->pMutex );
1016
1017 if( self->pFileState == Error ) return self->pStatus;
1018
1019 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1020) - presumably an early error
// return; confirm against the original source.
1021
1022 Log *log = DefaultEnv::GetLog();
1023 log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1024 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1025 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1026
1027 Message *msg;
1028 ClientReadRequest *req;
1029 MessageUtils::CreateRequest( msg, req );
1030
1031 req->requestid = kXR_read;
1032 req->offset = offset;
1033 req->rlen = size;
1034 memcpy( req->fhandle, self->pFileHandle, 4 );
1035
// One chunk describing where the response data should land
1036 ChunkList *list = new ChunkList();
1037 list->push_back( ChunkInfo( offset, size, buffer ) );
1038
// NOTE(review): the embedded numbering skips 1039 and 1045, so one or more
// statements may be missing from this listing around here.
1040 MessageSendParams params;
1041 params.timeout = timeout;
1042 params.followRedirects = false;
1043 params.stateful = true;
1044 params.chunkList = list;
1046 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1047
1048 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1049 }
1050
1051 //------------------------------------------------------------------------
1052 // Read data pages at a given offset
1053 //------------------------------------------------------------------------
// Read pages at `offset`, falling back to a plain Read when page reads are
// not available.
//
// Page reads are used only if both status objects below are OK, the flags
// value retrieved from `obj` has kXR_suppgrw set and the protocol version is
// at least kXR_PROTPGRWVERSION. The user handler is wrapped in a
// PgReadSubstitutionHandler (fallback) or a PgReadHandler (page read); the
// wrapper is deleted here if the request could not be issued.
1054 XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1055 uint64_t offset,
1056 uint32_t size,
1057 void *buffer,
1058 ResponseHandler *handler,
1059 uint16_t timeout )
1060 {
1061 int issupported = true;
1062 AnyObject obj;
// NOTE(review): `st1` is used below but its declaration is absent from this
// listing (the embedded numbering skips 1063) - presumably the call that
// fills `obj`; confirm against the original source.
1064 int protver = 0;
1065 XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1066 if( st1.IsOK() && st2.IsOK() )
1067 {
// The int extracted from obj is deleted here once it has been inspected
1068 int *ptr = 0;
1069 obj.Get( ptr );
1070 issupported = ( *ptr & kXR_suppgrw ) && ( protver >= kXR_PROTPGRWVERSION );
1071 delete ptr;
1072 }
1073 else
1074 issupported = false;
1075
1076 if( !issupported )
1077 {
1078 DefaultEnv::GetLog()->Debug( FileMsg, "[0x%x@%s] PgRead not supported; substituting with Read.",
1079 self.get(), self->pFileUrl->GetURL().c_str() );
1080 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1081 auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1082 if( !st.IsOK() ) delete substitHandler;
1083 return st;
1084 }
1085
1086 ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1087 auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1088 if( !st.IsOK() ) delete pgHandler;
1089 return st;
1090 }
1091
1092 XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1093 uint64_t offset,
1094 uint32_t size,
1095 size_t pgnb,
1096 void *buffer,
1097 PgReadHandler *handler,
1098 uint16_t timeout )
1099 {
1100 if( size > (uint32_t)XrdSys::PageSize )
1101 return XRootDStatus( stError, errInvalidArgs, EINVAL,
1102 "PgRead retry size exceeded 4KB." );
1103
1104 ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1105 XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1106 if( !st.IsOK() ) delete retryHandler;
1107 return st;
1108 }
1109
// Build and send a kXR_pgread request for `size` bytes at `offset`.
// `flags` is stored in the reqflags field of the ClientPgReadReqArgs that
// follow the request header; `buffer` is registered as the single
// destination chunk.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1110 XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1111 uint64_t offset,
1112 uint32_t size,
1113 void *buffer,
1114 uint16_t flags,
1115 ResponseHandler *handler,
1116 uint16_t timeout )
1117 {
1118 XrdSysMutexHelper scopedLock( self->pMutex );
1119
1120 if( self->pFileState == Error ) return self->pStatus;
1121
1122 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1123) - presumably an early error
// return; confirm against the original source.
1124
1125 Log *log = DefaultEnv::GetLog();
1126 log->Debug( FileMsg, "[0x%x@%s] Sending a pgread command for handle 0x%x to "
1127 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1128 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1129
1130 Message *msg;
// NOTE(review): no declaration of `req` is visible in this listing (the
// embedded numbering skips 1131) - confirm against the original source.
1132 MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1133
1134 req->requestid = kXR_pgread;
1135 req->offset = offset;
1136 req->rlen = size;
1137 memcpy( req->fhandle, self->pFileHandle, 4 );
1138
1139 //--------------------------------------------------------------------------
1140 // Now adjust the message size so it can hold PgRead arguments
1141 //--------------------------------------------------------------------------
1142 req->dlen = sizeof( ClientPgReadReqArgs );
// The argument area starts right after the fixed-size request header; it is
// zeroed first and then only reqflags is set.
1143 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1144 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1145 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1146 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1147 args->reqflags = flags;
1148
1149 ChunkList *list = new ChunkList();
1150 list->push_back( ChunkInfo( offset, size, buffer ) );
1151
// NOTE(review): the embedded numbering skips 1152 and 1158, so one or more
// statements may be missing from this listing around here.
1153 MessageSendParams params;
1154 params.timeout = timeout;
1155 params.followRedirects = false;
1156 params.stateful = true;
1157 params.chunkList = list;
1159 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1160
1161 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1162 }
1163
1164 //----------------------------------------------------------------------------
1165 // Write a data chunk at a given offset - async
1166 //----------------------------------------------------------------------------
// Issue a kXR_write request that writes `size` bytes from `buffer` at
// `offset`. The user buffer is not copied here: it is referenced by a single
// chunk in the send parameters (the chunk's own offset field is 0; the file
// offset travels in the request header).
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1167 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1168 uint64_t offset,
1169 uint32_t size,
1170 const void *buffer,
1171 ResponseHandler *handler,
1172 uint16_t timeout )
1173 {
1174 XrdSysMutexHelper scopedLock( self->pMutex );
1175
1176 if( self->pFileState == Error ) return self->pStatus;
1177
1178 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1179) - presumably an early error
// return; confirm against the original source.
1180
1181 Log *log = DefaultEnv::GetLog();
1182 log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1183 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1184 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1185
1186 Message *msg;
1187 ClientWriteRequest *req;
1188 MessageUtils::CreateRequest( msg, req );
1189
1190 req->requestid = kXR_write;
1191 req->offset = offset;
1192 req->dlen = size;
1193 memcpy( req->fhandle, self->pFileHandle, 4 );
1194
1195 ChunkList *list = new ChunkList();
1196 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1197
1198 MessageSendParams params;
1199 params.timeout = timeout;
1200 params.followRedirects = false;
1201 params.stateful = true;
1202 params.chunkList = list;
1203
// NOTE(review): the embedded numbering skips 1204 and 1206, so one or more
// statements may be missing from this listing around here.
1205
1207 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1208
1209 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1210 }
1211
1212 //----------------------------------------------------------------------------
1213 // Write a data chunk at a given offset
1214 //----------------------------------------------------------------------------
// Write the contents of an owning Buffer at `offset`.
//
// If the buffer memory is not page aligned, or self->pIsChannelEncrypted is
// set, the data goes through the pointer/size Write overload with a
// ReleaseBufferHandler that takes over the buffer; should that call fail,
// the buffer is moved back into the caller's object. Otherwise the memory is
// released from the Buffer, moved into a kernel buffer and passed to
// WriteKernelBuffer.
1215 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1216 uint64_t offset,
1217 Buffer &&buffer,
1218 ResponseHandler *handler,
1219 uint16_t timeout )
1220 {
1221 //--------------------------------------------------------------------------
1222 // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1223 // so fall back to normal write
1224 //--------------------------------------------------------------------------
1225 if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1226 {
// NOTE(review): the format string has two conversions but three arguments
// are passed, and the message blames alignment even when this branch was
// taken because the channel is encrypted.
1227 Log *log = DefaultEnv::GetLog();
1228 log->Info( FileMsg, "[0x%x@%s] Buffer is not page aligned (4KB), cannot "
1229 "convert it to kernel space buffer.", self.get(), self->pFileUrl->GetURL().c_str(),
1230 *((uint32_t*)self->pFileHandle) );
1231
// Pointer and size are read before the buffer is moved into the handler
1232 void *buff = buffer.GetBuffer();
1233 uint32_t size = buffer.GetSize();
1234 ReleaseBufferHandler *wrtHandler =
1235 new ReleaseBufferHandler( std::move( buffer ), handler );
1236 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1237 if( !st.IsOK() )
1238 {
// Give the data back to the caller and drop the wrapper
1239 buffer = std::move( wrtHandler->GetBuffer() );
1240 delete wrtHandler;
1241 }
1242 return st;
1243 }
1244
1245 //--------------------------------------------------------------------------
1246 // Transfer the data from user space to kernel space
1247 //--------------------------------------------------------------------------
1248 uint32_t length = buffer.GetSize();
1249 char *ubuff = buffer.Release();
1250
1251 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1252 ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1253 if( ret < 0 )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1254) - presumably an error return;
// confirm against the original source.
1255
1256 //--------------------------------------------------------------------------
1257 // Now create a write request and enqueue it
1258 //--------------------------------------------------------------------------
1259 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1260 }
1261
1262 //----------------------------------------------------------------------------
1263 // Write a data from a given file descriptor at a given offset - async
1264 //----------------------------------------------------------------------------
// Write up to `size` bytes taken from file descriptor `fd` at `offset`.
// The data is first read into a kernel buffer - from position *fdoff when
// `fdoff` holds a value, otherwise via the XrdSys::Read overload without an
// explicit position - and the number of bytes actually read is what gets
// passed on to WriteKernelBuffer.
1265 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1266 uint64_t offset,
1267 uint32_t size,
1268 Optional<uint64_t> fdoff,
1269 int fd,
1270 ResponseHandler *handler,
1271 uint16_t timeout )
1272 {
1273 //--------------------------------------------------------------------------
1274 // Read the data from the file descriptor into a kernel buffer
1275 //--------------------------------------------------------------------------
1276 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1277 ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1278 XrdSys::Read( fd, *kbuff, size );
1279 if( ret < 0 )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1280) - presumably an error return;
// confirm against the original source.
1281
1282 //--------------------------------------------------------------------------
1283 // Now create a write request and enqueue it
1284 //--------------------------------------------------------------------------
1285 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1286 }
1287
1288 //----------------------------------------------------------------------------
1289 // Write number of pages at a given offset - async
1290 //----------------------------------------------------------------------------
1291 XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1292 uint64_t offset,
1293 uint32_t size,
1294 const void *buffer,
1295 std::vector<uint32_t> &cksums,
1296 ResponseHandler *handler,
1297 uint16_t timeout )
1298 {
1299 //--------------------------------------------------------------------------
1300 // Resolve timeout value
1301 //--------------------------------------------------------------------------
1302 if( timeout == 0 )
1303 {
1304 int val = DefaultRequestTimeout;
1305 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1306 timeout = val;
1307 }
1308
1309 //--------------------------------------------------------------------------
1310 // Validate the digest vector size
1311 //--------------------------------------------------------------------------
1312 if( cksums.empty() )
1313 {
1314 const char *data = static_cast<const char*>( buffer );
1315 XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1316 }
1317 else
1318 {
1319 size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1320 if( crc32cCnt != cksums.size() )
1321 return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1322 }
1323
1324 //--------------------------------------------------------------------------
1325 // Create a context for PgWrite operation
1326 //--------------------------------------------------------------------------
1327 struct pgwrt_t
1328 {
1329 pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1330 {
1331 }
1332
1333 ~pgwrt_t()
1334 {
1335 if( handler )
1336 {
1337 // if all retries were successful no error status was set
1338 if( !status ) status = new XRootDStatus();
1339 handler->HandleResponse( status, nullptr );
1340 }
1341 }
1342
1343 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1344 {
1345 if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1346 return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1347 }
1348
1349 inline void SetStatus( XRootDStatus* s )
1350 {
1351 if( !status ) status = s;
1352 else delete s;
1353 }
1354
1355 ResponseHandler *handler;
1356 XRootDStatus *status;
1357 };
1358 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1359
1360 int fLen, lLen;
1361 XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1362 uint32_t fstpglen = fLen;
1363
1364 time_t start = ::time( nullptr );
1365 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1366 {
1367 std::unique_ptr<AnyObject> scoped( r );
1368 // if the request failed simply pass the status to the
1369 // user handler
1370 if( !s->IsOK() )
1371 {
1372 pgwrt->SetStatus( s );
1373 return; // pgwrt destructor will call the handler
1374 }
1375 // also if the request was sucessful and there were no
1376 // corrupted pages pass the status to the user handler
1377 RetryInfo *inf = nullptr;
1378 r->Get( inf );
1379 if( !inf->NeedRetry() )
1380 {
1381 pgwrt->SetStatus( s );
1382 return; // pgwrt destructor will call the handler
1383 }
1384 delete s;
1385 // first adjust the timeout value
1386 uint16_t elapsed = ::time( nullptr ) - start;
1387 if( elapsed >= timeout )
1388 {
1389 pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1390 return; // pgwrt destructor will call the handler
1391 }
1392 else timeout -= elapsed;
1393 // retransmit the corrupted pages
1394 for( size_t i = 0; i < inf->Size(); ++i )
1395 {
1396 auto tpl = inf->At( i );
1397 uint64_t pgoff = std::get<0>( tpl );
1398 uint32_t pglen = std::get<1>( tpl );
1399 const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1400 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1401 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1402 {
1403 std::unique_ptr<AnyObject> scoped( r );
1404 // if we failed simply set the status
1405 if( !s->IsOK() )
1406 {
1407 pgwrt->SetStatus( s );
1408 return; // the destructor will call the handler
1409 }
1410 delete s;
1411 // otherwise check if the data were not corrupted again
1412 RetryInfo *inf = nullptr;
1413 r->Get( inf );
1414 if( inf->NeedRetry() ) // so we failed in the end
1415 {
1416 DefaultEnv::GetLog()->Warning( FileMsg, "[0x%x@%s] Failed retransmitting corrupted "
1417 "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1418 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1419 pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1420 "Failed to retransmit corrupted page" ) );
1421 }
1422 else
1423 DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Succesfuly retransmitted corrupted "
1424 "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1425 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1426 } );
1427 auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1428 if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1429 DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Retransmitting corrupted page: "
1430 "pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1431 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1432 }
1433 } );
1434
1435 auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1436 if( !st.IsOK() )
1437 {
1438 pgwrt->handler = nullptr;
1439 delete h;
1440 }
1441 return st;
1442 }
1443
1444 //------------------------------------------------------------------------
1445 // Write number of pages at a given offset - async
1446 //------------------------------------------------------------------------
1447 XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1448 uint64_t offset,
1449 uint32_t size,
1450 const void *buffer,
1451 uint32_t digest,
1452 ResponseHandler *handler,
1453 uint16_t timeout )
1454 {
1455 std::vector<uint32_t> cksums{ digest };
1456 return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1457 }
1458
1459 //------------------------------------------------------------------------
1460 // Write number of pages at a given offset - async
1461 //------------------------------------------------------------------------
// Build and send a kXR_pgwrite request for `size` bytes at `offset`.
// The payload length announced in the request is the data size plus four
// bytes per digest; `flags` goes into the request's reqflags field.
// Note that the digests are swapped out of `cksums` into the send
// parameters, so the caller's vector does not keep them.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1462 XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1463 uint64_t offset,
1464 uint32_t size,
1465 const void *buffer,
1466 std::vector<uint32_t> &cksums,
1467 kXR_char flags,
1468 ResponseHandler *handler,
1469 uint16_t timeout )
1470 {
1471 XrdSysMutexHelper scopedLock( self->pMutex );
1472
1473 if( self->pFileState == Error ) return self->pStatus;
1474
1475 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1476) - presumably an early error
// return; confirm against the original source.
1477
1478 Log *log = DefaultEnv::GetLog();
1479 log->Debug( FileMsg, "[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
1480 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1481 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1482
1483 //--------------------------------------------------------------------------
1484 // Create the message
1485 //--------------------------------------------------------------------------
1486 Message *msg;
// NOTE(review): no declaration of `req` is visible in this listing (the
// embedded numbering skips 1487) - confirm against the original source.
1488 MessageUtils::CreateRequest( msg, req );
1489
1490 req->requestid = kXR_pgwrite;
1491 req->offset = offset;
1492 req->dlen = size + cksums.size() * sizeof( uint32_t );
1493 req->reqflags = flags;
1494 memcpy( req->fhandle, self->pFileHandle, 4 );
1495
1496 ChunkList *list = new ChunkList();
1497 list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1498
1499 MessageSendParams params;
1500 params.timeout = timeout;
1501 params.followRedirects = false;
1502 params.stateful = true;
1503 params.chunkList = list;
// After the swap `cksums` holds whatever params.crc32cDigests held before
// (presumably nothing for freshly constructed params - TODO confirm)
1504 params.crc32cDigests.swap( cksums );
1505
// NOTE(review): the embedded numbering skips 1506 and 1508, so one or more
// statements may be missing from this listing around here.
1507
1509 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1510
1511 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1512 }
1513
1514 //----------------------------------------------------------------------------
1515 // Commit all pending disk writes - async
1516 //----------------------------------------------------------------------------
// Issue a kXR_sync request for the open file handle; the outcome is
// delivered through `handler`.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1517 XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1518 ResponseHandler *handler,
1519 uint16_t timeout )
1520 {
1521 XrdSysMutexHelper scopedLock( self->pMutex );
1522
1523 if( self->pFileState == Error ) return self->pStatus;
1524
1525 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1526) - presumably an early error
// return; confirm against the original source.
1527
1528 Log *log = DefaultEnv::GetLog();
1529 log->Debug( FileMsg, "[0x%x@%s] Sending a sync command for handle 0x%x to "
1530 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1531 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1532
1533 Message *msg;
1534 ClientSyncRequest *req;
1535 MessageUtils::CreateRequest( msg, req );
1536
1537 req->requestid = kXR_sync;
1538 memcpy( req->fhandle, self->pFileHandle, 4 );
1539
1540 MessageSendParams params;
1541 params.timeout = timeout;
1542 params.followRedirects = false;
1543 params.stateful = true;
// NOTE(review): the embedded numbering skips 1544 and 1546, so one or more
// statements may be missing from this listing around here.
1545
1547 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1548
1549 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1550 }
1551
1552 //----------------------------------------------------------------------------
1553 // Truncate the file to a particular size - async
1554 //----------------------------------------------------------------------------
// Issue a kXR_truncate request for the open file handle. The requested file
// size travels in the request's `offset` field.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1555 XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1556 uint64_t size,
1557 ResponseHandler *handler,
1558 uint16_t timeout )
1559 {
1560 XrdSysMutexHelper scopedLock( self->pMutex );
1561
1562 if( self->pFileState == Error ) return self->pStatus;
1563
1564 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1565) - presumably an early error
// return; confirm against the original source.
1566
1567 Log *log = DefaultEnv::GetLog();
1568 log->Debug( FileMsg, "[0x%x@%s] Sending a truncate command for handle 0x%x to "
1569 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1570 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1571
1572 Message *msg;
// NOTE(review): no declaration of `req` is visible in this listing (the
// embedded numbering skips 1573) - confirm against the original source.
1574 MessageUtils::CreateRequest( msg, req );
1575
1576 req->requestid = kXR_truncate;
1577 memcpy( req->fhandle, self->pFileHandle, 4 );
1578 req->offset = size;
1579
1580 MessageSendParams params;
1581 params.timeout = timeout;
1582 params.followRedirects = false;
1583 params.stateful = true;
// NOTE(review): the embedded numbering skips 1584 and 1586, so one or more
// statements may be missing from this listing around here.
1585
1587 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1588
1589 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1590 }
1591
1592 //----------------------------------------------------------------------------
1593 // Read scattered data chunks in one operation - async
1594 //----------------------------------------------------------------------------
// Issue a kXR_readv request covering all entries of `chunks`.
//
// One readahead_list element per chunk is written into the message body,
// which starts at byte 24 of the message buffer. When `buffer` is non-null
// the chunks are placed back to back in it, in list order; when it is null
// each chunk's own buffer pointer is used as the destination.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1595 XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1596 const ChunkList &chunks,
1597 void *buffer,
1598 ResponseHandler *handler,
1599 uint16_t timeout )
1600 {
1601 //--------------------------------------------------------------------------
1602 // Sanity check
1603 //--------------------------------------------------------------------------
1604 XrdSysMutexHelper scopedLock( self->pMutex );
1605
1606 if( self->pFileState == Error ) return self->pStatus;
1607
1608 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1609) - presumably an early error
// return; confirm against the original source.
1610
1611 Log *log = DefaultEnv::GetLog();
1612 log->Debug( FileMsg, "[0x%x@%s] Sending a vector read command for handle "
1613 "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1614 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1615
1616 //--------------------------------------------------------------------------
1617 // Build the message
1618 //--------------------------------------------------------------------------
1619 Message *msg;
1620 ClientReadVRequest *req;
1621 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1622
1623 req->requestid = kXR_readv;
1624 req->dlen = sizeof(readahead_list)*chunks.size();
1625
1626 ChunkList *list = new ChunkList();
// Write position inside the caller's contiguous buffer (null if none given)
1627 char *cursor = (char*)buffer;
1628
1629 //--------------------------------------------------------------------------
1630 // Copy the chunk info
1631 //--------------------------------------------------------------------------
1632 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1633 for( size_t i = 0; i < chunks.size(); ++i )
1634 {
1635 dataChunk[i].rlen = chunks[i].length;
1636 dataChunk[i].offset = chunks[i].offset;
1637 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1638
1639 void *chunkBuffer;
1640 if( cursor )
1641 {
1642 chunkBuffer = cursor;
1643 cursor += chunks[i].length;
1644 }
1645 else
1646 chunkBuffer = chunks[i].buffer;
1647
1648 list->push_back( ChunkInfo( chunks[i].offset,
1649 chunks[i].length,
1650 chunkBuffer ) );
1651 }
1652
1653 //--------------------------------------------------------------------------
1654 // Send the message
1655 //--------------------------------------------------------------------------
1656 MessageSendParams params;
1657 params.timeout = timeout;
1658 params.followRedirects = false;
1659 params.stateful = true;
1660 params.chunkList = list;
// NOTE(review): the embedded numbering skips 1661 and 1663, so one or more
// statements may be missing from this listing around here.
1662
1664 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1665
1666 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1667 }
1668
1669 //------------------------------------------------------------------------
1670 // Write scattered data chunks in one operation - async
1671 //------------------------------------------------------------------------
// Issue a kXR_writev request covering all entries of `chunks`.
//
// One XrdProto::write_list element (length, offset, file handle) per chunk
// is written into the message body, which starts at byte 24 of the message
// buffer; the data buffers themselves are only referenced through the chunk
// list placed in the send parameters.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1672 XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1673 const ChunkList &chunks,
1674 ResponseHandler *handler,
1675 uint16_t timeout )
1676 {
1677 //--------------------------------------------------------------------------
1678 // Sanity check
1679 //--------------------------------------------------------------------------
1680 XrdSysMutexHelper scopedLock( self->pMutex );
1681
1682 if( self->pFileState == Error ) return self->pStatus;
1683
1684 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1685) - presumably an early error
// return; confirm against the original source.
1686
1687 Log *log = DefaultEnv::GetLog();
1688 log->Debug( FileMsg, "[0x%x@%s] Sending a vector write command for handle "
1689 "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1690 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1691
1692 //--------------------------------------------------------------------------
1693 // Determine the size of the payload
1694 //--------------------------------------------------------------------------
1695
1696 // the size of write vector
1697 uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1698
1699 //--------------------------------------------------------------------------
1700 // Build the message
1701 //--------------------------------------------------------------------------
1702 Message *msg;
// NOTE(review): no declaration of `req` is visible in this listing (the
// embedded numbering skips 1703) - confirm against the original source.
1704 MessageUtils::CreateRequest( msg, req, payloadSize );
1705
1706 req->requestid = kXR_writev;
1707 req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1708
1709 ChunkList *list = new ChunkList();
1710
1711 //--------------------------------------------------------------------------
1712 // Copy the chunk info
1713 //--------------------------------------------------------------------------
1714 XrdProto::write_list *writeList =
1715 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1716
1717
1718
1719 for( size_t i = 0; i < chunks.size(); ++i )
1720 {
1721 writeList[i].wlen = chunks[i].length;
1722 writeList[i].offset = chunks[i].offset;
1723 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1724
1725 list->push_back( ChunkInfo( chunks[i].offset,
1726 chunks[i].length,
1727 chunks[i].buffer ) );
1728 }
1729
1730 //--------------------------------------------------------------------------
1731 // Send the message
1732 //--------------------------------------------------------------------------
1733 MessageSendParams params;
1734 params.timeout = timeout;
1735 params.followRedirects = false;
1736 params.stateful = true;
1737 params.chunkList = list;
// NOTE(review): the embedded numbering skips 1738 and 1740, so one or more
// statements may be missing from this listing around here.
1739
1741 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1742
1743 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1744 }
1745
1746 //------------------------------------------------------------------------
1747 // Write scattered buffers in one operation - async
1748 //------------------------------------------------------------------------
// Write the contents of `iovcnt` scattered buffers as one kXR_write request
// starting at `offset`.
// Zero-length iovec entries are skipped; the remaining ones become the chunk
// list in the order given, and the sum of their lengths is the request's
// data length.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1749 XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1750 uint64_t offset,
1751 const struct iovec *iov,
1752 int iovcnt,
1753 ResponseHandler *handler,
1754 uint16_t timeout )
1755 {
1756 XrdSysMutexHelper scopedLock( self->pMutex );
1757
1758 if( self->pFileState == Error ) return self->pStatus;
1759
1760 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1761) - presumably an early error
// return; confirm against the original source.
1762
1763 Log *log = DefaultEnv::GetLog();
1764 log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1765 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1766 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1767
1768 Message *msg;
1769 ClientWriteRequest *req;
1770 MessageUtils::CreateRequest( msg, req );
1771
1772 ChunkList *list = new ChunkList();
1773
// Total payload size, accumulated in 32 bits
1774 uint32_t size = 0;
1775 for( int i = 0; i < iovcnt; ++i )
1776 {
1777 if( iov[i].iov_len == 0 ) continue;
1778 size += iov[i].iov_len;
1779 list->push_back( ChunkInfo( 0, iov[i].iov_len,
1780 (char*)iov[i].iov_base ) );
1781 }
1782
1783 req->requestid = kXR_write;
1784 req->offset = offset;
1785 req->dlen = size;
1786 memcpy( req->fhandle, self->pFileHandle, 4 );
1787
1788 MessageSendParams params;
1789 params.timeout = timeout;
1790 params.followRedirects = false;
1791 params.stateful = true;
1792 params.chunkList = list;
1793
// NOTE(review): the embedded numbering skips 1794 and 1796, so one or more
// statements may be missing from this listing around here.
1795
1797 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1798
1799 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1800 }
1801
1802 //------------------------------------------------------------------------
1803 // Read data into scattered buffers in one operation - async
1804 //------------------------------------------------------------------------
// Read one contiguous range starting at `offset` into `iovcnt` scattered
// buffers using a single kXR_read request.
// The requested length is the sum of all iov_len values; every iovec entry
// becomes a chunk whose file offset continues where the previous one ended.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1805 XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1806 uint64_t offset,
1807 struct iovec *iov,
1808 int iovcnt,
1809 ResponseHandler *handler,
1810 uint16_t timeout )
1811 {
1812 XrdSysMutexHelper scopedLock( self->pMutex );
1813
1814 if( self->pFileState == Error ) return self->pStatus;
1815
1816 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1817) - presumably an early error
// return; confirm against the original source.
1818
1819 Log *log = DefaultEnv::GetLog();
1820 log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1821 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1822 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1823
1824 Message *msg;
1825 ClientReadRequest *req;
1826 MessageUtils::CreateRequest( msg, req );
1827
1828 // calculate the total read size
// NOTE(review): the initial value is the int literal 0, so std::accumulate
// carries the running sum as an int even though the lambda works in size_t;
// totals that do not fit in an int would not survive - consider size_t( 0 ).
1829 size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1830 {
1831 return acc + rhs.iov_len;
1832 } );
1833 req->requestid = kXR_read;
1834 req->offset = offset;
1835 req->rlen = size;
// NOTE(review): the embedded numbering skips 1836, so a statement may be
// missing from this listing here.
1837 memcpy( req->fhandle, self->pFileHandle, 4 );
1838
1839 ChunkList *list = new ChunkList();
1840 list->reserve( iovcnt );
// Running file offset of the next chunk
1841 uint64_t choff = offset;
1842 for( int i = 0; i < iovcnt; ++i )
1843 {
1844 list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1845 choff += iov[i].iov_len;
1846 }
1847
// NOTE(review): the embedded numbering skips 1848 and 1854, so one or more
// statements may be missing from this listing around here.
1849 MessageSendParams params;
1850 params.timeout = timeout;
1851 params.followRedirects = false;
1852 params.stateful = true;
1853 params.chunkList = list;
1855 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1856
1857 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1858 }
1859
1860 //----------------------------------------------------------------------------
1861 // Performs a custom operation on an open file, server implementation
1862 // dependent - async
1863 //----------------------------------------------------------------------------
// Send a kXR_query request of info type kXR_Qopaqug for the open file
// handle. The content of `arg` is appended to the message at byte 24 and its
// size is announced as the request's data length.
// Returns self->pStatus when the file is in the Error state, otherwise the
// status of SendOrQueue.
1864 XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1865 const Buffer &arg,
1866 ResponseHandler *handler,
1867 uint16_t timeout )
1868 {
1869 XrdSysMutexHelper scopedLock( self->pMutex );
1870
1871 if( self->pFileState == Error ) return self->pStatus;
1872
1873 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by this `if` is absent from this
// listing (the embedded numbering skips 1874) - presumably an early error
// return; confirm against the original source.
1875
1876 Log *log = DefaultEnv::GetLog();
1877 log->Debug( FileMsg, "[0x%x@%s] Sending a fcntl command for handle 0x%x to "
1878 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1879 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1880
1881 Message *msg;
1882 ClientQueryRequest *req;
1883 MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1884
1885 req->requestid = kXR_query;
1886 req->infotype = kXR_Qopaqug;
1887 req->dlen = arg.GetSize();
1888 memcpy( req->fhandle, self->pFileHandle, 4 );
1889 msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1890
1891 MessageSendParams params;
1892 params.timeout = timeout;
1893 params.followRedirects = false;
1894 params.stateful = true;
// NOTE(review): the embedded numbering skips 1895 and 1897, so one or more
// statements may be missing from this listing around here.
1896
1898 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1899
1900 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1901 }
1902
1903 //----------------------------------------------------------------------------
1904 // Get access token to a file - async
1905 //----------------------------------------------------------------------------
// Builds a kXR_query request with infotype kXR_Qvisa for this file's handle
// and hands it to SendOrQueue wrapped in a StatefulHandler.
//
// self    - file state handler the request is issued for
// handler - user handler passed on to the StatefulHandler
// timeout - copied into the send parameters
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// status returned by SendOrQueue.
1906 XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1907 ResponseHandler *handler,
1908 uint16_t timeout )
1909 {
1910 XrdSysMutexHelper scopedLock( self->pMutex );
1911
1912 if( self->pFileState == Error ) return self->pStatus;
1913
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 1914 to 1916) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
1914 if( self->pFileState != Opened && self->pFileState != Recovering )
1916
1917 Log *log = DefaultEnv::GetLog();
1918 log->Debug( FileMsg, "[0x%x@%s] Sending a visa command for handle 0x%x to "
1919 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1920 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1921
1922 Message *msg;
1923 ClientQueryRequest *req;
1924 MessageUtils::CreateRequest( msg, req );
1925
1926 req->requestid = kXR_query;
1927 req->infotype = kXR_Qvisa;
1928 memcpy( req->fhandle, self->pFileHandle, 4 );
1929
1930 MessageSendParams params;
1931 params.timeout = timeout;
1932 params.followRedirects = false;
1933 params.stateful = true;
// NOTE(review): lines 1934 and 1936 are absent from this listing; further
// preparation of `params` / `msg` may be missing here.
1935
1937 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1938
1939 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1940 }
1941
1942 //------------------------------------------------------------------------
1943 // Set extended attributes - async
1944 //------------------------------------------------------------------------
// Checks the file state, logs the operation and forwards the attribute list
// to XAttrOperationImpl with subcode kXR_fattrSet and options 0.
//
// self    - file state handler the request is issued for
// attrs   - attributes forwarded unchanged to XAttrOperationImpl
// handler - user handler forwarded to XAttrOperationImpl
// timeout - forwarded to XAttrOperationImpl
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// result of XAttrOperationImpl.
1945 XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1946 const std::vector<xattr_t> &attrs,
1947 ResponseHandler *handler,
1948 uint16_t timeout )
1949 {
1950 XrdSysMutexHelper scopedLock( self->pMutex );
1951
1952 if( self->pFileState == Error ) return self->pStatus;
1953
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 1954 to 1956) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
1954 if( self->pFileState != Opened && self->pFileState != Recovering )
1956
1957 Log *log = DefaultEnv::GetLog();
1958 log->Debug( FileMsg, "[0x%x@%s] Sending a fattr set command for handle 0x%x to "
1959 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1960 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1961
1962 //--------------------------------------------------------------------------
1963 // Issue a new fattr set request
1964 //--------------------------------------------------------------------------
1965 return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1966 }
1967
1968 //------------------------------------------------------------------------
1969 // Get extended attributes - async
1970 //------------------------------------------------------------------------
// Checks the file state, logs the operation and forwards the attribute names
// to XAttrOperationImpl with subcode kXR_fattrGet and options 0.
//
// self    - file state handler the request is issued for
// attrs   - attribute names forwarded unchanged to XAttrOperationImpl
// handler - user handler forwarded to XAttrOperationImpl
// timeout - forwarded to XAttrOperationImpl
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// result of XAttrOperationImpl.
1971 XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1972 const std::vector<std::string> &attrs,
1973 ResponseHandler *handler,
1974 uint16_t timeout )
1975 {
1976 XrdSysMutexHelper scopedLock( self->pMutex );
1977
1978 if( self->pFileState == Error ) return self->pStatus;
1979
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 1980 to 1982) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
1980 if( self->pFileState != Opened && self->pFileState != Recovering )
1982
1983 Log *log = DefaultEnv::GetLog();
1984 log->Debug( FileMsg, "[0x%x@%s] Sending a fattr get command for handle 0x%x to "
1985 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1986 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1987
1988 //--------------------------------------------------------------------------
1989 // Issue a new fattr get request
1990 //--------------------------------------------------------------------------
1991 return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
1992 }
1993
1994 //------------------------------------------------------------------------
1995 // Delete extended attributes - async
1996 //------------------------------------------------------------------------
// Checks the file state, logs the operation and forwards the attribute names
// to XAttrOperationImpl with subcode kXR_fattrDel and options 0.
//
// self    - file state handler the request is issued for
// attrs   - attribute names forwarded unchanged to XAttrOperationImpl
// handler - user handler forwarded to XAttrOperationImpl
// timeout - forwarded to XAttrOperationImpl
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// result of XAttrOperationImpl.
1997 XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
1998 const std::vector<std::string> &attrs,
1999 ResponseHandler *handler,
2000 uint16_t timeout )
2001 {
2002 XrdSysMutexHelper scopedLock( self->pMutex );
2003
2004 if( self->pFileState == Error ) return self->pStatus;
2005
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2006 to 2008) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
2006 if( self->pFileState != Opened && self->pFileState != Recovering )
2008
2009 Log *log = DefaultEnv::GetLog();
2010 log->Debug( FileMsg, "[0x%x@%s] Sending a fattr del command for handle 0x%x to "
2011 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2012 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2013
2014 //--------------------------------------------------------------------------
2015 // Issue a new fattr del request
2016 //--------------------------------------------------------------------------
2017 return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2018 }
2019
2020 //------------------------------------------------------------------------
2021 // List extended attributes - async
2022 //------------------------------------------------------------------------
// Checks the file state, logs the operation and calls XAttrOperationImpl with
// subcode kXR_fattrList, options ClientFattrRequest::aData and an empty
// attribute-name vector.
//
// self    - file state handler the request is issued for
// handler - user handler forwarded to XAttrOperationImpl
// timeout - forwarded to XAttrOperationImpl
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// result of XAttrOperationImpl.
2023 XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2024 ResponseHandler *handler,
2025 uint16_t timeout )
2026 {
2027 XrdSysMutexHelper scopedLock( self->pMutex );
2028
2029 if( self->pFileState == Error ) return self->pStatus;
2030
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2031 to 2033) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
2031 if( self->pFileState != Opened && self->pFileState != Recovering )
2033
2034 Log *log = DefaultEnv::GetLog();
2035 log->Debug( FileMsg, "[0x%x@%s] Sending a fattr list command for handle 0x%x to "
2036 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2037 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2038
2039 //--------------------------------------------------------------------------
2040 // Issue a new fattr list request
2041 //--------------------------------------------------------------------------
// A list request names no attributes, so a shared empty vector is passed.
2042 static const std::vector<std::string> nothing;
2043 return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2044 nothing, handler, timeout );
2045 }
2046
2047 //------------------------------------------------------------------------
2057 //------------------------------------------------------------------------
// Builds a kXR_chkpoint request carrying `code` as its opcode for this file's
// handle and hands it to SendOrQueue wrapped in a StatefulHandler.
//
// self    - file state handler the request is issued for
// code    - value stored in the request's opcode field
// handler - user handler passed on to the StatefulHandler
// timeout - copied into the send parameters
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// status returned by SendOrQueue.
2058 XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2059 kXR_char code,
2060 ResponseHandler *handler,
2061 uint16_t timeout )
2062 {
2063 XrdSysMutexHelper scopedLock( self->pMutex );
2064
2065 if( self->pFileState == Error ) return self->pStatus;
2066
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2067 to 2069) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
2067 if( self->pFileState != Opened && self->pFileState != Recovering )
2069
2070 Log *log = DefaultEnv::GetLog();
2071 log->Debug( FileMsg, "[0x%x@%s] Sending a checkpoint command for "
2072 "handle 0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
2073 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2074
2075 Message *msg;
// NOTE(review): the declaration of `req` (line 2076) is absent from this
// listing; it is presumably a pointer to the checkpoint request struct.
2077 MessageUtils::CreateRequest( msg, req );
2078
2079 req->requestid = kXR_chkpoint;
2080 req->opcode = code;
2081 memcpy( req->fhandle, self->pFileHandle, 4 );
2082
2083 MessageSendParams params;
2084 params.timeout = timeout;
2085 params.followRedirects = false;
2086 params.stateful = true;
2087
// NOTE(review): lines 2088 and 2090 are absent from this listing; further
// preparation of `params` / `msg` may be missing here.
2089
2091 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2092
2093 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2094 }
2095
2096 //------------------------------------------------------------------------
2106 //------------------------------------------------------------------------
// Builds a kXR_chkpoint request with opcode kXR_ckpXeq that carries a
// kXR_write request (offset, size, file handle) and sends `buffer` as a
// single chunk via SendOrQueue.
//
// self    - file state handler the request is issued for
// offset  - stored in the embedded write request
// size    - stored as dlen of the embedded write request and as chunk length
// buffer  - data for the single chunk; only the pointer is stored here
// handler - user handler passed on to the StatefulHandler
// timeout - copied into the send parameters
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// status returned by SendOrQueue.
2107 XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2108 uint64_t offset,
2109 uint32_t size,
2110 const void *buffer,
2111 ResponseHandler *handler,
2112 uint16_t timeout )
2113 {
2114 XrdSysMutexHelper scopedLock( self->pMutex );
2115
2116 if( self->pFileState == Error ) return self->pStatus;
2117
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2118 to 2120) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
2118 if( self->pFileState != Opened && self->pFileState != Recovering )
2120
2121 Log *log = DefaultEnv::GetLog();
2122 log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2123 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2124 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2125
2126 Message *msg;
// NOTE(review): the declaration of `req` (line 2127) is absent from this
// listing. The extra sizeof( ClientWriteRequest ) bytes requested below are
// presumably the room for the embedded write request - confirm.
2128 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2129
2130 req->requestid = kXR_chkpoint;
2131 req->opcode = kXR_ckpXeq;
2132 req->dlen = 24; // as specified in the protocol specification
2133 memcpy( req->fhandle, self->pFileHandle, 4 );
2134
// NOTE(review): the declaration/initialisation of `wrtreq` (line 2135) is
// absent from this listing; it presumably points into `msg` after `req`.
2136 wrtreq->requestid = kXR_write;
2137 wrtreq->offset = offset;
2138 wrtreq->dlen = size;
2139 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2140
// The chunk is registered with offset 0; the file offset travels in wrtreq.
2141 ChunkList *list = new ChunkList();
2142 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2143
2144 MessageSendParams params;
2145 params.timeout = timeout;
2146 params.followRedirects = false;
2147 params.stateful = true;
2148 params.chunkList = list;
2149
// NOTE(review): lines 2150 and 2152 are absent from this listing; further
// preparation of `params` / `msg` may be missing here.
2151
2153 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2154
2155 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2156 }
2157
2158 //------------------------------------------------------------------------
2168 //------------------------------------------------------------------------
// Vector variant of the checkpointed write: builds a kXR_chkpoint request
// with opcode kXR_ckpXeq that carries a kXR_write request, and sends the
// non-empty iovec entries as a list of chunks via SendOrQueue.
//
// self    - file state handler the request is issued for
// offset  - stored in the embedded write request
// iov     - array of buffers; entries with iov_len == 0 are skipped
// iovcnt  - number of entries in iov
// handler - user handler passed on to the StatefulHandler
// timeout - copied into the send parameters
//
// Returns self->pStatus when the file is in the Error state, otherwise the
// status returned by SendOrQueue.
2169 XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2170 uint64_t offset,
2171 const struct iovec *iov,
2172 int iovcnt,
2173 ResponseHandler *handler,
2174 uint16_t timeout )
2175 {
2176 XrdSysMutexHelper scopedLock( self->pMutex );
2177
2178 if( self->pFileState == Error ) return self->pStatus;
2179
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2180 to 2182) - presumably an early
// return for a file that is neither Opened nor Recovering; confirm.
2180 if( self->pFileState != Opened && self->pFileState != Recovering )
2182
2183 Log *log = DefaultEnv::GetLog();
2184 log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2185 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2186 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2187
2188 Message *msg;
// NOTE(review): the declaration of `req` (line 2189) is absent from this
// listing. The extra sizeof( ClientWriteRequest ) bytes requested below are
// presumably the room for the embedded write request - confirm.
2190 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2191
2192 req->requestid = kXR_chkpoint;
2193 req->opcode = kXR_ckpXeq;
2194 req->dlen = 24; // as specified in the protocol specification
2195 memcpy( req->fhandle, self->pFileHandle, 4 );
2196
// Collect the non-empty buffers and add up their lengths. The total is kept
// in a uint32_t, so a sum above 2^32-1 would wrap.
2197 ChunkList *list = new ChunkList();
2198 uint32_t size = 0;
2199 for( int i = 0; i < iovcnt; ++i )
2200 {
2201 if( iov[i].iov_len == 0 ) continue;
2202 size += iov[i].iov_len;
2203 list->push_back( ChunkInfo( 0, iov[i].iov_len,
2204 (char*)iov[i].iov_base ) );
2205 }
2206
// NOTE(review): the declaration/initialisation of `wrtreq` (line 2207) is
// absent from this listing; it presumably points into `msg` after `req`.
2208 wrtreq->requestid = kXR_write;
2209 wrtreq->offset = offset;
2210 wrtreq->dlen = size;
2211 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2212
2213 MessageSendParams params;
2214 params.timeout = timeout;
2215 params.followRedirects = false;
2216 params.stateful = true;
2217 params.chunkList = list;
2218
// NOTE(review): lines 2219 and 2221 are absent from this listing; further
// preparation of `params` / `msg` may be missing here.
2220
2222 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2223
2224 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2225 }
2226
2227 //----------------------------------------------------------------------------
2228 // Check if the file is open
2229 //----------------------------------------------------------------------------
// NOTE(review): the signature line of this definition (2230) is absent from
// this listing; only the body is visible.
2231 {
2232 XrdSysMutexHelper scopedLock( pMutex );
2233
// A file that is in the Recovering state is reported as open as well.
2234 if( pFileState == Opened || pFileState == Recovering )
2235 return true;
2236 return false;
2237 }
2238
2239 //----------------------------------------------------------------------------
2240 // Set file property
2241 //----------------------------------------------------------------------------
2242 bool FileStateHandler::SetProperty( const std::string &name,
2243 const std::string &value )
2244 {
2245 XrdSysMutexHelper scopedLock( pMutex );
2246 if( name == "ReadRecovery" )
2247 {
2248 if( value == "true" ) pDoRecoverRead = true;
2249 else pDoRecoverRead = false;
2250 return true;
2251 }
2252 else if( name == "WriteRecovery" )
2253 {
2254 if( value == "true" ) pDoRecoverWrite = true;
2255 else pDoRecoverWrite = false;
2256 return true;
2257 }
2258 else if( name == "FollowRedirects" )
2259 {
2260 if( value == "true" ) pFollowRedirects = true;
2261 else pFollowRedirects = false;
2262 return true;
2263 }
2264 else if( name == "BundledClose" )
2265 {
2266 if( value == "true" ) pAllowBundledClose = true;
2267 else pAllowBundledClose = false;
2268 return true;
2269 }
2270 return false;
2271 }
2272
2273 //----------------------------------------------------------------------------
2274 // Get file property
2275 //----------------------------------------------------------------------------
2276 bool FileStateHandler::GetProperty( const std::string &name,
2277 std::string &value ) const
2278 {
2279 XrdSysMutexHelper scopedLock( pMutex );
2280 if( name == "ReadRecovery" )
2281 {
2282 if( pDoRecoverRead ) value = "true";
2283 else value = "false";
2284 return true;
2285 }
2286 else if( name == "WriteRecovery" )
2287 {
2288 if( pDoRecoverWrite ) value = "true";
2289 else value = "false";
2290 return true;
2291 }
2292 else if( name == "FollowRedirects" )
2293 {
2294 if( pFollowRedirects ) value = "true";
2295 else value = "false";
2296 return true;
2297 }
2298 else if( name == "DataServer" && pDataServer )
2299 { value = pDataServer->GetHostId(); return true; }
2300 else if( name == "LastURL" && pDataServer )
2301 { value = pDataServer->GetURL(); return true; }
2302 else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2303 { value = pWrtRecoveryRedir->GetHostId(); return true; }
2304 value = "";
2305 return false;
2306 }
2307
2308 //----------------------------------------------------------------------------
2309 // Process the results of the opening operation
2310 //----------------------------------------------------------------------------
// NOTE(review): the first line of this definition (2311: return type, name
// and first parameter) is absent from this listing. The body reads a
// `status` pointer that is presumably that first parameter.
//
// Records the outcome of an open: refreshes the data server / load balancer /
// write-recovery redirector URLs from `hostList`, stores the status, and
// either fails the queued messages (state Error) or stores the file handle
// and session id and re-sends the queued messages (state Opened).
//
// openInfo - open response; a null pointer is treated as a failure
// hostList - hosts visited while opening; may be null, in which case the
//            previously stored URLs are left untouched
2312 const OpenInfo *openInfo,
2313 const HostList *hostList )
2314 {
2315 Log *log = DefaultEnv::GetLog();
2316 XrdSysMutexHelper scopedLock( pMutex );
2317
2318 //--------------------------------------------------------------------------
2319 // Assign the data server and the load balancer
2320 //--------------------------------------------------------------------------
// lastServer is used only for the error log below; it falls back to the
// file URL's host when no host list is available.
2321 std::string lastServer = pFileUrl->GetHostId();
2322 if( hostList )
2323 {
2324 delete pDataServer;
2325 delete pLoadBalancer;
2326 pLoadBalancer = 0;
2327 delete pWrtRecoveryRedir;
2328 pWrtRecoveryRedir = 0;
2329
// The last entry of the host list becomes the data server.
2330 pDataServer = new URL( hostList->back().url );
2331 pDataServer->SetParams( pFileUrl->GetParams() );
// The path is taken from the file URL unless the virtual redirector is in
// use and the file URL is a metalink.
2332 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2333 lastServer = pDataServer->GetHostId();
// Merge the CGI parameters of every host in the list into the data server's.
2334 HostList::const_iterator itC;
2335 URL::ParamsMap params = pDataServer->GetParams();
2336 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2337 {
2338 MessageUtils::MergeCGI( params,
2339 itC->url.GetParams(),
2340 true );
2341 }
2342 pDataServer->SetParams( params );
2343
// Scanning from the back: the most recent host flagged as load balancer.
2344 HostList::const_reverse_iterator it;
2345 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2346 if( it->loadBalancer )
2347 {
2348 pLoadBalancer = new URL( it->url );
2349 break;
2350 }
2351
// Scanning from the back: the most recent host with kXR_recoverWrts set.
2352 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2353 if( it->flags & kXR_recoverWrts )
2354 {
2355 pWrtRecoveryRedir = new URL( it->url );
2356 break;
2357 }
2358 }
2359
2360 log->Debug( FileMsg, "[0x%x@%s] Open has returned with status %s",
2361 this, pFileUrl->GetURL().c_str(), status->ToStr().c_str() );
2362
2363 if( pDataServer && !pDataServer->IsLocalFile() )
2364 {
2365 //------------------------------------------------------------------------
2366 // Check if we are using a secure connection
2367 //------------------------------------------------------------------------
2368 XrdCl::AnyObject isencobj;
// NOTE(review): line 2369 is absent from this listing; it presumably
// declares `st` and starts the call that continues on the next line.
2370 QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2371 if( st.IsOK() )
2372 {
// The bool extracted from the query result is deleted here after copying.
2373 bool *isenc;
2374 isencobj.Get( isenc );
2375 pIsChannelEncrypted = *isenc;
2376 delete isenc;
2377 }
2378 }
2379
2380 //--------------------------------------------------------------------------
2381 // We have failed
2382 //--------------------------------------------------------------------------
2383 pStatus = *status;
2384 if( !pStatus.IsOK() || !openInfo )
2385 {
2386 log->Debug( FileMsg, "[0x%x@%s] Error while opening at %s: %s",
2387 this, pFileUrl->GetURL().c_str(), lastServer.c_str(),
2388 pStatus.ToStr().c_str() );
2389 FailQueuedMessages( pStatus );
2390 pFileState = Error;
2391
2392 //------------------------------------------------------------------------
2393 // Report to monitoring
2394 //------------------------------------------------------------------------
// NOTE(review): lines 2395, 2398 and 2401 are absent from this listing;
// they presumably declare `mon` and the event record `i` - confirm.
2396 if( mon )
2397 {
2399 i.file = pFileUrl;
2400 i.status = status;
2402 mon->Event( Monitor::EvErrIO, &i );
2403 }
2404 }
2405 //--------------------------------------------------------------------------
2406 // We have succeeded
2407 //--------------------------------------------------------------------------
2408 else
2409 {
2410 //------------------------------------------------------------------------
2411 // Store the response info
2412 //------------------------------------------------------------------------
2413 openInfo->GetFileHandle( pFileHandle );
2414 pSessionId = openInfo->GetSessionId();
// The cached stat info is replaced only when the open response carries one.
2415 if( openInfo->GetStatInfo() )
2416 {
2417 delete pStatInfo;
2418 pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2419 }
2420
2421 log->Debug( FileMsg, "[0x%x@%s] successfully opened at %s, handle: 0x%x, "
2422 "session id: %ld", this, pFileUrl->GetURL().c_str(),
2423 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2424 pSessionId );
2425
2426 //------------------------------------------------------------------------
2427 // Inform the monitoring about opening success
2428 //------------------------------------------------------------------------
2429 gettimeofday( &pOpenTime, 0 );
// NOTE(review): lines 2430 and 2433 are absent from this listing; they
// presumably declare `mon` and the event record `i` - confirm.
2431 if( mon )
2432 {
2434 i.file = pFileUrl;
2435 i.dataServer = pDataServer->GetHostId();
2436 i.oFlags = pOpenFlags;
2437 i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2438 mon->Event( Monitor::EvOpen, &i );
2439 }
2440
2441 //------------------------------------------------------------------------
2442 // Resend the queued messages if any
2443 //------------------------------------------------------------------------
// The state is switched to Opened only after the queued messages were re-sent.
2444 ReSendQueuedMessages();
2445 pFileState = Opened;
2446 }
2447 }
2448
2449 //----------------------------------------------------------------------------
2450 // Process the results of the closing operation
2451 //----------------------------------------------------------------------------
// NOTE(review): the signature line of this definition (2452) is absent from
// this listing. The body reads a `status` pointer that is presumably its
// parameter.
//
// Logs the close result, reports it to monitoring, resets the monitoring
// counters, stores the status and moves the file to the Closed state.
2453 {
2454 Log *log = DefaultEnv::GetLog();
2455 XrdSysMutexHelper scopedLock( pMutex );
2456
2457 log->Debug( FileMsg, "[0x%x@%s] Close returned from %s with: %s", this,
2458 pFileUrl->GetURL().c_str(), pDataServer->GetHostId().c_str(),
2459 status->ToStr().c_str() );
2460
2461 log->Dump( FileMsg, "[0x%x@%s] Items in the fly %d, queued for recovery %d",
2462 this, pFileUrl->GetURL().c_str(), pInTheFly.size(),
2463 pToBeRecovered.size() );
2464
2465 MonitorClose( status );
2466 ResetMonitoringVars();
2467
// The state becomes Closed regardless of whether `status` reports success.
2468 pStatus = *status;
2469 pFileState = Closed;
2470 }
2471
2472 //----------------------------------------------------------------------------
2473 // Handle an error while sending a stateful message
2474 //----------------------------------------------------------------------------
// Handles a failed stateful request. A redirect error whose message starts
// with a known protocol prefix is passed to OnStateRedirection; any other
// error is reported to monitoring and then either failed for good or put
// into the recovery queue.
//
// self        - file state handler the message belongs to
// status      - error status; deleted by this function on the non-redirect
//               paths
// message     - the request that failed; removed from pInTheFly
// userHandler - handler stored with the request for failing / recovery
// sendParams  - send parameters stored with the request
2475 void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2476 XRootDStatus *status,
2477 Message *message,
2478 ResponseHandler *userHandler,
2479 MessageSendParams &sendParams )
2480 {
2481 //--------------------------------------------------------------------------
2482 // It may be a redirection
2483 //--------------------------------------------------------------------------
2484 if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2485 {
// The error message is expected to hold the redirect URL; it is followed
// only when it begins with one of these prefixes.
2486 static const std::string root = "root", xroot = "xroot", file = "file",
2487 roots = "roots", xroots = "xroots";
2488 std::string msg = status->GetErrorMessage();
2489 if( !msg.compare( 0, root.size(), root ) ||
2490 !msg.compare( 0, xroot.size(), xroot ) ||
2491 !msg.compare( 0, file.size(), file ) ||
2492 !msg.compare( 0, roots.size(), roots ) ||
2493 !msg.compare( 0, xroots.size(), xroots ) )
2494 {
// NOTE(review): `status` is not deleted on this path, unlike the paths
// below - confirm who owns it after a redirect.
2495 FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2496 return;
2497 }
2498 }
2499
2500 //--------------------------------------------------------------------------
2501 // Handle error
2502 //--------------------------------------------------------------------------
2503 Log *log = DefaultEnv::GetLog();
2504 XrdSysMutexHelper scopedLock( self->pMutex );
2505 self->pInTheFly.erase( message );
2506
2507 log->Dump( FileMsg, "[0x%x@%s] File state error encountered. Message %s "
2508 "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2509 message->GetDescription().c_str(), status->ToStr().c_str() );
2510
2511 //--------------------------------------------------------------------------
2512 // Report to monitoring
2513 //--------------------------------------------------------------------------
// NOTE(review): lines 2514 and 2517 are absent from this listing; they
// presumably declare `mon` and the event record `i` - confirm.
2515 if( mon )
2516 {
2518 i.file = self->pFileUrl;
2519 i.status = status;
2520
2521 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2522 switch( req->header.requestid )
2523 {
// NOTE(review): the case labels of this switch (lines 2524-2530) are absent
// from this listing.
2531 }
2532
2533 mon->Event( Monitor::EvErrIO, &i );
2534 }
2535
2536 //--------------------------------------------------------------------------
2537 // The message is not recoverable
2538 // (message using a kernel buffer is not recoverable by definition)
2539 //--------------------------------------------------------------------------
2540 if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2541 {
2542 log->Error( FileMsg, "[0x%x@%s] Fatal file state error. Message %s "
2543 "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2544 message->GetDescription().c_str(), status->ToStr().c_str() );
2545
2546 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2547 delete status;
2548 return;
2549 }
2550
2551 //--------------------------------------------------------------------------
2552 // Insert the message to the recovery queue and start the recovery
2553 // procedure if we don't have any more message in the fly
2554 //--------------------------------------------------------------------------
// The status is copied into pCloseReason before it is deleted below.
2555 self->pCloseReason = *status;
2556 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2557 delete status;
2558 }
2559
2560 //----------------------------------------------------------------------------
2561 // Handle stateful redirect
2562 //----------------------------------------------------------------------------
2563 void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2564 const std::string &redirectUrl,
2565 Message *message,
2566 ResponseHandler *userHandler,
2567 MessageSendParams &sendParams )
2568 {
2569 XrdSysMutexHelper scopedLock( self->pMutex );
2570 self->pInTheFly.erase( message );
2571
2572 //--------------------------------------------------------------------------
2573 // Register the state redirect url and append the new cgi information to
2574 // the file URL
2575 //--------------------------------------------------------------------------
2576 if( !self->pStateRedirect )
2577 {
2578 std::ostringstream o;
2579 self->pStateRedirect = new URL( redirectUrl );
2580 URL::ParamsMap params = self->pFileUrl->GetParams();
2581 MessageUtils::MergeCGI( params,
2582 self->pStateRedirect->GetParams(),
2583 false );
2584 self->pFileUrl->SetParams( params );
2585 }
2586
2587 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2588 }
2589
2590 //----------------------------------------------------------------------------
2591 // Handle stateful response
2592 //----------------------------------------------------------------------------
2593 void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2594 XRootDStatus *status,
2595 Message *message,
2596 AnyObject *response,
2597 HostList */*urlList*/ )
2598 {
2599 Log *log = DefaultEnv::GetLog();
2600 XrdSysMutexHelper scopedLock( self->pMutex );
2601
2602 log->Dump( FileMsg, "[0x%x@%s] Got state response for message %s",
2603 self.get(), self->pFileUrl->GetURL().c_str(),
2604 message->GetDescription().c_str() );
2605
2606 //--------------------------------------------------------------------------
2607 // Since this message may be the last "in-the-fly" and no recovery
2608 // is done if messages are in the fly, we may need to trigger recovery
2609 //--------------------------------------------------------------------------
2610 self->pInTheFly.erase( message );
2611 RunRecovery( self );
2612
2613 //--------------------------------------------------------------------------
2614 // Play with the actual response before returning it. This is a good
2615 // place to do caching in the future.
2616 //--------------------------------------------------------------------------
2617 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2618 switch( req->header.requestid )
2619 {
2620 //------------------------------------------------------------------------
2621 // Cache the stat response
2622 //------------------------------------------------------------------------
2623 case kXR_stat:
2624 {
2625 StatInfo *info = 0;
2626 response->Get( info );
2627 delete self->pStatInfo;
2628 self->pStatInfo = new StatInfo( *info );
2629 break;
2630 }
2631
2632 //------------------------------------------------------------------------
2633 // Handle read response
2634 //------------------------------------------------------------------------
2635 case kXR_read:
2636 {
2637 ++self->pRCount;
2638 self->pRBytes += req->read.rlen;
2639 break;
2640 }
2641
2642 //------------------------------------------------------------------------
2643 // Handle read response
2644 //------------------------------------------------------------------------
2645 case kXR_pgread:
2646 {
2647 ++self->pRCount;
2648 self->pRBytes += req->pgread.rlen;
2649 break;
2650 }
2651
2652 //------------------------------------------------------------------------
2653 // Handle readv response
2654 //------------------------------------------------------------------------
2655 case kXR_readv:
2656 {
2657 ++self->pVRCount;
2658 size_t segs = req->header.dlen/sizeof(readahead_list);
2659 readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2660 for( size_t i = 0; i < segs; ++i )
2661 self->pVRBytes += dataChunk[i].rlen;
2662 self->pVSegs += segs;
2663 break;
2664 }
2665
2666 //------------------------------------------------------------------------
2667 // Handle write response
2668 //------------------------------------------------------------------------
2669 case kXR_write:
2670 {
2671 ++self->pWCount;
2672 self->pWBytes += req->write.dlen;
2673 break;
2674 }
2675
2676 //------------------------------------------------------------------------
2677 // Handle write response
2678 //------------------------------------------------------------------------
2679 case kXR_pgwrite:
2680 {
2681 ++self->pWCount;
2682 self->pWBytes += req->pgwrite.dlen;
2683 break;
2684 }
2685
2686 //------------------------------------------------------------------------
2687 // Handle writev response
2688 //------------------------------------------------------------------------
2689 case kXR_writev:
2690 {
2691 ++self->pVWCount;
2692 size_t size = req->header.dlen/sizeof(readahead_list);
2693 XrdProto::write_list *wrtList =
2694 reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2695 for( size_t i = 0; i < size; ++i )
2696 self->pVWBytes += wrtList[i].wlen;
2697 break;
2698 }
2699 };
2700 }
2701
2702 //------------------------------------------------------------------------
2704 //------------------------------------------------------------------------
2705 void FileStateHandler::Tick( time_t now )
2706 {
2707 if (pMutex.CondLock())
2708 {TimeOutRequests( now );
2709 pMutex.UnLock();
2710 }
2711 }
2712
2713 //----------------------------------------------------------------------------
2714 // Declare timeout on requests being recovered
2715 //----------------------------------------------------------------------------
// NOTE(review): the signature line of this definition (2716) is absent from
// this listing. The body compares against `now`, presumably its parameter.
//
// Walks the recovery queue and, for every request whose params.expires is
// not later than `now`, queues a ResponseJob for its handler and removes the
// request from the queue.
2717 {
2718 if( !pToBeRecovered.empty() )
2719 {
2720 Log *log = DefaultEnv::GetLog();
2721 log->Dump( FileMsg, "[0x%x@%s] Got a timer event", this,
2722 pFileUrl->GetURL().c_str() );
2723 RequestList::iterator it;
// NOTE(review): line 2724 is absent from this listing; it presumably
// declares `jobMan` - confirm.
2725 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2726 {
2727 if( it->params.expires <= now )
2728 {
// NOTE(review): one constructor argument (line 2731) is absent from this
// listing between the handler and the trailing `0, hostList` arguments.
2729 jobMan->QueueJob( new ResponseJob(
2730 it->handler,
2732 0, it->params.hostList ) );
// erase() returns the next iterator, so the loop does not advance here.
2733 it = pToBeRecovered.erase( it );
2734 }
2735 else
2736 ++it;
2737 }
2738 }
2739 }
2740
2741 //----------------------------------------------------------------------------
2742 // Called in the child process after the fork
2743 //----------------------------------------------------------------------------
// NOTE(review): the signature line of this definition (2744) is absent from
// this listing; only the body is visible.
//
// Leaves Closed / Error files untouched. Any other file is put into the
// Recovering state with empty in-flight and recovery queues when recovery is
// enabled for its access mode, and into the Error state otherwise.
2745 {
2746 Log *log = DefaultEnv::GetLog();
2747
2748 if( pFileState == Closed || pFileState == Error )
2749 return;
2750
// Read-only files follow pDoRecoverRead, all others follow pDoRecoverWrite.
2751 if( (IsReadOnly() && pDoRecoverRead) ||
2752 (!IsReadOnly() && pDoRecoverWrite) )
2753 {
2754 log->Debug( FileMsg, "[0x%x@%s] Putting the file in recovery state in "
2755 "process %d", this, pFileUrl->GetURL().c_str(), getpid() );
2756 pFileState = Recovering;
2757 pInTheFly.clear();
2758 pToBeRecovered.clear();
2759 }
2760 else
2761 pFileState = Error;
2762 }
2763
2764 //------------------------------------------------------------------------
2765 // Try other data server
2766 //------------------------------------------------------------------------
// Puts the file into the Recovering state and reopens it through the load
// balancer, after adding the current data server's host name to the load
// balancer URL's "tried" CGI parameter.
//
// self    - file state handler to reopen
// timeout - passed on to ReOpenFileAtServer
//
// Returns the status of ReOpenFileAtServer.
2767 XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2768 {
2769 XrdSysMutexHelper scopedLock( self->pMutex );
2770
// NOTE(review): the statement guarded by this condition is not present in
// this listing (numbering jumps from 2771 to 2773) - presumably an early
// return when the file is not Opened or has no load balancer; confirm.
2771 if( self->pFileState != Opened || !self->pLoadBalancer )
2773
2774 self->pFileState = Recovering;
2775
2776 Log *log = DefaultEnv::GetLog();
2777 log->Debug( FileMsg, "[0x%x@%s] Reopen file at next data server.",
2778 self.get(), self->pFileUrl->GetURL().c_str() );
2779
2780 // merge CGI
2781 auto lbcgi = self->pLoadBalancer->GetParams();
2782 auto dtcgi = self->pDataServer->GetParams();
2783 MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2784 // update tried CGI
// "tried" is a comma-separated list; the current data server is appended to
// an existing list or starts a new one.
2785 auto itr = lbcgi.find( "tried" );
2786 if( itr == lbcgi.end() )
2787 lbcgi["tried"] = self->pDataServer->GetHostName();
2788 else
2789 {
2790 std::string tried = itr->second;
2791 tried += "," + self->pDataServer->GetHostName();
2792 lbcgi["tried"] = tried;
2793 }
2794 self->pLoadBalancer->SetParams( lbcgi );
2795
2796 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2797 }
2798
2799 //------------------------------------------------------------------------
2800 // Generic implementation of xattr operation
2801 //------------------------------------------------------------------------
// Shared implementation of the extended-attribute operations: builds a
// kXR_fattr request for this file's handle and hands it to SendOrQueue
// wrapped in a StatefulHandler.
//
// self    - file state handler the request is issued for
// subcode - stored in the request's subcode field
// options - stored in the request's options field
// attrs   - attribute list; its size is stored in numattr
// handler - user handler passed on to the StatefulHandler
// timeout - copied into the send parameters
//
// Returns `st` when it reports a failure, otherwise the status returned by
// SendOrQueue. This function takes no lock and performs no state check
// itself.
2802 template<typename T>
2803 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2804 kXR_char subcode,
2805 kXR_char options,
2806 const std::vector<T> &attrs,
2807 ResponseHandler *handler,
2808 uint16_t timeout )
2809 {
2810 //--------------------------------------------------------------------------
2811 // Issue a new fattr request
2812 //--------------------------------------------------------------------------
2813 Message *msg;
2814 ClientFattrRequest *req;
2815 MessageUtils::CreateRequest( msg, req );
2816
2817 req->requestid = kXR_fattr;
2818 req->subcode = subcode;
2819 req->numattr = attrs.size();
2820 req->options = options;
2821 memcpy( req->fhandle, self->pFileHandle, 4 );
// NOTE(review): line 2822 is absent from this listing; it presumably
// declares `st` as the result of writing `attrs` into the message. `msg` is
// not released on the early return below - confirm whether that leaks.
2823 if( !st.IsOK() ) return st;
2824
2825 MessageSendParams params;
2826 params.timeout = timeout;
2827 params.followRedirects = false;
2828 params.stateful = true;
// NOTE(review): lines 2829 and 2831 are absent from this listing; further
// preparation of `params` / `msg` may be missing here.
2830
2832 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2833
2834 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2835 }
2836
2837 //----------------------------------------------------------------------------
2838 // Send a message to a host or put it in the recovery queue
2839 //----------------------------------------------------------------------------
2840 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2841 const URL &url,
2842 Message *msg,
2843 ResponseHandler *handler,
2844 MessageSendParams &sendParams )
2845 {
2846 //--------------------------------------------------------------------------
2847 // Recovering
2848 //--------------------------------------------------------------------------
2849 if( self->pFileState == Recovering )
2850 {
2851 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2852 }
2853
2854 //--------------------------------------------------------------------------
2855 // Trying to send
2856 //--------------------------------------------------------------------------
2857 if( self->pFileState == Opened )
2858 {
2859 msg->SetSessionId( self->pSessionId );
2860 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2861
2862 //------------------------------------------------------------------------
2863 // Invalid session id means that the connection has been broken while we
2864 // were idle so we haven't been informed about this fact earlier.
2865 //------------------------------------------------------------------------
2866 if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2867 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2868
2869 if( st.IsOK() )
2870 self->pInTheFly.insert(msg);
2871 else
2872 delete handler;
2873 return st;
2874 }
2875 return Status( stError, errInvalidOp );
2876 }
2877
2878 //----------------------------------------------------------------------------
2879 // Check if the stateful error is recoverable
     //
     // status  - status whose `code` is compared against the recoverable set
     // returns - for a code found in `recoverable_errors`: pDoRecoverRead if
     //           the file is read-only, pDoRecoverWrite otherwise; false for
     //           any other code or when both recovery switches are off
2880 //----------------------------------------------------------------------------
2881 bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2882 {
     // NOTE(review): listing lines 2884-2889 (the contents of this
     // initializer list) are absent from this extract - see the original
     // file for the actual set of error codes.
2883 const auto recoverable_errors = {
2890 };
2891
2892 if (pDoRecoverRead || pDoRecoverWrite)
2893 for (const auto error : recoverable_errors)
2894 if (status.code == error)
2895 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2896
2897 return false;
2898 }
2899
2900 //----------------------------------------------------------------------------
2901 // Check if the file is open for read only
2902 //----------------------------------------------------------------------------
2903 bool FileStateHandler::IsReadOnly() const
2904 {
2905 if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2906 !(pOpenFlags & kXR_open_apnd ) )
2907 return true;
2908 return false;
2909 }
2910
2911 //----------------------------------------------------------------------------
2912 // Recover a message
2913 //----------------------------------------------------------------------------
2914 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2915 RequestData rd,
2916 bool callbackOnFailure )
2917 {
2918 self->pFileState = Recovering;
2919
2920 Log *log = DefaultEnv::GetLog();
2921 log->Dump( FileMsg, "[0x%x@%s] Putting message %s in the recovery list",
2922 self.get(), self->pFileUrl->GetURL().c_str(),
2923 rd.request->GetDescription().c_str() );
2924
2925 Status st = RunRecovery( self );
2926 if( st.IsOK() )
2927 {
2928 self->pToBeRecovered.push_back( rd );
2929 return st;
2930 }
2931
2932 if( callbackOnFailure )
2933 self->FailMessage( rd, st );
2934
2935 return st;
2936 }
2937
2938 //----------------------------------------------------------------------------
2939 // Run the recovery procedure if appropriate
2940 //----------------------------------------------------------------------------
2941 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2942 {
2943 if( self->pFileState != Recovering )
2944 return Status();
2945
2946 if( !self->pInTheFly.empty() )
2947 return Status();
2948
2949 Log *log = DefaultEnv::GetLog();
2950 log->Debug( FileMsg, "[0x%x@%s] Running the recovery procedure", self.get(),
2951 self->pFileUrl->GetURL().c_str() );
2952
2953 Status st;
2954 if( self->pStateRedirect )
2955 {
2956 SendClose( self, 0 );
2957 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2958 delete self->pStateRedirect; self->pStateRedirect = 0;
2959 }
2960 else if( self->IsReadOnly() && self->pLoadBalancer )
2961 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2962 else
2963 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2964
2965 if( !st.IsOK() )
2966 {
2967 self->pFileState = Error;
2968 self->pStatus = st;
2969 self->FailQueuedMessages( st );
2970 }
2971
2972 return st;
2973 }
2974
2975 //----------------------------------------------------------------------------
2976 // Send a close and ignore the response
     //
     // Builds a kXR_close request for the current file handle, stamps it with
     // the current session id and issues it to pDataServer.
     //
     // self    - shared handle to the file state
     // timeout - copied into the send parameters
     // returns - the status of IssueRequest
2977 //----------------------------------------------------------------------------
2978 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2979 uint16_t timeout )
2980 {
2981 Message *msg;
2982 ClientCloseRequest *req;
2983 MessageUtils::CreateRequest( msg, req );
2984
2985 req->requestid = kXR_close;
2986 memcpy( req->fhandle, self->pFileHandle, 4 );
2987
     // NOTE(review): listing lines 2988 and 2990 are absent from this
     // extract; `handler`, used in the IssueRequest call below, is presumably
     // declared on 2990 with the lambda on 2991 as its argument - confirm
     // against the original file.
2989 msg->SetSessionId( self->pSessionId );
     // the callback below holds its own copy of `self` and resets that copy
     // when it is invoked
2991 [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
2992 MessageSendParams params;
2993 params.timeout = timeout;
2994 params.followRedirects = false;
2995 params.stateful = true;
2996
     // NOTE(review): listing line 2997 is absent from this extract.
2998
2999 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3000 }
3001
3002 //----------------------------------------------------------------------------
3003 // Re-open the current file at a given server
     //
     // self    - shared handle to the file state; pOpenFlags is modified here
     // url     - server to send the open request to
     // timeout - copied into the send parameters
     // returns - the status of IssueRequest; on failure the open handler is
     //           deleted, pStatus is set to that status and the file state
     //           becomes Closed
3004 //----------------------------------------------------------------------------
3005 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3006 const URL &url,
3007 uint16_t timeout )
3008 {
3009 Log *log = DefaultEnv::GetLog();
3010 log->Dump( FileMsg, "[0x%x@%s] Sending a recovery open command to %s",
3011 self.get(), self->pFileUrl->GetURL().c_str(), url.GetURL().c_str() );
3012
3013 //--------------------------------------------------------------------------
3014 // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3015 // procedure to delete a file that has been partially updated or fail it
3016 // because a partially uploaded file already exists.
3017 //--------------------------------------------------------------------------
3018 if( self->pOpenFlags & kXR_delete)
3019 {
     // kXR_delete is replaced by kXR_open_updt
3020 self->pOpenFlags &= ~kXR_delete;
3021 self->pOpenFlags |= kXR_open_updt;
3022 }
3023
3024 self->pOpenFlags &= ~kXR_new;
3025
3026 Message *msg;
3027 ClientOpenRequest *req;
     // work on a copy so the original file path can be filled in when the
     // target URL carries none
3028 URL u = url;
3029
3030 if( url.GetPath().empty() )
3031 u.SetPath( self->pFileUrl->GetPath() );
3032
3033 std::string path = u.GetPathWithFilteredParams();
3034 MessageUtils::CreateRequest( msg, req, path.length() );
3035
3036 req->requestid = kXR_open;
3037 req->mode = self->pOpenMode;
3038 req->options = self->pOpenFlags;
3039 req->dlen = path.length();
     // NOTE(review): 24 is presumably the size of the fixed request header
     // that precedes the path payload - confirm.
3040 msg->Append( path.c_str(), path.length(), 24 );
3041
3042 // create a new reopen handler
3043 // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3044 // until we know that 'SendMessage' was successful)
3045 OpenHandler *openHandler = new OpenHandler( self, 0 );
3046 MessageSendParams params; params.timeout = timeout;
     // NOTE(review): listing lines 3047-3048 are absent from this extract.
3049
3050 //--------------------------------------------------------------------------
3051 // Issue the open request
     // (sent to `url` as passed in; `u` was only used to build the path)
3052 //--------------------------------------------------------------------------
3053 XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3054
3055 // if there was a problem destroy the open handler
3056 if( !st.IsOK() )
3057 {
3058 delete openHandler;
3059 self->pStatus = st;
3060 self->pFileState = Closed;
3061 }
3062 return st;
3063 }
3064
3065 //------------------------------------------------------------------------
3066 // Fail a message
     //
     // Queues a ResponseJob that delivers `status` to the user handler wrapped
     // by the request's StatefulHandler, then deletes the StatefulHandler.
     //
     // rd     - the request to fail; rd.handler is expected to be a
     //          StatefulHandler
     // status - status to report; a heap copy is given to the ResponseJob
3067 //------------------------------------------------------------------------
3068 void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3069 {
3070 Log *log = DefaultEnv::GetLog();
3071 log->Dump( FileMsg, "[0x%x@%s] Failing message %s with %s",
3072 this, pFileUrl->GetURL().c_str(),
3073 rd.request->GetDescription().c_str(),
3074 status.ToStr().c_str() );
3075
3076 StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
     // a handler of any other type is only logged: no job is queued and
     // nothing is deleted on this path
3077 if( !sh )
3078 {
3079 Log *log = DefaultEnv::GetLog();
3080 log->Error( FileMsg, "[0x%x@%s] Internal error while recovering %s",
3081 this, pFileUrl->GetURL().c_str(),
3082 rd.request->GetDescription().c_str() );
3083 return;
3084 }
3085
     // NOTE(review): listing line 3086 is absent from this extract; `jobMan`,
     // used below, is presumably obtained there - confirm against the
     // original file.
3087 ResponseHandler *userHandler = sh->GetUserHandler();
3088 jobMan->QueueJob( new ResponseJob(
3089 userHandler,
3090 new XRootDStatus( status ),
3091 0, rd.params.hostList ) );
3092
3093 delete sh;
3094 }
3095
3096 //----------------------------------------------------------------------------
3097 // Fail queued messages
3098 //----------------------------------------------------------------------------
3099 void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3100 {
3101 RequestList::iterator it;
3102 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103 FailMessage( *it, status );
3104 pToBeRecovered.clear();
3105 }
3106
3107 //------------------------------------------------------------------------
3108 // Re-send queued messages
3109 //------------------------------------------------------------------------
3110 void FileStateHandler::ReSendQueuedMessages()
3111 {
3112 RequestList::iterator it;
3113 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3114 {
3115 it->request->SetSessionId( pSessionId );
3116 ReWriteFileHandle( it->request );
3117 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118 it->handler, it->params );
3119 if( !st.IsOK() )
3120 FailMessage( *it, st );
3121 }
3122 pToBeRecovered.clear();
3123 }
3124
3125 //------------------------------------------------------------------------
3126 // Re-write file handle
     //
     // Copies the 4-byte pFileHandle into the file-handle field(s) of an
     // already built request, selected by the request id in the message
     // header, and logs the new handle. Request ids not listed in the switch
     // are left untouched.
     //
     // NOTE(review): the declarations of `hdr` and of `req` in most cases sit
     // on listing lines that are absent from this extract (3130, 3135, 3141,
     // 3147, 3153, 3159, 3178, 3184; 3194 is missing as well) - see the
     // original file for them.
3127 //------------------------------------------------------------------------
3128 void FileStateHandler::ReWriteFileHandle( Message *msg )
3129 {
3131 switch( hdr->requestid )
3132 {
3133 case kXR_read:
3134 {
3136 memcpy( req->fhandle, pFileHandle, 4 );
3137 break;
3138 }
3139 case kXR_write:
3140 {
3142 memcpy( req->fhandle, pFileHandle, 4 );
3143 break;
3144 }
3145 case kXR_sync:
3146 {
3148 memcpy( req->fhandle, pFileHandle, 4 );
3149 break;
3150 }
3151 case kXR_truncate:
3152 {
3154 memcpy( req->fhandle, pFileHandle, 4 );
3155 break;
3156 }
3157 case kXR_readv:
3158 {
     // every readahead_list element in the payload (starting at buffer
     // offset 24) carries its own handle; dlen / sizeof(element) gives the
     // element count
3160 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3161 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3162 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3163 break;
3164 }
3165 case kXR_writev:
3166 {
     // same layout idea as readv: one handle per write_list element,
     // elements start at buffer offset 24
3167 ClientWriteVRequest *req =
3168 reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3169 XrdProto::write_list *wrtList =
3170 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3171 size_t size = req->dlen / sizeof(XrdProto::write_list);
3172 for( size_t i = 0; i < size; ++i )
3173 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3174 break;
3175 }
3176 case kXR_pgread:
3177 {
3179 memcpy( req->fhandle, pFileHandle, 4 );
3180 break;
3181 }
3182 case kXR_pgwrite:
3183 {
3185 memcpy( req->fhandle, pFileHandle, 4 );
3186 break;
3187 }
3188 }
3189
3190 Log *log = DefaultEnv::GetLog();
3191 log->Dump( FileMsg, "[0x%x@%s] Rewritten file handle for %s to 0x%x",
3192 this, pFileUrl->GetURL().c_str(), msg->GetDescription().c_str(),
3193 *((uint32_t*)pFileHandle) );
3195 }
3196
3197 //----------------------------------------------------------------------------
3198 // Dispatch monitoring information on close
     //
     // When `mon` is non-null, fills a close-event record with the file URL,
     // open/close times and the per-file byte and call counters, then reports
     // it as Monitor::EvClose.
     //
     // status - close status stored in the event record
     //
     // NOTE(review): listing lines 3202 and 3205 are absent from this
     // extract; `mon` and `i` are presumably declared there - confirm against
     // the original file.
3199 //----------------------------------------------------------------------------
3200 void FileStateHandler::MonitorClose( const XRootDStatus *status )
3201 {
3203 if( mon )
3204 {
3206 i.file = pFileUrl;
3207 i.oTOD = pOpenTime;
     // the close time is taken at the moment the event is dispatched
3208 gettimeofday( &i.cTOD, 0 );
3209 i.rBytes = pRBytes;
3210 i.vrBytes = pVRBytes;
3211 i.wBytes = pWBytes;
3212 i.vwBytes = pVWBytes;
3213 i.vSegs = pVSegs;
3214 i.rCount = pRCount;
3215 i.vCount = pVRCount;
3216 i.wCount = pWCount;
3217 i.status = status;
3218 mon->Event( Monitor::EvClose, &i );
3219 }
3220 }
3221
3222 XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3223 Message *msg,
3224 ResponseHandler *handler,
3225 MessageSendParams &sendParams )
3226 {
3227 // first handle Metalinks
3228 if( pUseVirtRedirector && url.IsMetalink() )
3229 return MessageUtils::RedirectMessage( url, msg, handler,
3230 sendParams, pLFileHandler );
3231
3232 // than local file access
3233 if( url.IsLocalFile() )
3234 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3235
3236 // and finally ordinary XRootD requests
3237 return MessageUtils::SendMessage( url, msg, handler,
3238 sendParams, pLFileHandler );
3239 }
3240
3241 //------------------------------------------------------------------------
3242 // Send a write request with payload being stored in a kernel buffer
     //
     // self    - shared handle to the file state; self->pMutex is held for
     //           the duration of the call
     // offset  - stored in the request's offset field
     // length  - stored in the request's dlen field
     // kbuff   - kernel buffer holding the payload; released from the
     //           unique_ptr into the send parameters below
     // handler - user handler, wrapped by the StatefulHandler created below
     // timeout - copied into the send parameters
     // returns - the result of SendOrQueue
3243 //------------------------------------------------------------------------
3244 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3245 uint64_t offset,
3246 uint32_t length,
3247 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3248 ResponseHandler *handler,
3249 uint16_t timeout )
3250 {
3251 //--------------------------------------------------------------------------
3252 // Create the write request
3253 //--------------------------------------------------------------------------
3254 XrdSysMutexHelper scopedLock( self->pMutex );
3255
3256 if( self->pFileState != Opened && self->pFileState != Recovering )
     // NOTE(review): listing line 3257 is absent from this extract, so the
     // statement guarded by the condition above is not visible; as rendered
     // the `if` would bind to the declaration below - confirm against the
     // original file.
3258
3259 Log *log = DefaultEnv::GetLog();
3260 log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
3261 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
3262 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3263
3264 Message *msg;
3265 ClientWriteRequest *req;
3266 MessageUtils::CreateRequest( msg, req );
3267
3268 req->requestid = kXR_write;
3269 req->offset = offset;
3270 req->dlen = length;
3271 memcpy( req->fhandle, self->pFileHandle, 4 );
3272
3273 MessageSendParams params;
3274 params.timeout = timeout;
3275 params.followRedirects = false;
3276 params.stateful = true;
     // from here on the raw kernel-buffer pointer lives in `params`; the
     // unique_ptr no longer owns it
3277 params.kbuff = kbuff.release();
     // an empty chunk list is attached to the parameters
3278 params.chunkList = new ChunkList();
3279
     // NOTE(review): listing lines 3280 and 3282 are absent from this extract.
3281
3283 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3284
3285 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3286 }
3287}
kXR_unt16 requestid
Definition XProtocol.hh:479
kXR_unt16 requestid
Definition XProtocol.hh:630
kXR_unt16 requestid
Definition XProtocol.hh:804
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_char fhandle[4]
Definition XProtocol.hh:780
struct ClientPgReadRequest pgread
Definition XProtocol.hh:859
kXR_char fhandle[4]
Definition XProtocol.hh:805
kXR_char fhandle[4]
Definition XProtocol.hh:769
kXR_unt16 requestid
Definition XProtocol.hh:644
@ kXR_virtReadv
Definition XProtocol.hh:150
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:860
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientRequestHdr header
Definition XProtocol.hh:844
kXR_char fhandle[4]
Definition XProtocol.hh:509
#define kXR_recoverWrts
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:229
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
struct ClientReadRequest read
Definition XProtocol.hh:865
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_unt16 requestid
Definition XProtocol.hh:766
kXR_unt16 requestid
Definition XProtocol.hh:779
kXR_int64 offset
Definition XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition XProtocol.hh:73
struct ClientWriteRequest write
Definition XProtocol.hh:874
kXR_unt16 requestid
Definition XProtocol.hh:670
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qvisa
Definition XProtocol.hh:622
unsigned char kXR_char
Definition XPtypes.hh:65
if(ec< 0) ec
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetDescription() const
Get the description of the message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
PgRead operation (.
PgWrite operation (.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:212
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:451
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:388
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition XrdClURL.cc:317
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:220
bool IsLocalFile() const
Definition XrdClURL.cc:460
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:239
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint64_t FileMsg
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition XProtocol.hh:298
kXR_char fhandle[4]
Definition XProtocol.hh:288
kXR_unt16 requestid
Definition XProtocol.hh:287
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted