XRootD
Loading...
Searching...
No Matches
XrdCmsCluster.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s C l u s t e r . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <fcntl.h>
33#include <cstdio>
34#include <cstdlib>
35#include <random>
36#include <unistd.h>
37#include <netinet/in.h>
38#include <sys/types.h>
39
41
42#include "Xrd/XrdJob.hh"
43#include "Xrd/XrdLink.hh"
44#include "Xrd/XrdScheduler.hh"
45
48#include "XrdCms/XrdCmsCache.hh"
52#include "XrdCms/XrdCmsNode.hh"
53#include "XrdCms/XrdCmsRole.hh"
54#include "XrdCms/XrdCmsRRQ.hh"
55#include "XrdCms/XrdCmsState.hh"
57#include "XrdCms/XrdCmsTrace.hh"
58#include "XrdCms/XrdCmsTypes.hh"
59
60#include "XrdOuc/XrdOucPup.hh"
61
64#include "XrdSys/XrdSysTimer.hh"
65
66using namespace XrdCms;
67
68/******************************************************************************/
69/* G l o b a l O b j e c t s */
70/******************************************************************************/
71
73
74/******************************************************************************/
75/* L o c a l S t r u c t u r e s */
76/******************************************************************************/
77
79{
80public:
81
82 void DoIt() {if (nodeP)
83 {nodeP->Delete(Cluster.STMutex);
84 delete this;
85 } else {
86 if (!Cluster.Drop(nodeEnt, nodeInst, this)) delete this;
87 }
88 }
89
90 XrdCmsDrop(XrdCmsNode *nP) : XrdJob("delete node"), nodeP(nP),
91 nodeEnt(0), nodeInst(0)
92 {Sched->Schedule((XrdJob *)this);}
93
94 XrdCmsDrop(int nid, int inst) : XrdJob("drop node"), nodeP(0),
95 nodeEnt(nid), nodeInst(inst)
96 {Sched->Schedule((XrdJob *)this, time(0)+Config.DRPDelay);}
97
99
103};
104
105/******************************************************************************/
106/* C o n s t r u c t o r */
107/******************************************************************************/
108
110{
111 memset((void *)NodeTab, 0, sizeof(NodeTab));
112 memset((void *)AltMans, (int)' ', sizeof(AltMans));
113 AltMend = AltMans;
114 AltMent = -1;
115 NodeCnt = 0;
116 STHi = -1;
117 SelWtot = 0;
118 SelRtot = 0;
119 SelTcnt = 0;
120 peerHost = 0;
121 peerMask = ~peerHost;
122}
123
124/******************************************************************************/
125/* A d d */
126/******************************************************************************/
127
// Add(): log the node on link lp into the node table and return it locked.
// STMHelper holds STMutex for the whole call (write mode, per the note on its
// declaration). Returns 0 when the node is already logged in, when it cannot
// be added as an alternate, when no slot is free or can be reclaimed, or when
// the cluster ID refuses the node. Status carries the CMS_* bits (isMan,
// isPeer, isSuper, noStage, Suspend) describing the new node.
128XrdCmsNode *XrdCmsCluster::Add(XrdLink *lp, int port, int Status, int sport,
129 const char *theNID, const char *theIF)
130
131{
132 EPNAME("Add")
133 const char *act = ""; // Action prefix, only used in the login debug message
134 XrdCmsNode *nP = 0;
135 XrdCmsClustID *cidP = 0;
136 XrdSysRWLockHelper STMHelper(STMutex, false); // Need write lock!
137 int tmp, Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
138 bool Special = (Status & (CMS_isMan|CMS_isPeer));
139 bool SpecAlt = (Special && !(Status & CMS_isSuper));
140 bool Hidden = false;
141
142// Find available slot for this node. Here are the priorities:
143// Slot = Reconnecting node
144// Free = Available slot ( 1st in table)
145// Bump1 = Disconnected server (last in table)
146// Bump2 = Connected server (last in table) if new one is manager/peer
147// Bump3 = Disconnected manager/peer ( 1st in table) if new one is manager/peer
148//
149 for (Slot = 0; Slot < STMax; Slot++)
150 if (NodeTab[Slot])
151 {if (NodeTab[Slot]->isNode(lp, theNID, port)) break;
152/*Conn*/ if (NodeTab[Slot]->isConn)
153 {if (!NodeTab[Slot]->isPerm && Special)
154 Bump2 = Slot; // Last conn Server
155/*Disc*/ } else {
156 if ( NodeTab[Slot]->isPerm)
157 {if (Bump3 < 0 && Special) Bump3 = Slot;}// 1st disc Man/Pr
158 else Bump1 = Slot; // Last disc Server
159 }
160 } else if (Free < 0) Free = Slot; // 1st free slot
161
162// Check if node is already logged in or is a relogin. Slot < STMax here only
// when the scan above stopped on an existing entry matching this node.
163//
164 if (Slot < STMax)
165 {if (NodeTab[Slot] && NodeTab[Slot]->isBound)
166 {Say.Emsg("Cluster", lp->ID, "already logged in.");
167 return 0;
168 } else { // Rehook node to previous unconnected entry
169 nP = NodeTab[Slot];
170 nP->Link = lp;
171 nP->isOffline = 0;
172 nP->isBad &= ~XrdCmsNode::isSuspend;
173 nP->isConn = 1;
174 nP->Instance++;
175 nP->setName(lp, theIF, port); // Just in case it changed
176 act = "Reconnect ";
177 }
178 }
179
180// First see if this node may be an alternate. AddAlt() may return a node that
// is not placed in NodeTab; such a node is treated as hidden below.
181//
182 if (!nP && SpecAlt)
183 {if ((cidP = XrdCmsClustID::Find(theNID)) && !(cidP->IsEmpty()))
184 {if (!(nP = AddAlt(cidP, lp, port, Status, sport, theNID, theIF)))
185 return 0;
186 aSet = 1; Slot = nP->NodeID;
187 if (nP != NodeTab[Slot]) {Hidden = true; act = "Alternate ";}
188 }
189 }
190
191// Reuse an old ID if we must or redirect the incoming node. With no free slot
// the victim is chosen in the order Bump1, then Bump2, then Bump3.
192//
193 if (!nP)
194 {if (Free >= 0) Slot = Free;
195 else {if (Bump1 >= 0) Slot = Bump1;
196 else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
197 if (Slot < 0)
198 {if (Status & CMS_isPeer) Say.Emsg("Cluster", "Add peer", lp->ID,
199 "failed; too many subscribers.");
200 else {sendAList(lp);
201 DEBUG(lp->ID <<" redirected; too many subscribers.");
202 }
203 return 0;
204 }
205
206 if (Status & CMS_isMan) {setAltMan(Slot, lp, sport); aSet=1;}
207 if (NodeTab[Slot] && !(Status & CMS_isPeer))
208 sendAList(NodeTab[Slot]->Link);
209
210 DEBUG(lp->ID << " bumps " << NodeTab[Slot]->Ident <<" #" <<Slot);
// Remove() expects the node to be locked on entry; immed == -1 tells it that
// STMutex is already held in write mode and must not be released.
211 NodeTab[Slot]->Lock();
212 Remove("redirected", NodeTab[Slot], -1);
213 act = "Shoved ";
214 }
215 NodeTab[Slot] = nP = new XrdCmsNode(lp, theIF, theNID, port, 0, Slot);
216 if (!cidP) cidP = XrdCmsClustID::AddID(theNID);
217 if ((cidP->AddNode(nP, SpecAlt))) nP->cidP = cidP;
218 else {delete nP; NodeTab[Slot] = 0; return 0;} // OK to do delete!
219 }
220
221// Indicate whether this node can be redirected (managers and peers cannot)
222//
223 nP->isPerm = (Status & (CMS_isMan | CMS_isPeer)) ? 1 : 0;
224
225// Assign new server
226//
227 if (!aSet && (Status & CMS_isSuper)) setAltMan(Slot, lp, sport);
228 if (Slot > STHi) STHi = Slot;
229 nP->isBound = 1;
230 nP->isConn = 1;
231 nP->isNoStage = 0 != (Status & CMS_noStage);
232 nP->isBad |= (Status & CMS_Suspend ? XrdCmsNode::isSuspend : 0);
233 nP->isMan = 0 != (Status & CMS_isMan);
234 nP->isPeer = 0 != (Status & CMS_isPeer);
236 nP->subsPort = sport;
237
238// If this is an actual non-hidden node, count it. A hidden alternate is
// instead flagged by setting bit 0x02 in isMan.
239//
240 if (!Hidden)
241 {NodeCnt++;
242 if (Config.SUPLevel
243 && (tmp = NodeCnt*Config.SUPLevel/100) > Config.SUPCount)
244 {Config.SUPCount=tmp; CmsState.Set(tmp);}
245 } else nP->isMan |= 0x02;
246
247// Compute new peer mask, as needed
248//
249 if (nP->isPeer) peerHost |= nP->NodeMask;
250 else peerHost &= ~nP->NodeMask;
251 peerMask = ~peerHost;
252
253// Document login
254//
255 if (QTRACE(Debug))
256 {DEBUG(act <<nP->Ident <<" to cluster " <<nP->myNID <<" slot "
257 <<Slot <<'.' <<nP->Instance <<" (nodecnt=" <<NodeCnt
258 <<" supn=" <<Config.SUPCount <<")");
259 }
260
261// Compute new state of all nodes if we are a reporting manager.
262//
263 if (Config.asManager() && !Hidden)
265 nP->isBad & XrdCmsNode::isSuspend ? 0 : 1,
266 nP->isNoStage ? 0 : 1);
267
268// All done. Return the node locked; the caller is responsible for unlocking.
269//
270 nP->Lock();
271 return nP;
272}
273
274/******************************************************************************/
275/* Private: A d d A l t */
276/******************************************************************************/
277
278// Warning STMutex must be held in write mode by the caller!
279
280XrdCmsNode *XrdCmsCluster::AddAlt(XrdCmsClustID *cidP, XrdLink *lp,
281 int port, int Status, int sport,
282 const char *theNID, const char *theIF)
283
284{
285 EPNAME("AddAlt")
286 XrdCmsNode *pP, *nP = 0;
287 int slot = cidP->Slot();
288
289// Check if this node is already in the alternate table
290//
291 if (cidP->Exists(lp, theNID, port))
292 {Say.Emsg(epname, lp->ID, "already logged in.");
293 return 0;
294 }
295
296// Add this node if there is room
297//
298 if (cidP->Avail())
299 {nP = new XrdCmsNode(lp, theIF, theNID, port, 0, slot);
300 if (!(cidP->AddNode(nP, true))) {delete nP; nP = 0;} // OK to do delete!
301 }
302
303// Check if we were actually able to add this node
304//
305 if (!nP)
306 {Say.Emsg(epname, "Add alternate manager", lp->ID,
307 "failed; too many subscribers.");
308 return 0;
309 }
310
311// Check if the existing lead dead and we can substiture this one
312//
313 if ((pP = NodeTab[slot]) && !(pP->isBound))
314 {setAltMan(nP->NodeID, nP->Link, sport);
315 Say.Emsg("AddAlt", nP->Ident, "replacing dropped", pP->Ident);
316 NodeTab[slot] = nP;
317 pP->DropJob = new XrdCmsDrop(pP); // Schedule deletion
318 }
319
320// Hook the node to the cluster table and return
321//
322 nP->cidP = cidP;
323 return nP;
324}
325
326/******************************************************************************/
327/* B l a c k L i s t */
328/******************************************************************************/
329
331{
332 static CmsDiscRequest discRequest = {{0, kYR_disc, 0, 0}};
333 XrdCmsNode *nP;
334 const char *etxt = "blacklisted.";
335 int i, blRD = 0;
336 bool inBL;
337
338// Obtain a lock on the table. We need this in write mode!
339//
340 STMutex.WriteLock();
341
342// Run through the table looking to put or out of the blacklist
343//
344 for (i = 0; i <= STHi; i++)
345 {if ((nP = NodeTab[i]))
346 {inBL = (blP && (blRD = XrdCmsBlackList::Present(nP->Name(), blP)));
347 if ((!inBL && !(nP->isBad & XrdCmsNode::isBlisted))
348 || ( inBL && (nP->isBad & XrdCmsNode::isBlisted))) continue;
349 nP->g2nLock(STMutex); // Downgrade to only node lock
350 if (inBL)
352 if (blRD < -1)
353 {if (kYR_Version > nP->myVersion)
354 etxt = "blacklisted; redirect unsupported.";
355 else etxt = "blacklisted with redirect.";
357 nP->Send((char *)&discRequest, sizeof(discRequest));
358 }
359 Say.Emsg("Manager", nP->Name(), etxt);
360 } else {
362 Say.Emsg("Manager", nP->Name(), "removed from blacklist.");
363 }
364 nP->n2gLock(STMutex);
365 }
366 }
367 STMutex.UnLock();
368}
369
370/******************************************************************************/
371/* B r o a d c a s t */
372/******************************************************************************/
373
374SMask_t XrdCmsCluster::Broadcast(SMask_t smask, const struct iovec *iod,
375 int iovcnt, int iotot)
376{
377 EPNAME("Broadcast")
378 int i;
379 XrdCmsNode *nP;
380 SMask_t bmask, unQueried(0);
381
382// Obtain a lock on the table and screen out peer nodes
383//
384 STMutex.ReadLock(); // Sufficient to prevent modifications
385 bmask = smask & peerMask;
386
387// Run through the table looking for nodes to send messages to. We don't need
388// the node lock for this but we do need to up the reference count to keep the
389// node pointer valid for the duration of the send() (may or may not block).
390//
391 for (i = 0; i <= STHi; i++)
392 {if ((nP = NodeTab[i]) && nP->isNode(bmask))
393 {if (nP->isOffline) unQueried |= nP->Mask();
394 else {nP->Ref();
395 STMutex.UnLock();
396 if (nP->Send(iod, iovcnt, iotot) < 0)
397 {unQueried |= nP->Mask();
398 DEBUG(nP->Ident <<" is unreachable");
399 }
400 nP->unRef();
401 STMutex.ReadLock();
402 }
403 }
404 }
405 STMutex.UnLock();
406 return unQueried;
407}
408
409/******************************************************************************/
410
412 char *Data, int Dlen)
413{
414 struct iovec ioV[3], *iovP = &ioV[1];
415 unsigned short Temp;
416 int Blen;
417
418// Construct packed data for the character argument. If data is a string then
419// Dlen must include the null byte if it is specified at all.
420//
421 Blen = XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
422 Hdr.datalen = htons(static_cast<unsigned short>(Blen));
423
424// Complete the iovec and send off the data
425//
426 ioV[0].iov_base = (char *)&Hdr; ioV[0].iov_len = sizeof(Hdr);
427 return Broadcast(smask, ioV, 3, Blen+sizeof(Hdr));
428}
429
430/******************************************************************************/
431
433 void *Data, int Dlen)
434{
435 struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)},
436 {(char *)Data, (size_t)Dlen}};
437
438// Send of the data as eveything was constructed properly
439//
440 Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
441 return Broadcast(smask, ioV, 2, Dlen+sizeof(Hdr));
442}
443
444/******************************************************************************/
445/* B r o a d s e n d */
446/******************************************************************************/
447
448// Send message to first eligible node!
449
451 void *Data, int Dlen)
452{
453 EPNAME("Broadsend");
454 static int Start = 0;
455 XrdCmsNode *nP;
456 struct iovec ioV[2] = {{(char *)&Hdr, sizeof(Hdr)},
457 {(char *)Data, (size_t)Dlen}};
458 int i, Beg, Fin, ioTot = Dlen+sizeof(Hdr);
459
460// Send of the data as eveything was constructed properly
461//
462 Hdr.datalen = htons(static_cast<unsigned short>(Dlen));
463
464// Obtain a lock on the table and get the starting and ending position. Note
465// that the mechnism we use will necessarily skip newly added nodes.
466//
467 STMutex.ReadLock(); // Sufficient to prevent modifications
468 Beg = Start = (Start <= STHi ? Start+1 : 0);
469 Fin = STHi;
470
471// Run through the table looking for a node to send a message to. We don't need
472// the node lock for this but we do need to up the reference count to keep the
473// node pointer valid for the duration of the send() (may or may not block).
474//
475do{for (i = Beg; i <= Fin; i++)
476 {if ((nP = NodeTab[i]) && nP->isNode(Who))
477 {if (nP->isOffline) continue;
478 nP->Ref();
479 STMutex.UnLock();
480 if (nP->Send(ioV, 2, ioTot) >= 0) {nP->unRef(); return 1;}
481 DEBUG(nP->Ident <<" is unreachable");
482 nP->unRef();
483 STMutex.ReadLock();
484 }
485 }
486 if (!Beg) break;
487 Fin = Beg-1; Beg = 0;
488 } while(1);
489
490// Did not send to anyone
491//
492 STMutex.UnLock();
493 return 0;
494}
495
496/******************************************************************************/
497/* g e t M a s k */
498/******************************************************************************/
499
501{
502 int i;
503 XrdCmsNode *nP;
504 SMask_t smask(0);
505
506// Obtain a lock on the table
507//
508 STMutex.ReadLock();
509
510// Run through the table looking for a node with matching IP address
511//
512 for (i = 0; i <= STHi; i++)
513 if ((nP = NodeTab[i]) && nP->isNode(addr))
514 {smask = nP->NodeMask; break;}
515
516// All done
517//
518 STMutex.UnLock();
519 return smask;
520}
521
522/******************************************************************************/
523
525{
526 return XrdCmsClustID::Mask(Cid);
527}
528
529/******************************************************************************/
530/* L i s t */
531/******************************************************************************/
532
534{
535 static const int iSize = XrdCmsSelected::IdentSize;
536 XrdCmsNode *nP;
537 XrdCmsSelected *sipp = 0, *sip;
539 XrdNetIF::ifType ifGet = ifType;
540 int i, destLen;
541 bool retName = (opts & LS_IDNT) != 0;
542 bool retAny = (opts & LS_ANY ) != 0;
543 bool retDest = retName || (opts & LS_IPO);
544
545// If only one wanted, the select appropriately
546//
547 oksel = false;
548 STMutex.ReadLock();
549 for (i = 0; i <= STHi; i++)
550 if ((nP=NodeTab[i]) && (nP->NodeMask & mask))
551 {oksel = true;
552 if (retDest)
553 { if (nP->netIF.HasDest(ifType)) ifGet = ifType;
554 else if (!retAny) continue;
555 else {ifGet = (XrdNetIF::ifType)(ifType ^ XrdNetIF::PrivateIF);
556 if (!nP->netIF.HasDest(ifGet)) continue;
557 }
558 }
559 sip = new XrdCmsSelected(sipp);
560 if (retDest) destLen = nP->netIF.GetDest(sip->Ident, iSize,
561 ifGet, retName);
562 else if (nP->myNlen >= XrdCmsSelected::IdentSize) destLen = 0;
563 else {strcpy(sip->Ident, nP->myName); destLen = nP->myNlen;}
564 if (!destLen) {delete sip; continue;}
565
566 sip->IdentLen = destLen;
567 sip->Mask = nP->NodeMask;
568 sip->Id = nP->NodeID;
569 sip->Port = nP->netIF.Port();
570 sip->RefTotW = nP->RefTotW;
571 sip->RefTotR = nP->RefTotR;
572 sip->Shrin = nP->Shrin;
573 sip->Share = nP->Share;
574 sip->RoleID = nP->RoleID;
575 sip->Status = (nP->isOffline ? XrdCmsSelected::Offline : 0);
577 sip->Status |= XrdCmsSelected::Disable;
578 if (nP->isNoStage) sip->Status |= XrdCmsSelected::NoStage;
580 sip->Status |= XrdCmsSelected::Suspend;
581 if (nP->isRW ) sip->Status |= XrdCmsSelected::isRW;
582 if (nP->isMan ) sip->Status |= XrdCmsSelected::isMangr;
583 sipp = sip;
584 }
585 STMutex.UnLock();
586
587// Return result
588//
589 return sipp;
590}
591
592/******************************************************************************/
593/* L o c a t e */
594/******************************************************************************/
595
597{
598 EPNAME("Locate");
599 XrdCmsPInfo pinfo;
600 SMask_t qfVec(0);
601 char *Path;
602 int retc = 0;
603
604// Check if this is a locate for all current servers
605//
606 if (*Sel.Path.Val != '*') Path = Sel.Path.Val;
607 else {if (*(Sel.Path.Val+1) == '\0')
608 {Sel.Vec.hf = ~0LL; Sel.Vec.pf = Sel.Vec.wf = 0;
609 return 0;
610 }
611 Path = Sel.Path.Val+1;
612 }
613
614// Find out who serves this path
615//
616 if (!Cache.Paths.Find(Path, pinfo) || !pinfo.rovec)
617 {Sel.Vec.hf = Sel.Vec.pf = Sel.Vec.wf = 0;
618 return NotFound;
619 } else Sel.Vec.wf = pinfo.rwvec;
620
621// Check if this was a non-lookup request
622//
623 if (*Sel.Path.Val == '*')
624 {Sel.Vec.hf = pinfo.rovec; Sel.Vec.pf = 0;
625 Sel.Vec.wf = pinfo.rwvec;
626 return 0;
627 }
628
629// Complete the request info object if we have one
630//
631 if (Sel.InfoP)
632 {Sel.InfoP->rwVec = pinfo.rwvec;
633 Sel.InfoP->isLU = 1;
634 }
635
636// If we are running a shared file system preform an optional restricted
637// pre-selection and then do a standard selection.
638//
639 if (baseFS.isDFS())
640 {SMask_t amask, smask, pmask;
641 amask = pmask = pinfo.rovec;
642 smask = (Sel.Opts & XrdCmsSelect::Online ? 0 : pinfo.ssvec & amask);
643 Sel.Resp.DLen = 0;
644 if (!(retc = SelDFS(Sel, amask, pmask, smask, 1)))
645 return (Sel.Opts & XrdCmsSelect::Asap && Sel.InfoP
646 ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
647 if (retc < 0) return NotFound;
648 return 0;
649 }
650
651// First check if we have seen this file before. If so, get nodes that have it.
652// A Refresh request kills this because it's as if we hadn't seen it before.
653// If the file was found but either a query is in progress or we have a server
654// bounce; the client must wait.
655//
656 if (Sel.Opts & XrdCmsSelect::Refresh
657 || !(retc = Cache.GetFile(Sel, pinfo.rovec)))
658 {Cache.AddFile(Sel, 0);
659 qfVec = pinfo.rovec; Sel.Vec.hf = 0;
660 } else qfVec = Sel.Vec.bf;
661
662// Compute the delay, if any
663//
664 if ((!qfVec && retc >= 0) || (Sel.Vec.hf && Sel.InfoP)) retc = 0;
665 else if (!(retc = Cache.WT4File(Sel, Sel.Vec.hf))) retc = Wait4CBk;
666
667// Check if we have to ask any nodes if they have the file
668//
669 if (qfVec)
670 {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
671 if (Sel.Opts & XrdCmsSelect::Refresh)
673 TRACE(Files, "seeking " <<Sel.Path.Val);
674 qfVec = Cluster.Broadcast(qfVec, QReq.Hdr,
675 (void *)Sel.Path.Val, Sel.Path.Len+1);
676 if (qfVec) Cache.UnkFile(Sel, qfVec);
677 }
678 return retc;
679}
680
681/******************************************************************************/
682/* M o n P e r f */
683/******************************************************************************/
684
686{
687 CmsUsageRequest Usage = {{0, kYR_usage, 0, 0}};
688 struct iovec ioV[] = {{(char *)&Usage, sizeof(Usage)}};
689 int ioVnum = sizeof(ioV)/sizeof(struct iovec);
690 int ioVtot = sizeof(Usage);
691 SMask_t allNodes(~0);
692 int uInterval = Config.AskPing*Config.AskPerf;
693
694// Sleep for the indicated amount of time, then ask for load on each server
695//
696 while(uInterval)
697 {XrdSysTimer::Snooze(uInterval);
698 Broadcast(allNodes, ioV, ioVnum, ioVtot);
699 }
700 return (void *)0;
701}
702
703/******************************************************************************/
704/* M o n R e f s */
705/******************************************************************************/
706
708{
709 XrdCmsNode *nP;
710 int snooze_interval = 60, snooze_total = 0;
711 int rCnt = 0, wCnt = 0;
712 bool resetW, resetR, resetRW;
713
714// Sleep for the snooze interval. If a reset was requested then do a selective
715// reset unless we reached our snooze maximum and enough selections have gone
716// by; in which case, do a global reset.
717//
718 do {XrdSysTimer::Snooze(snooze_interval);
719 int totR = 0, totW = 0;
720
721 STMutex.ReadLock();
722 for (int i = 0; i <= STHi; i++)
723 {if ((nP = NodeTab[i]))
724 {totR += nP->RefTotR;
725 totW += nP->RefTotW;
726 }
727 }
728 STMutex.UnLock();
729
730 rCnt += (totR - SelRtot); SelRtot = totR;
731 wCnt += (totW - SelWtot); SelWtot = totW;
732 snooze_total += snooze_interval;
733
734 resetR = (rCnt >= Config.RefTurn);
735 resetW = (wCnt >= Config.RefTurn);
736 resetRW = (snooze_total >= Config.RefReset && (resetW || resetR));
737 if (resetRW)
738 {ResetRef((SMask_t)0);
739 if (resetR) rCnt = 0;
740 if (resetW) wCnt = 0;
741 snooze_total = 0;
742 }
743 } while(1);
744
745 return (void *)0;
746}
747
748/******************************************************************************/
749/* R e m o v e */
750/******************************************************************************/
751
752// Warning! The node object must be locked upon entry. The lock is released
753// upon deletion of the object.
754
756{
757 theNode->DropJob = new XrdCmsDrop(theNode);
758}
759
760// Warning! The node object must be locked upon entry. The lock is released
761// prior to returning to the caller. This entry obtains the node
762// table lock. When immed != 0 then the node is immediately dropped.
763// When immed < 0 then the caller already holds the STMutex in
764// write mode and it is not released upon exit.
765
766void XrdCmsCluster::Remove(const char *reason, XrdCmsNode *theNode, int immed)
767{
768 EPNAME("Remove_Node")
// theLocks: scoped lock manager for this call. Unless the caller already
// holds STMutex (immed < 0), the constructor releases the node lock, takes
// STMutex in write mode and re-locks the node, with a Ref()/unRef() pair
// keeping the node alive meanwhile. The destructor optionally schedules a
// node drop (doDrop), unlocks the node unless myNode was cleared, and
// releases STMutex if it was acquired here.
769 struct theLocks
770 {XrdSysRWLock *myMutex;
771 XrdCmsNode *myNode;
772 int myNID;
773 int myInst;
774 bool hasLK;
775 bool doDrop;
776 char myIdent[510]; // Copy of the node's Ident as seen at entry
777
778 theLocks(XrdSysRWLock *mtx, XrdCmsNode *node, int immed)
779 : myMutex(mtx), myNode(node), hasLK(immed < 0),
780 doDrop(false)
781 {strlcpy(myIdent, node->Ident, sizeof(myIdent));
782 myNID = node->ID(myInst);
783 if (!hasLK)
784 {myNode->Ref(); // Keep alive
785 myNode->UnLock();
786 myMutex->WriteLock(); // Get global lock
787 myNode->Lock();
788 myNode->unRef(); // Can't escape now
789 }
790 }
791 ~theLocks()
792 {if (myNode)
793 {if (doDrop)
794 {myNode->isBound = 0;
795 myNode->DropTime = 0;
796 myNode->DropJob = new XrdCmsDrop(myNode);
797 myNode->UnLock();
798 } else myNode->UnLock();
799 }
800 if (!hasLK) myMutex->UnLock();
801 }
802 } LockHandler(&STMutex, theNode, immed);
803
804 XrdCmsNode *altNode = 0;
805 int Inst, NodeID = theNode->ID(Inst);
806
807// The LockHandler makes sure that the proper locks are obtained in a deadlock
808// free order. However, this may require that the node lock be released and
809// then re-acquired. We check if we are still dealing with same node at entry.
810// If not, issue message and high-tail it out.
// NOTE(review): despite the sentence above, no return follows the message
// below, so removal proceeds with the node's current ID/instance. Confirm
// whether an early return was intended here.
811//
812 if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
813 {Say.Emsg("Manager", LockHandler.myIdent, "removal aborted.");
814 DEBUG(LockHandler.myIdent <<" node " <<NodeID <<'.' <<Inst <<" != "
815 << LockHandler.myNID <<'.' <<LockHandler.myInst <<" at entry.");
816 }
817
818// Mark node as being offline. Any pending drop job is not touched here; it is
// updated further below if the node is kept for a delayed drop.
819//
820 theNode->isOffline = 1; // Global lock is held here
821
822// If the node is connected we simply close the connection. This will cause
823// the connection handler to re-initiate the node removal. This condition
824// exists only if one node is being displaced by another node. The Disc()
825// may take a long time, but it's done async by default on the WAN and sync
826// on the LAN (local connections are fast enough and error-free for this).
827//
828 if (theNode->isConn)
829 {theNode->Disc(reason, 0);
830 theNode->isGone = 1; // Disc() sets the isOffline flag
831 return;
832 }
833
834// If we are not the primary node (i.e. the table slot now holds a different
// node), then get rid of this node post-haste via LockHandler's drop job.
835//
836 if (!(NodeTab[NodeID] == theNode))
837 {const char *why = (theNode->isMan ? "dropped as alternate."
838 : "dropped and redirected.");
839 Say.Emsg("Remove_Node", theNode->Ident, why);
840 LockHandler.doDrop = true;
841 return;
842 }
843
844
845// If the node is part of the cluster, do not count it anymore and
846// indicate new state of this node if we are a reporting manager
847//
848 if (theNode->isBound)
849 {theNode->isBound = 0;
850 NodeCnt--;
851 if (Config.asManager())
853 theNode->isBad & XrdCmsNode::isSuspend ? 0 : -1,
854 theNode->isNoStage ? 0 : -1);
855 }
856
857// If we have a working alternate, substitute it here and immediately drop
858// the former primary. This allows the cache to remain warm.
859//
860 if (theNode->isMan && theNode->cidP && !(theNode->cidP->IsSingle())
861 && (altNode = theNode->cidP->RemNode(theNode)))
862 {if (altNode->isBound) NodeCnt++;
863 NodeTab[NodeID] = altNode;
864 if (Config.asManager())
866 altNode->isBad & XrdCmsNode::isSuspend ? 0 : 1,
867 altNode->isNoStage ? 0 : 1);
868 setAltMan(altNode->NodeID, altNode->Link, altNode->subsPort);
869 Say.Emsg("Manager",altNode->Ident,"replacing dropped",theNode->Ident);
870 LockHandler.doDrop = true;
871 return;
872 }
873
874// If this is an immediate drop request, do so now. Drop() will delete
875// the node object, so remove the node lock and tell LockHandler that
// (clearing myNode stops its destructor from touching the node again).
876//
877 if (immed || !Config.DRPDelay || theNode->isBad & XrdCmsNode::isDoomed)
878 {theNode->UnLock();
879 LockHandler.myNode = 0;
880 Drop(NodeID, Inst);
881 return;
882 }
883
884// If a drop job is already scheduled, update the instance field. Otherwise,
885// Schedule a node drop at a future time (now + Config.DRPDelay).
886//
887 theNode->DropTime = time(0)+Config.DRPDelay;
888 if (theNode->DropJob) theNode->DropJob->nodeInst = Inst;
889 else theNode->DropJob = new XrdCmsDrop(NodeID, Inst);
890
891// Document removal
892//
893 if (reason)
894 Say.Emsg("Manager", theNode->Ident, "scheduled for removal;", reason);
895 else DEBUG(theNode->Ident <<" node " <<NodeID <<'.' <<Inst);
896}
897
898/******************************************************************************/
899/* R e s e t R e f */
900/******************************************************************************/
901
902void XrdCmsCluster::ResetRef(SMask_t nMask, bool isLocked)
903{
904 XrdCmsNode *nP;
905 bool doAll (nMask == 0);
906
907// Obtain a lock on the table if not already locked
908//
909 if (!isLocked) STMutex.ReadLock();
910
911// Reset reference counts as needed. We can do this with a read lock as the
912// reference counters are atomic.
913//
914 for (int i = 0; i <= STHi; i++)
915 {if ((nP = NodeTab[i]) && (doAll || nP->isNode(nMask)))
916 {nP->RefW = 0;
917 nP->RefR = 0;
918 nP->Shrem = nP->Share;
919 }
920 }
921
922// Unlock table and exit
923//
924 if (!isLocked) STMutex.UnLock();
925}
926
927/******************************************************************************/
928/* S e l e c t */
929/******************************************************************************/
930
932{
933 EPNAME("Select");
934 XrdCmsPInfo pinfo;
935 const char *Amode;
936 int dowt = 0, retc = 0, isRW, fRD, noSel = (Sel.Opts & XrdCmsSelect::Defer);
937 SMask_t amask, smask, pmask;
938
939// Establish some local options
940//
941 if (Sel.Opts & XrdCmsSelect::Write)
942 {isRW = 1; Amode = "write";
943 if (Config.RWDelay)
944 if (Sel.Opts & XrdCmsSelect::Create && Config.RWDelay < 2) fRD = 1;
945 else fRD = 0;
946 else fRD = 1;
947 }
948 else {isRW = 0; Amode = "read"; fRD = 1;}
949
950// Find out who serves this path
951//
952 if (!Cache.Paths.Find(Sel.Path.Val, pinfo)
953 || (amask = ((isRW ? pinfo.rwvec : pinfo.rovec) & ~Sel.nmask)) == 0)
954 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
955 "No servers have %s access to the file", Amode)+1;
956 Sel.Resp.Port = kYR_ENOENT;
957 return EReplete;
958 }
959
960// If we are running a shared file system preform an optional restricted
961// pre-selection and then do a standard selection. Since all nodes are equal,
962// make sure the client is needlessly avoiding them as this signals an error.
963//
964 if (baseFS.isDFS())
965 {if (Sel.nmask && !(Sel.Opts & XrdCmsSelect::NoTryLim))
966 {pmask = (isRW ? pinfo.rwvec : pinfo.rovec) & Sel.nmask;
967 if (!(Sel.Opts & XrdCmsSelect::Online))
968 pmask |= pinfo.ssvec & Sel.nmask;
969 if (pmask && maxBits(pmask, baseFS.dfsTries()))
970 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
971 "Too many attempts to gain dfs %s access to the file", Amode)+1;
972 return RetryErr;
973 }
974 }
975 pmask = amask;
976 smask = (Sel.Opts & XrdCmsSelect::Online ? 0 : pinfo.ssvec & amask);
977 if (baseFS.Trim())
978 {Sel.Resp.DLen = 0;
979 if (!(retc = SelDFS(Sel, amask, pmask, smask, isRW)))
980 return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
981 if (retc < 0) return retc;
982 } else if (noSel) return 0;
983 return SelNode(Sel, pmask, smask);
984 }
985
986// If either a refresh is wanted or we didn't find the file, re-prime the cache
987// which will force the client to wait. Otherwise, compute the primary and
988// secondary selections. If there are none, the client may have to wait if we
989// have servers that we can query regarding the file. Note that for files being
990// opened in write mode, only one writable copy may exist unless this is a
991// meta-operation (e.g., remove) in which case the file itself remain unmodified
992// or a replica request, in which case we select a new target server.
993//
994 if (!(Sel.Opts & XrdCmsSelect::Refresh)
995 && (retc = Cache.GetFile(Sel, pinfo.rovec)))
996 {if (isRW)
997 { if (retc<0) return Config.LUPDelay;
998 else if (Sel.Opts & XrdCmsSelect::Replica)
999 {pmask = amask & ~(Sel.Vec.hf | Sel.Vec.bf); smask = 0;
1000 if (!pmask && !Sel.Vec.bf) return SelFail(Sel,eNoRep);
1001 }
1002 else if (Sel.Vec.bf) pmask = smask = 0;
1003 else if (Sel.Vec.hf)
1004 {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
1005 if (!(Sel.Opts & XrdCmsSelect::MWFiles))
1006 {if (!(Sel.Opts & XrdCmsSelect::isMeta)
1007 && maxBits(Sel.Vec.hf,2)) return SelFail(Sel,eDups);
1008 if ((Sel.Vec.hf & pinfo.rwvec)
1009 != (Sel.Vec.hf & pinfo.rovec)) return SelFail(Sel,eROfs);
1010 }
1011 if (!(pmask = Sel.Vec.hf & amask)) return SelFail(Sel,eNoSel);
1012 smask = 0;
1013 }
1015 {pmask = amask; smask = 0;}
1016 else if ((smask = pinfo.ssvec & amask)) pmask = 0;
1017 else pmask = smask = 0;
1018 } else {
1019 pmask = Sel.Vec.hf & amask;
1020 if (Sel.Opts & XrdCmsSelect::Online) {pmask &= ~Sel.Vec.pf; smask=0;}
1021 else smask = (retc < 0 ? 0 : pinfo.ssvec & amask);
1022 }
1023 if (Sel.Vec.hf & Sel.nmask) Cache.UnkFile(Sel, Sel.nmask);
1024 } else {
1025 Cache.AddFile(Sel, 0);
1026 Sel.Vec.bf = pinfo.rovec;
1027 Sel.Vec.hf = Sel.Vec.pf = pmask = smask = 0;
1028 retc = 0;
1029 }
1030
1031// A wait is required if we don't have any primary or seconday servers
1032//
1033 dowt = (!pmask && !smask);
1034
1035// If we can query additional servers, do so now. The client will be placed
1036// in the callback queue only if we have no possible selections
1037//
1038 if (Sel.Vec.bf)
1039 {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
1040 if (Sel.Opts & XrdCmsSelect::Refresh)
1042 if (dowt) retc= (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
1043 TRACE(Files, "seeking " <<Sel.Path.Val);
1044 amask = Cluster.Broadcast(Sel.Vec.bf, QReq.Hdr,
1045 (void *)Sel.Path.Val,Sel.Path.Len+1);
1046 if (amask) Cache.UnkFile(Sel, amask);
1047 if (dowt) return retc;
1048 } else if (dowt && retc < 0 && !noSel)
1049 return (fRD ? Cache.WT4File(Sel,Sel.Vec.hf) : Config.LUPDelay);
1050
1051// Broadcast a freshen up request if wanted
1052//
1053 if ((Sel.Opts & XrdCmsSelect::Freshen) && (amask = pmask & ~Sel.Vec.bf))
1055 Cluster.Broadcast(amask, Qupt.Hdr,(void *)Sel.Path.Val,Sel.Path.Len+1);
1056 }
1057
1058// If we need to defer selection, simply return as this is a mindless prepare
1059//
1060 if (noSel) return 0;
1061
1062// Check if we have no useable servers
1063//
1064 if (dowt) return Unuseable(Sel);
1065
1066// Check if should eliminate staging servers. We may need to do this if the
1067// client has been eliminating too many of them as they all should be equal.
1068//
1069 if (Sel.nmask && pinfo.ssvec && !(Sel.Opts & XrdCmsSelect::NoTryLim)
1070 && maxBits(Sel.nmask & pinfo.ssvec, baseFS.stgTries()))
1071 {if (!pmask)
1072 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
1073 "Too many attempts to stage %s access to the file", Amode)+1;
1074 return RetryErr;
1075 }
1076 smask = 0;
1077 }
1078
1079// Select a node
1080//
1081 return SelNode(Sel, pmask, smask);
1082}
1083
1084/******************************************************************************/
1085
1086int XrdCmsCluster::Select(SMask_t pmask, int &port, char *hbuff, int &hlen,
1087 int isrw, int isMulti, int ifWant)
1088{
1089 static const SMask_t smLow(255);
1090 XrdCmsSelector selR;
1091 XrdCmsNode *nP = 0;
1092 SMask_t tmask;
1093 int Snum = 0;
1094 XrdNetIF::ifType nType = static_cast<XrdNetIF::ifType>(ifWant);
1095
1096// If there is nothing to select from, return failure
1097//
1098 if (!pmask) return 0;
1099
1100// Obtain the network we need for the client
1101//
1102 selR.needNet = XrdNetIF::Mask(nType);
1103
1104// Initialize
1105//
1106 selR.needSpace = 0;
1107
1108// Packed selection can never occur in this code path so we turn it off
1109//
1110 selR.selPack = 0;
1111
1112// If we are exporting a shared-everything system then the incoming mask
1113// may have more than one server indicated. So, we need to do a full select.
1114// This is forced when isMulti is true, indicating a choice may exist. Note
1115// that the node, if any, is returned unlocked but we have the global mutex.
1116//
1117 if (isMulti || baseFS.isDFS())
1118 {STMutex.ReadLock();
1119 nP = (Config.sched_RR ? SelbyRef(pmask,selR)
1120 : Config.sched_LoadR == 0 ? SelbyLoad(pmask,selR)
1121 : SelbyLoadR(pmask, selR));
1122
1123 if (nP) hlen = nP->netIF.GetName(hbuff, port, nType) + 1;
1124 else hlen = 0;
1125 STMutex.UnLock();
1126 return hlen != 1;
1127 }
1128
1129// In shared-nothing systems the incoming mask will only have a single node.
1130// Compute the a single node number that is contained in the mask.
1131//
1132 do {if (!(tmask = pmask & smLow)) Snum += 8;
1133 else {while((tmask = tmask>>1)) Snum++; break;}
1134 } while((pmask = pmask >> 8));
1135
1136// See if the node passes muster
1137//
1138 STMutex.ReadLock();
1139 if ((nP = NodeTab[Snum]))
1140 { if (nP->isBad) nP = 0;
1141 else if (!Config.sched_RR && (nP->myLoad > Config.MaxLoad)) nP = 0;
1142 else if (!(selR.needNet & nP->hasNet)) nP = 0;
1143 if (nP)
1144 {if (isrw)
1145 if (nP->isNoStage || nP->DiskFree < nP->DiskMinF) nP = 0;
1146 else {nP->RefTotW++; nP->RefW++;}
1147 else {nP->RefTotR++; nP->RefR++;}
1148 }
1149 }
1150
1151// At this point either we have a node or we do not
1152//
1153 if (nP)
1154 {hlen = nP->netIF.GetName(hbuff, port, nType) + 1;
1155 nP->RefR++;
1156 STMutex.UnLock();
1157 return hlen != 1;
1158 }
1159 STMutex.UnLock();
1160 return 0;
1161}
1162
1163/******************************************************************************/
1164/* S e l F a i l */
1165/******************************************************************************/
1166
1167int XrdCmsCluster::SelFail(XrdCmsSelect &Sel, int rc)
1168{
1169//
1170 const char *etext, *Item = "file";
1171
1172 switch(rc)
1173 {case eExists: if (Sel.Opts & XrdCmsSelect::isMeta) Item = "directory";
1174 etext = "Unable to create %s; it already exists.";
1175 Sel.Resp.Port = kYR_RWConflict;
1176 break;
1177 case eROfs: etext = "Unable to modify %s; r/o copy already exists.";
1178 Sel.Resp.Port = kYR_RWConflict;
1179 break;
1180 case eDups: etext = "Unable to modify %s; multiple copies exist.";
1181 Sel.Resp.Port = kYR_RWConflict;
1182 break;
1183 case eNoRep: etext = "Unable to replicate %s; no new sites available.";
1184 Sel.Resp.Port = kYR_noReplicas;
1185 break;
1186 case eNoSel: if (Sel.Vec.hf & Sel.nmask)
1187 {etext = "Unable to access %s; eligible servers shunned.";
1188 if (Sel.Opts & XrdCmsSelect::isDir) Item = "directory";
1189 } else {
1190 if (Sel.Opts & XrdCmsSelect::Write)
1191 {etext = "Unable to write %s; r/w exports not found.";
1192 } else {
1193 etext = "Unable to access %s; it does not exist.";
1194 if (Sel.Opts & XrdCmsSelect::isDir) Item = "directory";
1195 }
1196 }
1197 Sel.Resp.Port = kYR_ENOENT;
1198 break;
1199 default: etext = "Unable to access %s; it does not exist.";
1200 Sel.Resp.Port = kYR_ENOENT;
1201 break;
1202 };
1203
1204 int n = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data), etext, Item);
1205 if (n < (int)sizeof(Sel.Resp.Data)) Sel.Resp.DLen = n+1;
1206 else Sel.Resp.DLen = sizeof(Sel.Resp.Data);
1207
1208 return EReplete;
1209}
1210
1211/******************************************************************************/
1212/* S p a c e */
1213/******************************************************************************/
1214
1216{
// Accumulate disk-space information into sData for every online node that
// is selected by smask, excluding peer nodes (smask is ANDed with peerMask).
// Total/TotFr are summed over all such nodes unless baseFS is a DFS, in which
// case they are only added while sData.Total is still zero (presumably
// because every node then sees the same shared storage -- confirm).
// sNum/sFree/sUtil describe nodes allowing staging (allowsSS); wNum/wFree/
// wUtil/wMinF describe writable nodes (allowsRW). The Free/Util/MinF values
// are taken from the node with the most free space in each category.
// The node table is scanned while holding STMutex in read mode.
1217 XrdCmsNode *nP;
1218 SMask_t bmask;
1219 int i;
1220 bool doAll = !baseFS.isDFS();
1221
1222// Obtain a lock on the table and screen out peer nodes
1223//
1224 STMutex.ReadLock();
1225 bmask = smask & peerMask;
1226
1227// Run through the table getting space information
1228//
1229 for (i = 0; i <= STHi; i++)
1230 if ((nP = NodeTab[i]) && nP->isNode(bmask) && !(nP->isOffline))
1231 {if (doAll || !sData.Total)
1232 {sData.Total += nP->DiskTotal;
1233 sData.TotFr += nP->DiskFree;
1234 }
// Staging-capable node: count it and remember the largest free space.
1235 if (nP->isRW & XrdCmsNode::allowsSS)
1236 {sData.sNum++;
1237 if (sData.sFree < nP->DiskFree)
1238 {sData.sFree = nP->DiskFree; sData.sUtil = nP->DiskUtil;}
1239 }
// Writable node: count it and remember the largest free space and its minimum.
1240 if (nP->isRW & XrdCmsNode::allowsRW)
1241 {sData.wNum++;
1242 if (sData.wFree < nP->DiskFree)
1243 {sData.wFree = nP->DiskFree; sData.wUtil = nP->DiskUtil;
1244 sData.wMinF = nP->DiskMinF;
1245 }
1246 }
1247 }
1248 STMutex.UnLock();
1249}
1250
1251/******************************************************************************/
1252/* S t a t s */
1253/******************************************************************************/
1254
1255int XrdCmsCluster::Stats(char *bfr, int bln)
1256{
1257 static const char statfmt1[] = "<stats id=\"cms\">"
1258 "<role>%s</role></stats>";
1259 int mlen;
1260
1261// Check if actual length wanted
1262//
1263 if (!bfr) return sizeof(statfmt1) + 8;
1264
1265// Format the statistics (not much here for now)
1266//
1267 mlen = snprintf(bfr, bln, statfmt1, Config.myRType);
1268
1269 if ((bln -= mlen) <= 0) return 0;
1270 return mlen;
1271}
1272
1273/******************************************************************************/
1274/* S t a t t */
1275/******************************************************************************/
1276
1277int XrdCmsCluster::Statt(char *bfr, int bln)
1278{
1279 static const char statfmt0[] = "</stats>";
1280 static const char statfmt1[] = "<stats id=\"cmsm\">"
1281 "<role>%s</role><sel><t>%lld</t><r>%lld</r><w>%lld</w></sel>"
1282 "<node>%d";
1283 static const char statfmt2[] = "<stats id=\"%d\">"
1284 "<host>%s</host><role>%s</role>"
1285 "<run>%s</run><ref><r>%d</r><w>%d</w></ref>%s</stats>";
1286 static const char statfmt3[] = "<shr>%d<use>%d</use></shr>";
1287 static const char statfmt4[] = "</node>";
1288 static const char statfmt5[] =
1289 "<frq><add>%lld<d>%lld</d></add><rsp>%lld<m>%lld</m></rsp>"
1290 "<lf>%lld</lf><ls>%lld</ls><rf>%lld</rf><rs>%lld</rs></frq>";
1291
1292 static int AddFrq = (Config.RepStats & XrdCmsConfig::RepStat_frq);
1293 static int AddShr = (Config.RepStats & XrdCmsConfig::RepStat_shr)
1294 && Config.asMetaMan();
1295
1296 XrdCmsRRQ::Info Frq;
1297 XrdCmsSelected *sp;
1298 int mlen, tlen, n = 0;
1299 char shrBuff[80], stat[6], *stp;
1300 bool oksel;
1301
1302 class spmngr {
1303 public: XrdCmsSelected *sp;
1304
1305 spmngr() {sp = 0;}
1306 ~spmngr() {XrdCmsSelected *xsp;
1307 while((xsp = sp)) {sp = sp->next; delete xsp;}
1308 }
1309 } mngrsp;
1310
1311// Check if actual length wanted
1312//
1313 if (!bfr)
1314 {n = sizeof(statfmt0) +
1315 sizeof(statfmt1) + 12*3 + 3 + 3 +
1316 (sizeof(statfmt2) + 10*2 + 256 + 16) * STMax + sizeof(statfmt4);
1317 if (AddShr) n += sizeof(statfmt3) + 12;
1318 if (AddFrq) n += sizeof(statfmt4) + (10*8);
1319 return n;
1320 }
1321
1322// Get the statistics
1323//
1324 if (AddFrq) RRQ.Statistics(Frq);
1325 mngrsp.sp = sp = List(FULLMASK, LS_NULL, oksel);
1326
1327// Count number of nodes we have
1328//
1329 while(sp) {n++; sp = sp->next;}
1330 sp = mngrsp.sp;
1331
1332// Format the statistics
1333//
1334 long long lclTcnt = SelTcnt, lclRtot = SelRtot, lclWtot = SelWtot;
1335 mlen = snprintf(bfr, bln, statfmt1,
1336 Config.myRType, lclTcnt, lclRtot, lclWtot, n);
1337
1338 if ((bln -= mlen) <= 0) return 0;
1339 tlen = mlen; bfr += mlen; n = 0; *shrBuff = 0;
1340
1341 while(sp && bln > 0)
1342 {stp = stat;
1343 if (sp->Status & XrdCmsSelected::Offline) *stp++ = 'o';
1344 else if (sp->Status & XrdCmsSelected::Suspend) *stp++ = 's';
1345 else if (sp->Status & XrdCmsSelected::Disable) *stp++ = 'd';
1346 else *stp++ = 'a';
1347 if (sp->Status & XrdCmsSelected::isRW) *stp++ = 'w';
1348 if (sp->Status & XrdCmsSelected::NoStage) *stp++ = 'n';
1349 *stp = 0;
1350 if (AddShr) snprintf(shrBuff, sizeof(shrBuff), statfmt3,
1351 (sp->Share ? sp->Share : 100), sp->Shrin);
1352 mlen = snprintf(bfr, bln, statfmt2, n, sp->Ident,
1353 XrdCmsRole::Type(static_cast<XrdCmsRole::RoleID>(sp->RoleID)),
1354 stat, sp->RefTotR, sp->RefTotW, shrBuff);
1355 bfr += mlen; bln -= mlen; tlen += mlen;
1356 sp = sp->next; n++;
1357 }
1358
1359 if (bln <= (int)sizeof(statfmt4)) return 0;
1360 strcpy(bfr, statfmt4); mlen = sizeof(statfmt4) - 1;
1361 bfr += mlen; bln -= mlen; tlen += mlen;
1362
1363 if (AddFrq && bln > 0)
1364 {mlen = snprintf(bfr, bln, statfmt5, Frq.Add2Q, Frq.PBack, Frq.Resp,
1365 Frq.Multi, Frq.luFast, Frq.luSlow, Frq.rdFast, Frq.rdSlow);
1366 bfr += mlen; bln -= mlen; tlen += mlen;
1367 }
1368
1369// See if we overflowed. otherwise finish up
1370//
1371 if (sp || bln < (int)sizeof(statfmt0)) return 0;
1372 strcpy(bfr, statfmt0);
1373 return tlen + sizeof(statfmt0) - 1;
1374}
1375
1376/******************************************************************************/
1377/* P r i v a t e M e t h o d s */
1378/******************************************************************************/
1379/******************************************************************************/
1380/* c a l c D e l a y */
1381/******************************************************************************/
1382
1383XrdCmsNode *XrdCmsCluster::calcDelay(XrdCmsSelector &selR)
1384{
1385 if (!selR.nPick) {selR.delay = 0;
1386 selR.reason = (selR.xNoNet
1387 ? "no eligible servers reachable for"
1388 : "no eligible servers for");
1389 }
1390 else if (selR.xFull) {selR.delay = Config.DiskWT;
1391 selR.reason = "no eligible servers have space for";
1392 }
1393 else if (selR.xOvld) {selR.delay = Config.MaxDelay;
1394 selR.reason = "eligible servers overloaded for";
1395 }
1396 else if (selR.xSusp) {selR.delay = Config.SUSDelay;
1397 selR.reason = "eligible servers suspended for";
1398 }
1399 else if (selR.xOff) {selR.delay = Config.SUPDelay;
1400 selR.reason = "eligible servers offline for";
1401 }
1402 else {selR.delay = Config.SUPDelay;
1403 selR.reason = "server selection error for";
1404 }
1405 return (XrdCmsNode *)0;
1406}
1407
1408/******************************************************************************/
1409/* D r o p */
1410/******************************************************************************/
1411
1412// Warning: STMutex must be locked in write upon entry and the caller must
1413// release it if this method is called directly. Otherwise, the mutex
1414// will be obtained and released. Also, this method may only be called
1415// via Remove() either directly or via a deferred job scheduled by that
1416// method. This method actually deletes the node object.
1417
1418int XrdCmsCluster::Drop(int sent, int sinst, XrdCmsDrop *djp)
1419{
1420 EPNAME("Drop_Node")
1421 XrdCmsNode *nP;
1422 char hname[512];
1423
1424// If we are being called outside of a scheduled job, obtain the mutex
1425//
1426 if (djp) STMutex.WriteLock();
1427
1428// Make sure this node is the right one
1429//
1430 if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
1431 {if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
1432 if (djp) STMutex.UnLock();
1433 DEBUG(sent <<'.' <<sinst <<" cancelled.");
1434 return 0;
1435 }
1436
1437// Check if the drop has been rescheduled
1438//
1439 if (djp && time(0) < nP->DropTime)
1440 {Sched->Schedule((XrdJob *)djp, nP->DropTime);
1441 if (djp) STMutex.UnLock();
1442 return 1;
1443 }
1444
1445// Save the node name (don't want to hold a lock across a message)
1446//
1447 strlcpy(hname, nP->Ident, sizeof(hname));
1448
1449// Cleanup status
1450//
1451 NodeTab[sent] = 0;
1452 nP->isOffline = 1; // STMutex is locked in write mode
1453 nP->DropTime = 0;
1454 nP->DropJob = 0;
1455 nP->isBound = 0;
1456
1457// Remove node from the peer list (if it is one)
1458//
1459 if (nP->isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
1460
1461// Remove node entry from the alternate list and readjust the end pointer.
1462//
1463 if (nP->isMan)
1464 {memset((void *)&AltMans[sent*AltSize], (int)' ', AltSize);
1465 if (sent == AltMent)
1466 {AltMent--;
1467 while(AltMent >= 0 && NodeTab[AltMent]
1468 && !NodeTab[AltMent]->isMan) AltMent--;
1469 if (AltMent < 0) AltMend = AltMans;
1470 else AltMend = AltMans + ((AltMent+1)*AltSize);
1471 }
1472 }
1473
1474// Readjust STHi
1475//
1476 if (sent == STHi) while(STHi >= 0 && !NodeTab[STHi]) STHi--;
1477
1478// Invalidate any cached entries for this node
1479//
1480 if (nP->NodeMask) Cache.Drop(nP->NodeMask, sent, STHi);
1481
1482// We can now delete the node object if we were called via a job as we are on
1483// a different thread. Direct calls require that we schedule the deletion as
1484// it may take a long time if there are oustanding references to this node.
1485//
1486 if (djp) {STMutex.UnLock(); nP->Delete(STMutex);}
1487 else nP->DropJob = new XrdCmsDrop(nP);
1488
1489// Document the drop
1490//
1491 Say.Emsg("Drop_Node", hname, "dropped.");
1492 return 0;
1493}
1494
1495/******************************************************************************/
1496/* M u l t i p l e */
1497/******************************************************************************/
1498
1499int XrdCmsCluster::Multiple(SMask_t mVec)
1500{
1501 static const unsigned long long Left32 = 0xffffffff00000000LL;
1502 static const unsigned long long Right32 = 0x00000000ffffffffLL;
1503 static const unsigned long long Left16 = 0x00000000ffff0000LL;
1504 static const unsigned long long Right16 = 0x000000000000ffffLL;
1505 static const unsigned long long Left08 = 0x000000000000ff00LL;
1506 static const unsigned long long Right08 = 0x00000000000000ffLL;
1507 static const unsigned long long Left04 = 0x00000000000000f0LL;
1508 static const unsigned long long Right04 = 0x000000000000000fLL;
1509// 0 1 2 3 4 5 6 7 8 9 A B C D E F
1510 static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
1511
1512 if (mVec & Left32) {if (mVec & Right32) return 1;
1513 else mVec = mVec >> 32LL;
1514 }
1515 if (mVec & Left16) {if (mVec & Right16) return 1;
1516 else mVec = mVec >> 16LL;
1517 }
1518 if (mVec & Left08) {if (mVec & Right08) return 1;
1519 else mVec = mVec >> 8LL;
1520 }
1521 if (mVec & Left04) {if (mVec & Right04) return 1;
1522 else mVec = mVec >> 4LL;
1523 }
1524 return isMult[mVec];
1525}
1526
1527/******************************************************************************/
1528/* m a x B i t s */
1529/******************************************************************************/
1530
1531bool XrdCmsCluster::maxBits(SMask_t mVec, int mbits)
1532{
1533 int count = 0;
1534
1535// Count bits. This is the fastest way assuming few bits are set
1536//
1537 while(mVec)
1538 {mVec &= (mVec - 1);
1539 count++;
1540 if (count >= mbits) return true;
1541 }
1542
1543// Indicate we have not reached the maximum bits set
1544//
1545 return false;
1546}
1547
1548/******************************************************************************/
1549/* R e c o r d */
1550/******************************************************************************/
1551
1552void XrdCmsCluster::Record(char *path, const char *reason, bool force)
1553{
1554 EPNAME("Record")
1555 static int msgcnt = 255;
1556 static XrdSysMutex mcMutex;
1557 int skipmsg;
1558
1559 DEBUG(reason <<path);
1560 mcMutex.Lock();
1561 msgcnt++; skipmsg = msgcnt & (force ? 0x0f : 0xff);
1562 mcMutex.UnLock();
1563
1564 if (!skipmsg) Say.Emsg(epname, "client deferred;", reason, path);
1565}
1566
1567/******************************************************************************/
1568/* S e l N o d e */
1569/******************************************************************************/
1570
// Select a node for Sel from the primary candidates (pmask) or, failing that,
// from the alternate/staging candidates (amask); peer nodes are tried last and
// only when Sel.Opts includes Peers. On success Sel.Resp holds the node name
// and port, Sel.smask its mask, and 0 is returned. Otherwise a positive delay
// is returned (client should wait) or the result of Unreachable()/Unuseable().
1571int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
1572{
1573 EPNAME("SelNode")
1574 const char *act=0;
1575 int affsel = 1, count = 0, isalt = 0, pass = 2;
1576 SMask_t mask;
1577 XrdCmsNode *nP = 0;
1578 XrdCmsSelector selR;
1579 XrdNetIF::ifType nType=(XrdNetIF::ifType)(Sel.Opts & XrdCmsSelect::ifWant);
1580
1581// Obtain the network we need for the client
1582//
1583 selR.needNet = XrdNetIF::Mask(nType);
1584
1585// Indicate whether or not stable selection is required. With packing, selPack
// is set to a 1-based index derived from the path (or alternate) hash modulo the
// number of bits set in pmask; the Selbyxxx scanners use it to pick that entry.
1586//
1587 if (!(Sel.Opts & XrdCmsSelect::Pack)) selR.selPack = 0;
1588 else {unsigned int theHash = (Sel.Opts & XrdCmsSelect::UseAH
1589 ? Sel.AltHash : Sel.Path.Hash);
1590 SMask_t sVec = pmask;
1591 for (count = 0; sVec; count++) sVec &= (sVec - 1);
1592 if (count > 1) selR.selPack = affsel = (theHash % count) + 1;
1593 else selR.selPack = 0;
1594 }
1595
1596// There is a difference between needing space and needing r/w access. The former
1597// is needed when we will be writing data, the latter for inode modifications.
1598//
1599 if (Sel.Opts & XrdCmsSelect::isMeta) selR.needSpace = 0;
1600 else selR.needSpace = (Sel.Opts & XrdCmsSelect::Write
1601 ? XrdCmsNode::allowsRW : 0);
1602
1603// Scan for a primary and alternate node (alternates do staging). At this
1604// point we omit all peer nodes as they are our last resort. Note that Selbyxxx
1605// returns the node unlocked but we have the global mutex so that is OK.
// NOTE(review): SelbyRef receives "mask" but SelbyLoad/SelbyLoadR receive
// "pmask", so with load-based scheduling the second (alternate) pass still
// selects from the primary set and peer nodes are not screened out. This looks
// like a bug; confirm and pass "mask" to all three.
1606//
1607 STMutex.ReadLock();
1608 mask = pmask & peerMask;
1609 while(pass--)
1610 {if (mask)
1611 {nP = (Config.sched_RR || (Sel.Opts & XrdCmsSelect::UseRef)
1612 ? SelbyRef(mask,selR)
1613 : Config.sched_LoadR == 0 ? SelbyLoad(pmask,selR)
1614 : SelbyLoadR(pmask, selR));
1615 if (nP || (selR.nPick && selR.delay)
1616 || NodeCnt < Config.SUPCount) break;
1617 }
1618 mask = amask & peerMask; isalt = XrdCmsNode::allowsSS;
1619 if (!(Sel.Opts & XrdCmsSelect::isMeta)) selR.needSpace |= isalt;
1620 }
1621
1622// Produce affinity result trace
1623//
1624 if (Sel.Opts & XrdCmsSelect::Pack && nP)
1625 {TRACE(Redirect, "affinity " <<affsel <<'/' <<count <<'/'
1626 <<(int)selR.selPack <<(selR.selPack ? " go " : " ng ")
1627 <<nP->Name() <<' ' <<Sel.Path.Val);
1628 }
1629
1630// If we found an eligible node then dispatch the client to it. We will
1631// swap the global mutex for the node mutex to minimize interference.
// DLen is incremented after GetName(), presumably to count the trailing null.
1632//
1633 if (nP)
1634 {nP->g2nLock(STMutex);
1635 Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data, Sel.Resp.Port, nType);
1636 if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
1637 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1638
1639 // If a message is to be sent to the selected server, send it.
1640 //
1641 if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
1642
1643 // Do special post processing when any of:
1644 // a) isalt true: Secondary selection occurred
1645 // b) Create set: File creation will occur
1646 //
1647 if (isalt || (Sel.Opts & XrdCmsSelect::Create))
1649 if (Sel.Opts & XrdCmsSelect::noBind) act = " handling ";
1650 else Cache.AddFile(Sel, nP->NodeMask);
1651 }
1652
1653 // Determine what we are actually doing here
1654 //
1655 nP->UnLock();
1656 if (!act)
1657 {if (isalt) act = (Sel.iovN ? " staging " : " assigned ");
1658 else act = " serving ";
1659 }
1660 TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
1661 return 0;
1662 }
1663
1664// No node so check if we have a sufficient number to continue. Note that we
1665// do not forward to a peer unless we have a sufficient number of local nodes.
1666//
1667 if (!selR.delay && NodeCnt < Config.SUPCount)
1668 {STMutex.UnLock();
1669 Record(Sel.Path.Val, "insufficient number of nodes", true);
1670 return Config.SUPDelay;
1671 }
1672
1673// Return delay if we should avoid selecting a peer manager
1674//
1675 if (selR.delay && selR.delay < Config.PSDelay)
1676 {STMutex.UnLock();
1677 Record(Sel.Path.Val, selR.reason);
1678 return selR.delay;
1679 }
1680
1681// At this point, we attempt a peer node selection (choice of last resort). Note
1682// that we are still holding the global lock! If we find a peer node we will
1683// swap it with the node lock. If no peer is found and the peer scan produced no
// delay, the reason/delay/xNoNet from the local scan are restored.
1684//
1685 if (Sel.Opts & XrdCmsSelect::Peers)
1686 {const char *reason1 = selR.reason;
1687 int delay1 = selR.delay;
1688 bool noNet = selR.xNoNet;
1689 if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
1690 if (nP)
1691 {nP->g2nLock(STMutex);
1692 Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data,Sel.Resp.Port,nType);
1693 if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
1694 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1695 if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
1696 nP->UnLock();
1697 TRACE(Stage, "Peer " <<Sel.Resp.Data <<" handling " <<Sel.Path.Val);
1698 return 0;
1699 }
1700 if (!selR.delay)
1701 {selR.delay = delay1; selR.reason = reason1; selR.xNoNet = noNet;}
1702 }
1703
1704// At this point we don't need the global lock so let it go.
1705//
1706 STMutex.UnLock();
1707
1708// At this point we either don't have enough nodes or simply can't handle this
1709//
1710 if (selR.delay)
1711 {Record(Sel.Path.Val, selR.reason);
1712 return selR.delay;
1713 }
1714
1715// Return appropriate error message
1716//
1717 if (selR.xNoNet) return Unreachable(Sel, true);
1718 return Unuseable(Sel);
1719}
1720
1721/******************************************************************************/
1722/* R e f C o u n t */
1723/******************************************************************************/
1724
1725// This snippet of code occurs often enough that we make it a macro as we
1726// want to execute this inline.
1727//
// Account for selecting node sP: bump its write (NeedSpace true) or read
// reference counters and, when sPMulti is true and sP->Share is non-zero,
// advance the node's share bookkeeping. Wrapped in do { } while(0) so that
// "RefCount(...);" is exactly one statement (safe in an unbraced if/else).
// sP is evaluated several times; pass a plain variable.
#define RefCount(sP, sPMulti, NeedSpace) \
   do {if (NeedSpace) {(sP)->RefTotW++; (sP)->RefW++;} \
          else        {(sP)->RefTotR++; (sP)->RefR++;} \
       if ((sPMulti) && (sP)->Share && !(sP)->Shrem--) \
          {(sP)->RefW += (sP)->Shrip; (sP)->RefR += (sP)->Shrip; \
           (sP)->Shrem = (sP)->Share; (sP)->Shrin++; \
          } \
      } while(0)
1735
1736/******************************************************************************/
1737/* S e l b y C o s t */
1738/******************************************************************************/
1739
1740// Cost selection is used only for peer node selection as peers do not
1741// report a load and handle their own scheduling.
1742
1743// Caller must have the STMutex locked. The returned node, if any, is unlocked.
1744
1745XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, XrdCmsSelector &selR)
1746{
1747 XrdCmsNode *np, *sp = 0;
1748 bool Multi = false;
1749
1750// Scan for a node (sp points to the selected one)
1751//
1752 selR.Reset(); SelTcnt++;
1753 for (int i = 0; i <= STHi; i++)
1754 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1755 {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1756 selR.nPick++;
1757 if (np->isOffline) {selR.xOff = true; continue;}
1758 if (np->isBad) {selR.xSusp = true; continue;}
1759 if (selR.needSpace && np->isNoStage) {selR.xFull = true; continue;}
1760 if (!sp) sp = np;
1761 else{if (abs(sp->myCost - np->myCost) <= Config.P_fuzz)
1762 { if (selR.selPack)
1763 {if (--selR.selPack) sp=np;
1764 else break;
1765 }
1766 else if (selR.needSpace)
1767 {if (sp->RefW > (np->RefW+Config.DiskLinger))
1768 sp=np;
1769 }
1770 else if (sp->RefR > np->RefR) sp=np;
1771 }
1772 else if (sp->myCost > np->myCost) sp=np;
1773 Multi = true;
1774 }
1775 }
1776
1777// Check for overloaded node and return result
1778//
1779 if (!sp) return calcDelay(selR);
1780 RefCount(sp, Multi, selR.needSpace);
1781 return sp;
1782}
1783
1784/******************************************************************************/
1785/* S e l b y L o a d */
1786/******************************************************************************/
1787
1788// Caller must have the STMutex locked. The returned node, if any, is unlocked.
1789
1790XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, XrdCmsSelector &selR)
1791{
1792 XrdCmsNode *np, *sp = 0;
1793 bool Multi = false, reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1794
1795// Scan for a node (preset possible, suspended, overloaded, full, and dead)
1796//
1797 selR.Reset(); SelTcnt++;
1798 for (int i = 0; i <= STHi; i++)
1799 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1800 {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1801 selR.nPick++;
1802 if (np->isOffline) {selR.xOff = true; continue;}
1803 if (np->isBad) {selR.xSusp = true; continue;}
1804 if (np->myLoad > Config.MaxLoad) {selR.xOvld = true; continue;}
1805 if (selR.needSpace && (np->DiskFree < np->DiskMinF
1806 || (reqSS && np->isNoStage)))
1807 {selR.xFull = true; continue;}
1808 if (!sp) sp = np;
1809 else{if (selR.needSpace)
1810 {if (abs(sp->myMass - np->myMass) <= Config.P_fuzz)
1811 {if (sp->RefW > (np->RefW+Config.DiskLinger)) sp=np;}
1812 else if (sp->myMass > np->myMass) sp=np;
1813 } else {
1814 if (abs(sp->myLoad - np->myLoad) <= Config.P_fuzz)
1815 {if (selR.selPack)
1816 {if (--selR.selPack) sp=np;
1817 else break;
1818 }
1819 else if (sp->RefR > np->RefR) sp=np;
1820 }
1821 else if (sp->myLoad > np->myLoad) sp=np;
1822 }
1823 Multi = true;
1824 }
1825 }
1826
1827// Check for overloaded node and return result
1828//
1829 if (!sp) return calcDelay(selR);
1830 RefCount(sp, Multi, selR.needSpace);
1831 return sp;
1832}
1833
1834/******************************************************************************/
1835/* S e l b y L o a d R */
1836/******************************************************************************/
1837
1838// Caller must have the STMutex locked. The returned node, if any, is unlocked.
1839
1840XrdCmsNode *XrdCmsCluster::SelbyLoadR(SMask_t mask, XrdCmsSelector &selR)
1841{
1842 static std::random_device rand_dev;
1843 static std::default_random_engine generator(rand_dev());
1844
1845 XrdCmsNode *np = nullptr, *sp = nullptr;
1846 bool reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1847
1848 // Scan for a node (preset possible, suspended, overloaded, full, and dead)
1849
1850 selR.Reset();
1851 SelTcnt++;
1852
1853 int totWeight = 0;
1854
1855 for (int i = 0; i <= STHi; ++i) {
1856 NodeWeight[i] = 0; // make node unselectable first
1857
1858 if (!((np = NodeTab[i]) && (np->NodeMask & mask)))
1859 continue;
1860
1861 if (!(selR.needNet & np->hasNet)) { selR.xNoNet = true; continue; }
1862
1863 selR.nPick++;
1864
1865 if (np->isOffline) { selR.xOff = true; continue; }
1866 if (np->isBad) { selR.xSusp = true; continue; }
1867 if (np->myLoad > Config.MaxLoad) { selR.xOvld = true; continue; }
1868
1869 if (selR.needSpace) {
1870 if (np->DiskFree < np->DiskMinF || (reqSS && np->isNoStage)) {
1871 selR.xFull = true;
1872 continue;
1873 }
1874 }
1875
1876 // If node passes filters, give it a weight
1877 totWeight += Config.P_fuzz + (100 - np->myLoad);
1878 NodeWeight[i] = totWeight;
1879 }
1880
1881 std::uniform_int_distribution<int> distr(1, totWeight);
1882 int selected = distr(generator);
1883
1884 for (int i = 0; i <= STHi; ++i) {
1885 if (NodeWeight[i] < selected)
1886 continue;
1887
1888 sp = NodeTab[i];
1889 break;
1890 }
1891
1892 return sp ? sp : calcDelay(selR);
1893}
1894
1895/******************************************************************************/
1896/* S e l b y R e f */
1897/******************************************************************************/
1898
1899// Caller must have the STMutex locked. The returned node, if any, is unlocked.
1900
1901XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, XrdCmsSelector &selR)
1902{
1903 XrdCmsNode *np, *sp = 0;
1904 bool Multi = false, reqSS = (selR.needSpace & XrdCmsNode::allowsSS) != 0;
1905
1906// Scan for a node (sp points to the selected one)
1907//
1908 selR.Reset(); SelTcnt++;
1909 for (int i = 0; i <= STHi; i++)
1910 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1911 {if (!(selR.needNet & np->hasNet)) {selR.xNoNet= true; continue;}
1912 selR.nPick++;
1913 if (np->isOffline) {selR.xOff = true; continue;}
1914 if (np->isBad) {selR.xSusp = true; continue;}
1915 if (selR.needSpace && (np->DiskFree < np->DiskMinF
1916 || (reqSS && np->isNoStage)))
1917 {selR.xFull = true; continue;}
1918 if (!sp) sp = np;
1919 else {Multi = true;
1920 if (selR.selPack)
1921 {if (--selR.selPack) sp=np;
1922 else break;
1923 }
1924 else if (selR.needSpace)
1925 {if (sp->RefW > (np->RefW+Config.DiskLinger)) sp=np;}
1926 else if (sp->RefR > np->RefR) sp=np;
1927 }
1928 }
1929
1930// Check for overloaded node and return result
1931//
1932 if (!sp) return calcDelay(selR);
1933 RefCount(sp, Multi, selR.needSpace);
1934 return sp;
1935}
1936
1937/******************************************************************************/
1938/* S e l D F S */
1939/******************************************************************************/
1940
// Resolve file existence for a distributed (shared) filesystem and screen the
// request before node selection. Returns 0 when an existence query was sent
// to the nodes in amask (the caller makes the client wait), 1 when selection
// may proceed with pmask/smask (which may be cleared here), or the value of
// SelFail() for a definitive failure (presumably negative; the caller treats
// retc < 0 as an immediate reply -- confirm).
1941int XrdCmsCluster::SelDFS(XrdCmsSelect &Sel, SMask_t amask,
1942 SMask_t &pmask, SMask_t &smask, int isRW)
1943{
1944 EPNAME("SelDFS");
// Every bit set: the file's cache entry applies to all nodes.
1945 static const SMask_t allNodes(~0);
1946 int oldOpts, rc;
1947
1948// The first task is to find out if the file exists somewhere. If we are doing
1949// local queries, then the answer will be immediate. Otherwise, forward it.
1950//
1951 if ((Sel.Opts & XrdCmsSelect::Refresh) || !(rc = Cache.GetFile(Sel, amask)))
1952 {if (!baseFS.Local())
1953 {CmsStateRequest QReq = {{Sel.Path.Hash, kYR_state, kYR_raw, 0}};
1954 TRACE(Files, "seeking " <<Sel.Path.Val);
1955 Cache.AddFile(Sel, 0);
1956 if (Sel.Opts & XrdCmsSelect::Refresh)
1958 Cluster.Broadsend(amask, QReq.Hdr, Sel.Path.Val, Sel.Path.Len+1);
1959 return 0;
1960 }
// Local check. NOTE(review): the path length is passed negated; presumably a
// convention understood by Exists() -- confirm. A negative result means the
// file was not found and is cached as absent on every node.
1961 if ((rc = baseFS.Exists(Sel.Path.Val, -Sel.Path.Len)) < 0)
1962 {Cache.AddFile(Sel, 0);
1963 Sel.Vec.bf = Sel.Vec.pf = Sel.Vec.wf = Sel.Vec.hf = 0;
1964 } else {
// Found: every node in amask has it (writable too when isRW); Pending is set
// only temporarily so the cache entry records the pending state.
1965 Sel.Vec.hf = amask; Sel.Vec.wf = (isRW ? amask : 0);
1966 oldOpts = Sel.Opts;
1967 if (rc != CmsHaveRequest::Pending) Sel.Vec.pf = 0;
1968 else {Sel.Vec.pf = amask; Sel.Opts |= XrdCmsSelect::Pending;}
1969 Cache.AddFile(Sel, allNodes);
1970 Sel.Opts = oldOpts;
1971 }
1972 }
1973
1974// Screen out online requests where the file is pending
1975//
1976 if (Sel.Opts & XrdCmsSelect::Online && Sel.Vec.pf)
1977 {pmask = smask = 0;
1978 return 1;
1979 }
1980
1981// If the file is to be written and the file exists then it can't be a new file.
// With truncation requested no staging candidates are kept (smask = 0).
1982//
1983 if (isRW && Sel.Vec.hf)
1984 {if (Sel.Opts & XrdCmsSelect::NewFile) return SelFail(Sel,eExists);
1985 if (Sel.Opts & XrdCmsSelect::Trunc) smask = 0;
1986 return 1;
1987 }
1988
1989// Final verification that we have something to select: a missing file is
// only acceptable when it is being written with Trunc or NewFile.
1990//
1991 if (!Sel.Vec.hf
1992 && (!isRW || !(Sel.Opts & (XrdCmsSelect::Trunc | XrdCmsSelect::NewFile))))
1993 return SelFail(Sel, eNoEnt);
1994 return 1;
1995}
1996
1997/******************************************************************************/
1998/* s e n d A L i s t */
1999/******************************************************************************/
2000
2001// Single entry at a time, protected by STMutex in write mode!
2002
2003void XrdCmsCluster::sendAList(XrdLink *lp)
2004{
2005 static CmsTryRequest Req = {{0, kYR_try, 0, 0}, 0};
2006 static int HdrSize = sizeof(Req.Hdr) + sizeof(Req.sLen);
2007 static char *AltNext = AltMans;
2008 static struct iovec iov[4] = {{(caddr_t)&Req, (size_t)HdrSize},
2009 {0, 0},
2010 {AltMans, 0},
2011 {(caddr_t)"\0", 1}};
2012 int dlen;
2013
2014// Calculate what to send
2015//
2016 AltNext = AltNext + AltSize;
2017 if (AltNext >= AltMend)
2018 {AltNext = AltMans;
2019 iov[1].iov_len = 0;
2020 iov[2].iov_len = dlen = AltMend - AltMans;
2021 } else {
2022 iov[1].iov_base = (caddr_t)AltNext;
2023 iov[1].iov_len = AltMend - AltNext;
2024 iov[2].iov_len = AltNext - AltMans;
2025 dlen = iov[1].iov_len + iov[2].iov_len;
2026 }
2027
2028// Complete the request (account for trailing null character)
2029//
2030 dlen++;
2031 Req.Hdr.datalen = htons(static_cast<unsigned short>(dlen+sizeof(Req.sLen)));
2032 Req.sLen = htons(static_cast<unsigned short>(dlen));
2033
2034// Send the list of alternates (rotated once)
2035//
2036 lp->Send(iov, 4, dlen+HdrSize);
2037}
2038
2039/******************************************************************************/
2040/* s e t A l t M a n */
2041/******************************************************************************/
2042
2043// Single entry at a time, protected by STMutex in write mode!
2044
2045void XrdCmsCluster::setAltMan(int snum, XrdLink *lp, int port)
2046{
2047 XrdNetAddr altAddr = *(lp->NetAddr());
2048 char *ap = &AltMans[snum*AltSize];
2049 int i;
2050
2051// Preset the buffer and pre-screen the port number
2052//
2053 if (!port || (port > 0x0000ffff)) port = Config.PortTCP;
2054 memset(ap, int(' '), AltSize);
2055
2056// First tr to use the hostname:port which may be too large (unlikely). Else
2057// Insert the ip address of this node into the list of nodes. We made sure that
2058// the size of he buffer was big enough so no need to check for overflow.
2059//
2060 altAddr.Port(port);
2061 if (Config.DoHnTry) i = altAddr.Format(ap, AltSize, XrdNetAddr::fmtName);
2062 else i = 0;
2063 if (!i) i=altAddr.Format(ap,AltSize,XrdNetAddr::fmtAddr,XrdNetAddr::prefipv4);
2064 ap[i] = ' ';
2065
2066// Compute new fence
2067//
2068 if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
2069}
2070
2071/******************************************************************************/
2072/* U n r e a c h a b l e */
2073/******************************************************************************/
2074
2075int XrdCmsCluster::Unreachable(XrdCmsSelect &Sel, bool none)
2076{
2078 const char *Amode = (Sel.Opts & XrdCmsSelect::Write ? "write" : "read");
2079 const char *Xmode = (Sel.Opts & XrdCmsSelect::Online ? "immediately " : "");
2080
2081 if (none)
2082 {Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
2083 "No servers are reachable via %s network to %s%s the file.",
2084 XrdNetIF::Name(nType), Xmode, Amode) + 1;
2085 } else {
2086 Sel.Resp.DLen = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data)-1,
2087 "Eligible server is unreachable via %s network to %s%s the file.",
2088 XrdNetIF::Name(nType), Xmode, Amode) + 1;
2089 }
2090 Sel.Resp.Port = kYR_ENETUNREACH;
2091 return EReplete;
2092}
2093
2094/******************************************************************************/
2095/* U n u s e a b l e */
2096/******************************************************************************/
2097
2098int XrdCmsCluster::Unuseable(XrdCmsSelect &Sel)
2099{
2100 const char *Amode = (Sel.Opts & XrdCmsSelect::Write ? "write" : "read");
2101 const char *Xmode = (Sel.Opts & XrdCmsSelect::Online ? "immediately " : "");
2102 const char *EType = (Sel.Opts & XrdCmsSelect::isDir ? "directory" : "file");
2103
2104 int n = snprintf(Sel.Resp.Data, sizeof(Sel.Resp.Data),
2105 "No servers are available to %s%s the %s.",
2106 Xmode, Amode, EType);
2107 if (n < (int)sizeof(Sel.Resp.Data)) Sel.Resp.DLen = n+1;
2108 else Sel.Resp.DLen = sizeof(Sel.Resp.Data);
2109
2110 Sel.Resp.Port = kYR_ENOENT;
2111 return EReplete;
2112}
void Usage(const char *msg)
#define DEBUG(x)
#define EPNAME(x)
#define RefCount(sP, sPMulti, NeedSpace)
#define QTRACE(act)
#define STMax
unsigned long long SMask_t
#define FULLMASK
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
bool Exists
XrdOucString Path
struct myOpts opts
if(ec< 0) ec
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACE(act, x)
Definition XrdTrace.hh:63
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static int Present(const char *hName, XrdOucTList *bList=0, char *rbuff=0, int rblen=0)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
XrdCmsPList_Anchor Paths
int UnkFile(XrdCmsSelect &Sel, SMask_t mask)
void Drop(SMask_t mask, int SNum, int xHi)
int WT4File(XrdCmsSelect &Sel, SMask_t mask)
static XrdCmsClustID * AddID(const char *cID)
static SMask_t Mask(const char *cID)
XrdCmsNode * RemNode(XrdCmsNode *nP)
static XrdCmsClustID * Find(const char *cID)
bool AddNode(XrdCmsNode *nP, bool isMan)
SMask_t getMask(const XrdNetAddr *addr)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
static const int EReplete
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
static const int RetryErr
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
static const int NotFound
static const int Wait4CBk
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
virtual void BlackList(XrdOucTList *blP)
friend class XrdCmsDrop
int Statt(char *bfr, int bln)
static const int RepStat_shr
static const int RepStat_frq
XrdCmsDrop(int nid, int inst)
XrdCmsNode * nodeP
XrdCmsDrop(XrdCmsNode *nP)
unsigned int Hash
Definition XrdCmsKey.hh:53
char * Val
Definition XrdCmsKey.hh:52
short Len
Definition XrdCmsKey.hh:54
static const char allowsRW
Definition XrdCmsNode.hh:84
char * Ident
Definition XrdCmsNode.hh:61
void Delete(XrdSysRWLock &gMutex)
void n2gLock(XrdSysRWLock &gMutex, bool rdlock=false)
int Send(const char *buff, int blen=0)
static const char allowsSS
Definition XrdCmsNode.hh:85
void unRef()
void Lock()
static const char isDisabled
Definition XrdCmsNode.hh:80
int isNode(SMask_t smask)
SMask_t Mask()
char * Name()
char isOffline
Definition XrdCmsNode.hh:64
void g2nLock(XrdSysRWLock &gMutex)
static const char isSuspend
Definition XrdCmsNode.hh:81
unsigned int DiskTotal
Definition XrdCmsNode.hh:87
int ID(int &INum)
char isNoStage
Definition XrdCmsNode.hh:66
void Disc(const char *reason=0, int needLock=1)
void UnLock()
void setName(XrdLink *lnkp, const char *theIF, int port)
static const char isDoomed
Definition XrdCmsNode.hh:82
static const char isBlisted
Definition XrdCmsNode.hh:79
SMask_t ssvec
SMask_t rovec
SMask_t rwvec
int Find(const char *pname, XrdCmsPInfo &masks)
SMask_t rwVec
Definition XrdCmsRRQ.hh:59
void Statistics(Info &Data)
Definition XrdCmsRRQ.hh:144
static const char * Type(RoleID rid)
Definition XrdCmsRole.hh:78
struct XrdCmsSelect::@90 Vec
struct XrdCmsSelect::@91 Resp
XrdCmsRRQInfo * InfoP
XrdCmsKey Path
static const int IdentSize
char Ident[IdentSize]
XrdCmsSelected * next
const char * reason
void Update(StateType StateT, int ActivVal, int StageVal=0)
void Set(int ncount)
static const int prefipv4
Use if mapped IPV4 actual format.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
int Port(int pNum=-1)
char Mask()
Definition XrdNetIF.hh:242
static const char * Name(ifType ifT)
Definition XrdNetIF.hh:266
int Port()
Definition XrdNetIF.hh:276
bool HasDest(ifType ifT=PublicV6)
Definition XrdNetIF.hh:221
int GetName(const char *&name, ifType ifT=PublicV6)
Definition XrdNetIF.hh:102
int GetDest(char *dest, int dlen, ifType ifT=PublicV6, bool prefn=false)
Definition XrdNetIF.cc:389
ifType
The enum that is used to index into ifData to get appropriate interface.
Definition XrdNetIF.hh:64
@ PrivateIF
Definition XrdNetIF.hh:68
static int Pack(struct iovec **, const char *, unsigned short &buff)
Definition XrdOucPup.cc:52
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
XrdCmsRRQ RRQ
Definition XrdCmsRRQ.cc:55
XrdCmsCache Cache
static const unsigned char kYR_Version
Definition YProtocol.hh:80
kXR_unt16 datalen
Definition YProtocol.hh:86
@ kYR_ENETUNREACH
Definition YProtocol.hh:158
@ kYR_noReplicas
Definition YProtocol.hh:164
@ kYR_ENOENT
Definition YProtocol.hh:150
@ kYR_RWConflict
Definition YProtocol.hh:163
static const int CMS_isSuper
static const int CMS_noStage
kXR_char modifier
Definition YProtocol.hh:85
XrdScheduler * Sched
XrdCmsCluster Cluster
XrdCmsBaseFS baseFS
XrdSysError Say
XrdCmsState CmsState
static const int CMS_isMan
XrdCmsConfig Config
@ kYR_state
Definition YProtocol.hh:110
@ kYR_usage
Definition YProtocol.hh:116
static const int CMS_isPeer
static const int CMS_Suspend
long long luSlow
Definition XrdCmsRRQ.hh:139
long long rdSlow
Definition XrdCmsRRQ.hh:141
long long Resp
Definition XrdCmsRRQ.hh:136
long long luFast
Definition XrdCmsRRQ.hh:138
long long Add2Q
Definition XrdCmsRRQ.hh:134
long long Multi
Definition XrdCmsRRQ.hh:137
long long rdFast
Definition XrdCmsRRQ.hh:140
long long PBack
Definition XrdCmsRRQ.hh:135