XRootD
Loading...
Searching...
No Matches
XrdClMetalinkRedirector.cc
Go to the documentation of this file.
1/*
2 * XrdClMetalinkRedirector.cc
3 *
4 * Created on: May 2, 2016
5 * Author: simonm
6 */
7
9
12#include "XrdCl/XrdClLog.hh"
14#include "XrdCl/XrdClFile.hh"
16#include "XrdCl/XrdClURL.hh"
18#include "XrdCl/XrdClUtils.hh"
21
23
25
26#include <arpa/inet.h>
27#include <fcntl.h>
28
29namespace XrdCl
30{
31
32 void DeallocArgs( XRootDStatus *status, AnyObject *response,
33 HostList *hostList )
34 {
35 delete status;
36 delete response;
37 delete hostList;
38 }
39
40 //----------------------------------------------------------------------------
41 // Read metalink response handler.
42 //
43 // If the whole file has been read triggers parsing and
44 // initializes the Redirector, otherwise triggers another
45 // read.
46 //----------------------------------------------------------------------------
48 {
49 public:
50 //----------------------------------------------------------------------------
51 // Constructor
52 //----------------------------------------------------------------------------
54 const std::string &content = "" ) :
55 pRedirector( mr ), pUserHandler( userHandler ), pBuffer(
56 new char[DefaultCPChunkSize] ), pContent( content )
57 {
58 }
59
60 //----------------------------------------------------------------------------
61 // Destructor
62 //----------------------------------------------------------------------------
64 {
65 delete[] pBuffer;
66 }
67
68 //----------------------------------------------------------------------------
69 // Handle the response
70 //----------------------------------------------------------------------------
71 virtual void HandleResponse( XRootDStatus *status, AnyObject *response )
72 {
73 try
74 {
75 // check the status
76 if( !status->IsOK() )
77 throw status;
78 // we don't need it anymore
79 delete status;
80 // make sure we got a response
81 if( !response )
82 throw new XRootDStatus( stError, errInternal );
83 // make sure the response is not empty
84 ChunkInfo * info = 0;
85 response->Get( info );
86 if( !info )
87 throw new XRootDStatus( stError, errInternal );
88 uint32_t bytesRead = info->length;
89 uint64_t offset = info->offset + bytesRead;
90 pContent += std::string( pBuffer, bytesRead );
91 // are we done ?
92 if( bytesRead > 0 )
93 {
94 // lets try to read another chunk
95 MetalinkReadHandler * mrh = new MetalinkReadHandler( pRedirector,
96 pUserHandler, pContent );
97 XRootDStatus st = pRedirector->pFile->Read( offset,
98 DefaultCPChunkSize, mrh->GetBuffer(), mrh );
99 if( !st.IsOK() )
100 {
101 delete mrh;
102 throw new XRootDStatus( st );
103 }
104 }
105 else // we have the whole metalink file
106 {
107 // we don't need the File object anymore
108 delete pRedirector->pFile;
109 pRedirector->pFile = 0;
110 // now we can parse the metalink file
111 XRootDStatus st = pRedirector->Parse( pContent );
112 // now with the redirector fully initialized we can handle pending requests
113 pRedirector->FinalizeInitialization();
114 // we are done, pass the status to the user (whatever it is)
115 if( pUserHandler )
116 pUserHandler->HandleResponse( new XRootDStatus( st ), 0 );
117 }
118 // clean up
119 delete response;
120 }
121 catch( XRootDStatus *status )
122 {
123 pRedirector->FinalizeInitialization( *status );
124 // if we were not able to read from the metalink,
125 // propagate the error to the user handler
126 if( pUserHandler )
127 pUserHandler->HandleResponse( status, 0 );
128 else
129 DeallocArgs( status, response, 0 );
130 }
131
132 delete this;
133 }
134
135 //----------------------------------------------------------------------------
136 // Get the receive-buffer
137 //----------------------------------------------------------------------------
138 char * GetBuffer()
139 {
140 return pBuffer;
141 }
142
143 private:
144
145 MetalinkRedirector *pRedirector;
146 ResponseHandler *pUserHandler;
147 char *pBuffer;
148 std::string pContent;
149 };
150
151 //----------------------------------------------------------------------------
152 // Open metalink response handler.
153 //
154 // After a successful open, triggers a read.
155 //----------------------------------------------------------------------------
157 {
158 public:
159 //----------------------------------------------------------------------------
160 // Constructor
161 //----------------------------------------------------------------------------
163 ResponseHandler *userHandler ) :
164 pRedirector( mr ), pUserHandler( userHandler )
165 {
166 }
167
168 //----------------------------------------------------------------------------
169 // Handle the response
170 //----------------------------------------------------------------------------
172 AnyObject *response, HostList *hostList )
173 {
174 try
175 {
176 if( status->IsOK() )
177 {
178 delete status;
179 // download the content
180 MetalinkReadHandler *mrh = new MetalinkReadHandler( pRedirector,
181 pUserHandler );
182 XRootDStatus st = pRedirector->pFile->Read( 0, DefaultCPChunkSize,
183 mrh->GetBuffer(), mrh );
184 if( !st.IsOK() )
185 {
186 delete mrh;
187 throw new XRootDStatus( stError, errInternal );
188 } else
189 {
190 delete response;
191 delete hostList;
192 }
193 } else
194 throw status;
195 } catch( XRootDStatus *status )
196 {
197 pRedirector->FinalizeInitialization( *status );
198 // if we were not able to schedule a read
199 // pass an error to the user handler
200 if( pUserHandler )
201 pUserHandler->HandleResponseWithHosts( status, response, hostList );
202 else
203 DeallocArgs( status, response, hostList );
204 }
205
206 delete this;
207 }
208
209 private:
210
211 MetalinkRedirector *pRedirector;
212 ResponseHandler *pUserHandler;
213 };
214
215 //----------------------------------------------------------------------------
216 // Constructor
217 //----------------------------------------------------------------------------
218 MetalinkRedirector::MetalinkRedirector( const std::string & url ) :
219 pUrl( url ), pFile( new File( File::DisableVirtRedirect ) ), pReady(
220 false ), pFileSize( -1 )
221 {
222 }
223
224 //----------------------------------------------------------------------------
225 // Destructor
226 //----------------------------------------------------------------------------
228 {
229 delete pFile;
230 }
231
232 //----------------------------------------------------------------------------
233 // Initializes the object with the content of the metalink file
234 //----------------------------------------------------------------------------
236 {
237 MetalinkOpenHandler *handler = new MetalinkOpenHandler( this, userHandler );
238 XRootDStatus st = pFile->Open( pUrl, OpenFlags::Read, Access::None, handler,
239 0 );
240 if( !st.IsOK() )
241 delete handler;
242
243 return st;
244 }
245
246 //----------------------------------------------------------------------------
247 // Parses the metalink file
248 //----------------------------------------------------------------------------
249 XRootDStatus MetalinkRedirector::Parse( const std::string &metalink )
250 {
251 Log *log = DefaultEnv::GetLog();
252 Env *env = DefaultEnv::GetEnv();
253 std::string glfnRedirector;
254 env->GetString( "GlfnRedirector", glfnRedirector );
255 // parse the metalink
256 XrdXmlMetaLink parser( "root:xroot:roots:xroots:file:", "xroot:",
257 glfnRedirector.empty() ? 0 : glfnRedirector.c_str() );
258 int size = 0;
259 XrdOucFileInfo **fileInfos = parser.ConvertAll( metalink.c_str(), size,
260 metalink.size() );
261 if( !fileInfos )
262 {
263 int ecode;
264 const char * etxt = parser.GetStatus( ecode );
265 log->Error( UtilityMsg,
266 "Failed to parse the metalink file: %s (error code: %d)", etxt,
267 ecode );
269 "Malformed or corrupted metalink file." );
270 }
271 // we are expecting just one file per metalink (as agreed with RUCIO)
272 if( size != 1 )
273 {
274 log->Error( UtilityMsg, "Expected only one file per metalink." );
275 return XRootDStatus( stError, errDataError );
276 }
277
278 InitCksum( fileInfos );
279 InitReplicas( fileInfos );
280 pTarget = fileInfos[0]->GetTargetName();
281 pFileSize = fileInfos[0]->GetSize();
282
283 XrdXmlMetaLink::DeleteAll( fileInfos, size );
284
285 return XRootDStatus();
286 }
287
288 //----------------------------------------------------------------------------
289 // Finalize the initialization process:
290 // - mark as ready
291 // - setup the status
292 // - and handle pending redirects
293 //----------------------------------------------------------------------------
294 void MetalinkRedirector::FinalizeInitialization( const XRootDStatus &status )
295 {
296 XrdSysMutexHelper scopedLock( pMutex );
297 pReady = true;
298 pStatus = status;
299 // Handle pending redirects (those that were
300 // submitted before the metalink has been loaded)
301 while( !pPendingRedirects.empty() )
302 {
303 const Message *msg = pPendingRedirects.front().first;
304 MsgHandler *handler = pPendingRedirects.front().second;
305 pPendingRedirects.pop_front();
306 if( !handler || !msg )
307 continue;
308 HandleRequestImpl( msg, handler );
309 }
310 }
311
312 //----------------------------------------------------------------------------
313 // Generates redirect response for the given request
314 //----------------------------------------------------------------------------
315 std::shared_ptr<Message> MetalinkRedirector::GetResponse( const Message *msg ) const
316 {
317 if( !pStatus.IsOK() )
318 return GetErrorMsg( msg, "Could not load the Metalink file.",
319 static_cast<XErrorCode>( XProtocol::mapError( pStatus.errNo ) ) );
320 const ClientRequestHdr *req =
321 reinterpret_cast<const ClientRequestHdr*>( msg->GetBuffer() );
322 // get the redirect location
323 std::string replica;
324 if( !GetReplica( *msg, replica ).IsOK() )
325 return GetErrorMsg( msg, "Metalink: no more replicas to try.", kXR_noReplicas );
326 auto resp = std::make_shared<Message>( sizeof(ServerResponse) );
327 ServerResponse* response =
328 reinterpret_cast<ServerResponse*>( resp->GetBuffer() );
329 response->hdr.status = kXR_redirect;
330 response->hdr.streamid[0] = req->streamid[0];
331 response->hdr.streamid[1] = req->streamid[1];
332 response->hdr.dlen = 4 + replica.size(); // 4 bytes are reserved for port number
333 response->body.redirect.port = -1; // this indicates that the full URL will be given in the 'host' field
334 memcpy( response->body.redirect.host, replica.c_str(), replica.size() );
335 return resp;
336 }
337
338 //----------------------------------------------------------------------------
339 // Generates error response for the given request
340 //----------------------------------------------------------------------------
341 std::shared_ptr<Message> MetalinkRedirector::GetErrorMsg( const Message *msg,
342 const std::string &errMsg, XErrorCode code ) const
343 {
344 const ClientRequestHdr *req =
345 reinterpret_cast<const ClientRequestHdr*>( msg->GetBuffer() );
346
347 auto resp = std::make_shared<Message>( sizeof(ServerResponse) );
348 ServerResponse* response =
349 reinterpret_cast<ServerResponse*>( resp->GetBuffer() );
350
351 response->hdr.status = kXR_error;
352 response->hdr.streamid[0] = req->streamid[0];
353 response->hdr.streamid[1] = req->streamid[1];
354 response->hdr.dlen = 4 + errMsg.size();
355 response->body.error.errnum = htonl( code );
356 memcpy( response->body.error.errmsg, errMsg.c_str(), errMsg.size() );
357
358 return resp;
359 }
360
361 //----------------------------------------------------------------------------
362 // Creates an instant redirect response for the given message
363 // or an error response if there are no more replicas to try.
364 // The virtual response is being handled by the given handler.
365 //----------------------------------------------------------------------------
366 XRootDStatus MetalinkRedirector::HandleRequestImpl( const Message *msg,
367 MsgHandler *handler )
368 {
369 auto resp = GetResponse( msg );
370 JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
371 RedirectJob *job = new RedirectJob( handler, std::move( resp ) );
372 jobMan->QueueJob( job );
373 return XRootDStatus();
374 }
375
376 //----------------------------------------------------------------------------
377 // If the MetalinkRedirector is initialized creates an instant
378 // redirect response, otherwise queues the request until initialization
379 // is done.
380 //----------------------------------------------------------------------------
382 MsgHandler *handler )
383 {
384 XrdSysMutexHelper scopedLck( pMutex );
385 // if the metalink data haven't been loaded yet, make it pending
386 if( !pReady )
387 {
388 pPendingRedirects.push_back(
389 std::make_pair( msg, handler ) );;
390 return XRootDStatus();
391 }
392 // otherwise generate a virtual response
393 return HandleRequestImpl( msg, handler );
394 }
395
396 //----------------------------------------------------------------------------
398 //----------------------------------------------------------------------------
400 {
401 ReplicaList::const_iterator itr = GetReplica( req );
402 return pReplicas.end() - itr;
403 }
404
405 //----------------------------------------------------------------------------
406 // Gets the file checksum if specified in the metalink
407 //----------------------------------------------------------------------------
408 void MetalinkRedirector::InitCksum( XrdOucFileInfo **fileInfos )
409 {
410 const char *chvalue = 0, *chtype = 0;
411 while( ( chtype = fileInfos[0]->GetDigest( chvalue ) ) )
412 {
413 pChecksums[chtype] = chvalue;
414 }
415 }
416
417 //----------------------------------------------------------------------------
418 // Initializes replica list
419 //----------------------------------------------------------------------------
420 void MetalinkRedirector::InitReplicas( XrdOucFileInfo **fileInfos )
421 {
422 URL replica;
423 const char *url = 0;
424 while( ( url = fileInfos[0]->GetUrl() ) )
425 {
426 replica = URL( url );
427 if( !replica.IsValid() || replica.GetURL().size() > 4096 )
428 continue; // this is the internal limit (defined in the protocol)
429 pReplicas.push_back( replica.GetURL() );
430 }
431 }
432
433 //----------------------------------------------------------------------------
435 //----------------------------------------------------------------------------
436 XRootDStatus MetalinkRedirector::GetReplica( const Message &msg,
437 std::string &replica ) const
438 {
439 ReplicaList::const_iterator itr = GetReplica( msg );
440 if( itr == pReplicas.end() )
441 return XRootDStatus( stError, errNotFound );
442
444 int tlsmtl = DefaultTlsMetalink;
445 env->GetInt( "TlsMetalink", tlsmtl );
446 URL url( *itr );
447 if( tlsmtl && ( url.GetProtocol() == "root" || url.GetProtocol() == "xroot" ) )
448 url.SetProtocol( "roots" );
449 replica = url.GetURL();
450
451 return XRootDStatus();
452 }
453
454 //----------------------------------------------------------------------------
456 //----------------------------------------------------------------------------
457 MetalinkRedirector::ReplicaList::const_iterator
458 MetalinkRedirector::GetReplica( const Message &msg ) const
459 {
460 if( pReplicas.empty() )
461 return pReplicas.cend();
462
463 std::string tried;
464 if( !GetCgiInfo( msg, "tried", tried ).IsOK() )
465 return pReplicas.cbegin();
466
467 ReplicaList triedList;
468 Utils::splitString( triedList, tried, "," );
469 std::set<std::string> triedSet( triedList.begin(), triedList.end() );
470
471 ReplicaList::const_iterator itr = pReplicas.begin();
472 for( ; itr != pReplicas.end(); ++itr )
473 {
474 URL url( *itr );
475 if( !triedSet.count( url.GetHostName() ) ) break;
476 }
477
478 return itr;
479 }
480
481 //----------------------------------------------------------------------------
483 //----------------------------------------------------------------------------
484 XRootDStatus MetalinkRedirector::GetCgiInfo( const Message &msg,
485 const std::string &key, std::string &value ) const
486 {
487 const ClientRequestHdr *req =
488 reinterpret_cast<const ClientRequestHdr*>( msg.GetBuffer() );
489 kXR_int32 dlen = msg.IsMarshalled() ? ntohl( req->dlen ) : req->dlen;
490 std::string url( msg.GetBuffer( 24 ), dlen );
491 size_t pos = url.find( '?' );
492 if( pos == std::string::npos )
493 return XRootDStatus( stError );
494 size_t start = url.find( key, pos );
495 if( start == std::string::npos )
496 return XRootDStatus( stError );
497 start += key.size() + 1; // the +1 stands for the '=' sign that is not part of the key
498 size_t end = url.find( '&', start );
499 if( end == std::string::npos )
500 end = url.size();
501 value = url.substr( start, end - start );
502 return XRootDStatus();
503 }
504
505} /* namespace XrdCl */
XErrorCode
Definition XProtocol.hh:987
@ kXR_noReplicas
union ServerResponse::@28 body
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:912
@ kXR_redirect
Definition XProtocol.hh:902
@ kXR_error
Definition XProtocol.hh:901
ServerResponseHeader hdr
int kXR_int32
Definition XPtypes.hh:89
static int mapError(int rc)
void Get(Type &object)
Retrieve the object being held.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:46
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:206
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:99
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
The message representation used throughout the system.
MetalinkOpenHandler(MetalinkRedirector *mr, ResponseHandler *userHandler)
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
MetalinkReadHandler(MetalinkRedirector *mr, ResponseHandler *userHandler, const std::string &content="")
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
XRootDStatus Load(ResponseHandler *userHandler)
virtual ~MetalinkRedirector()
Destructor.
XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)
MetalinkRedirector(const std::string &url)
virtual int Count(Message &req) const
Count how many replicas do we have left to try for given request.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
long long GetSize()
const char * GetTargetName()
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const int DefaultTlsMetalink
const uint64_t UtilityMsg
void DeallocArgs(XRootDStatus *status, AnyObject *response, HostList *hostList)
Describe a data chunk for vector read.
uint32_t length
offset in the file
@ Read
Open only for reading.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.