XRootD
Loading...
Searching...
No Matches
XrdPollPoll.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l P o l l . i c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <signal.h>
31#include <cstdlib>
32
33#include "Xrd/XrdLinkCtl.hh"
34#include "Xrd/XrdPollPoll.hh"
35#include "Xrd/XrdScheduler.hh"
36
37/******************************************************************************/
38/* n e w P o l l e r */
39/******************************************************************************/
40
41XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
42{
43 int bytes, alignment, pagsz = getpagesize();
44 struct pollfd *pp;
45
46// Calculate the size of the poll table and allocate it
47//
48 bytes = maxfd * sizeof(struct pollfd);
49 alignment = (bytes < pagsz ? 1024 : pagsz);
50 if (posix_memalign((void **)&pp, alignment, bytes))
51 {Log.Emsg("Poll", ENOMEM, "create poll table");
52 return 0;
53 }
54
55// Create new poll object
56//
57 memset((void *)pp, 0, bytes);
58 return (XrdPoll *)new XrdPollPoll(pp, maxfd);
59}
60
61/******************************************************************************/
62/* C o n s t r c u t o r */
63/******************************************************************************/
64
65XrdPollPoll::XrdPollPoll(struct pollfd *pp, int numfd)
66{
67
68// Initialize the standard stuff
69//
70 PollTab = pp;
71 PollTNum= 0;
72 PollQ = 0;
73 maxent = numfd;
74}
75
76/******************************************************************************/
77/* D e s t r u c t o r */
78/******************************************************************************/
79
81{
82 if (PollTab) free(PollTab);
83}
84
85/******************************************************************************/
86/* I n c l u d e */
87/******************************************************************************/
88
90{
91 struct pollfd *pfd;
92 int ptnum;
93
94// Lock down the poll data structure
95//
96 PollMutex.Lock();
97
98// Get the next entry to be used
99//
100 ptnum = 0;
101 while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
102
103// Make sure we have enough table entries to add this link
104//
105 if (ptnum > maxent)
106 {Log.Emsg("Attach","Attach",pInfo.Link.ID,"failed; poll table overflow.");
107 PollMutex.UnLock();
108 return 0;
109 }
110
111// Initialize the polltable entry
112//
113 pfd = &(PollTab[ptnum]);
114 pfd->fd = -pInfo.FD;
115 pfd->events = POLLIN | POLLRDNORM;
116 pfd->revents = 0;
117
118// Record relevant information in the link
119//
120 pInfo.PollEnt = pfd;
121 if (ptnum == PollTNum) PollTNum++;
122
123// All done
124//
125 PollMutex.UnLock();
126 return 1;
127}
128
129/******************************************************************************/
130/* D i s a b l e */
131/******************************************************************************/
132
133void XrdPollPoll::Disable(XrdPollInfo &pInfo, const char *etxt)
134{
135 XrdSysSemaphore mySem(0);
136 PipeData cmdbuff[2];
137 int myerrno = 0;
138
139// Check if this link is in the pollQ. If so, remove it.
140//
141 if (pInfo.inQ) dqLink(&pInfo);
142
143// Simply return if the link is already disabled
144//
145 if (!pInfo.isEnabled) return;
146
147// Trace this event
148//
149 TRACEI(POLL, "Poller " <<PID <<" async disabling link FD " <<pInfo.FD);
150
151// Send a disable request to the poller thread handling this link. We need to
152// wait until the operation is actually completed before returning.
153//
154 memset(&cmdbuff, 0, sizeof(cmdbuff));
155 cmdbuff[0].req = PipeData::DiFD;
156 cmdbuff[0].Parms.Arg.fd = pInfo.FD;
157 cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
158 cmdbuff[1].req = PipeData::Post;
159 cmdbuff[1].Parms.theSem = &mySem;
160 PollPipe.Lock();
161 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
163
164// Verify that all went well and if termination wanted, terminate the link
165//
166 if (myerrno) Log.Emsg("Poll", myerrno, "disable link", pInfo.Link.ID);
167 else {mySem.Wait();
168 if (etxt && Finish(pInfo, etxt))
169 Sched.Schedule((XrdJob *)&pInfo.Link);
170 }
171}
172
173/******************************************************************************/
174/* E n a b l e */
175/******************************************************************************/
176
178{
179 PipeData cmdbuff;
180 int myerrno = 0;
181
182// Simply return if the link is already enabled
183//
184 if (pInfo.isEnabled) return 1;
185
186// Add this link element to the queue
187//
188 PollMutex.Lock();
189 pInfo.Next = PollQ;
190 PollQ = &pInfo;
191 pInfo.inQ = true;
192 PollMutex.UnLock();
193
194// Send an enable request to the poller thread handling this link
195//
196 TRACEI(POLL, "sending poller " <<PID <<" enable for link " <<pInfo.FD);
197 cmdbuff.req = PipeData::EnFD;
198 cmdbuff.Parms.Arg.fd = pInfo.FD;
199 cmdbuff.Parms.Arg.ent = pInfo.PollEnt - PollTab;
200 PollPipe.Lock();
201 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
203
204// Verify that all went well. Note that the link stays in the pollQ.
205//
206 if (myerrno)
207 {Log.Emsg("Poll", myerrno, "enable link", pInfo.Link.ID); return 0;}
208
209// All done
210//
211 return 1;
212}
213
214/******************************************************************************/
215/* E x c l u d e */
216/******************************************************************************/
217
219{
220 XrdSysSemaphore mySem(0);
221 PipeData cmdbuff[2];
222 int myerrno = 0;
223
224// Make sure this link is not enabled
225//
226 if (pInfo.isEnabled)
227 {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
228 Disable(pInfo);
229 }
230 else if (pInfo.inQ) dqLink(&pInfo);
231
232// Send a deatch request to the poller thread handling this link
233//
234 TRACEI(POLL, "sending poller " <<PID <<" detach for link " <<pInfo.FD);
235 cmdbuff[0].req = PipeData::RmFD;
236 cmdbuff[0].Parms.Arg.fd = pInfo.FD;
237 cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
238 cmdbuff[1].req = PipeData::Post;
239 cmdbuff[1].Parms.theSem = &mySem;
240 PollPipe.Lock();
241 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
243
244// Verify that all went well and if termination wanted, terminate the link
245//
246 if (myerrno) Log.Emsg("Poll", myerrno, "detach link", pInfo.Link.ID);
247 else mySem.Wait();
248}
249
250/******************************************************************************/
251/* S t a r t */
252/******************************************************************************/
253
254void XrdPollPoll::Start(XrdSysSemaphore *syncsem, int &retcode)
255{
256 int numpolled, num2sched;
257 XrdJob *jfirst, *jlast;
258 XrdPollInfo *plp, *nlp, *pInfo;
259 XrdLink *lp;
260 short pollevents;
261 const short pollOK = POLLIN | POLLRDNORM;
262
263// Set up he first entry in the poll table to be our communications port
264//
265 PollTab[0].fd = ReqFD;
266 PollTab[0].events = pollOK;
267 PollTab[0].revents = 0;
268 PollTNum = 1;
269
270// Signal the caller to continue
271//
272 retcode = 0;
273 syncsem->Post();
274
275// Now do the main poll loop
276//
277 do {do {numpolled = poll(PollTab, PollTNum, -1);}
278 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
279
280 // Check if we had a polling error
281 //
282 if (numpolled < 0)
283 {if (errno != EINTR) Restart(errno);
284 else numInterrupts++;
285 continue;
286 }
287 numEvents += numpolled;
288
289 // Check out base poll table entry, we can do this without a lock
290 //
291 if (PollTab[0].revents & pollOK)
292 {doRequests(numpolled);
293 if (--numpolled <= 0) continue;
294 }
295
296 // Checkout which links must be dispatched (do this locked)
297 //
298 PollMutex.Lock();
299 plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
300 while ((pInfo = nlp) && numpolled > 0)
301 {if ((pollevents = pInfo->PollEnt->revents))
302 {pInfo->PollEnt->fd = -pInfo->PollEnt->fd;
303 if (plp) nlp = plp->Next = pInfo->Next;
304 else nlp = PollQ = pInfo->Next;
305 numpolled--; pInfo->inQ = false;
306 if (!(pollevents & pollOK))
307 Finish(*pInfo, Poll2Text(pollevents));
308 lp = &(pInfo->Link);
309 if (!(pInfo->isEnabled))
310 Log.Emsg("Poll", "Disabled event occurred for", lp->ID);
311 else {pInfo->isEnabled = false;
312 lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
313 if (!jlast) jlast=(XrdJob *)lp;
314 num2sched++;
315 continue;
316 }
317 }
318 plp = pInfo; nlp = pInfo->Next;
319 }
320 if (numpolled) Recover(numpolled);
321 PollMutex.UnLock();
322
323 // Schedule the polled links
324 //
325 if (num2sched == 1) Sched.Schedule(jfirst);
326 else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
327 } while(1);
328}
329
330/******************************************************************************/
331/* P r i v a t e M e t h o d s */
332/******************************************************************************/
333/******************************************************************************/
334/* d o D e t a c h */
335/******************************************************************************/
336
338{
339 int lastent;
340
341// Get some starting values
342//
343 PollMutex.Lock();
344 if ((lastent = PollTNum-1) < 0)
345 {Log.Emsg("Poll","Underflow during detach"); abort();}
346
347 if (pti == lastent)
348 do {PollTNum--;} while(PollTNum && PollTab[PollTNum-1].fd == -1);
349 PollMutex.UnLock();
350}
351
352/******************************************************************************/
353/* d o R e q u e s t s */
354/******************************************************************************/
355
356void XrdPollPoll::doRequests(int maxreq)
357{
358 const char *act;
359 int pti, ptfd, num2do;
360 XrdPollInfo *piP;
361
362// To keep ourselves from being swamped, base request read-aheads on the number
363// of pending poll events.
364//
365 num2do = (maxreq < 3 ? -1 : maxreq);
366
367// Now process all poll table manipulation requests
368//
369 while(num2do-- && getRequest())
370 { if (ReqBuff.req == PipeData::Post)
371 {ReqBuff.Parms.theSem->Post();
372 continue;
373 }
374 pti = ReqBuff.Parms.Arg.ent;
375 if ((ptfd = abs(PollTab[pti].fd)) != ReqBuff.Parms.Arg.fd)
376 {LogEvent(ReqBuff.req, PollTab[pti].fd, ReqBuff.Parms.Arg.fd);
377 continue;
378 }
379 if (!(piP = XrdLinkCtl::fd2PollInfo(ptfd)))
380 {LogEvent(ReqBuff.req, -1, ptfd); continue;}
381 if (ReqBuff.req == PipeData::EnFD)
382 {PollTab[pti].events = POLLIN | POLLRDNORM;
383 PollTab[pti].fd = ptfd;
384 piP->isEnabled = true; numEnabled++;
385 act = " enabled fd ";
386 }
387 else if (ReqBuff.req == PipeData::DiFD)
388 {PollTab[pti].fd = -ptfd;
389 act = " disabled fd ";
390 piP->isEnabled = false;
391 }
392 else if (ReqBuff.req == PipeData::RmFD)
393 {PollTab[pti].fd = -1;
394 doDetach(pti);
395 act = " detached fd ";
396 piP->isEnabled = false;
397 }
398 else {Log.Emsg("Poll", "Received an invalid poll pipe request");
399 continue;
400 }
401 TRACE(POLL, "Poller " <<PID <<act <<ReqBuff.Parms.Arg.fd
402 <<" entry " <<pti <<" now at " <<PollTNum);
403 }
404}
405
406/******************************************************************************/
407/* d q L i n k */
408/******************************************************************************/
409
410void XrdPollPoll::dqLink(XrdPollInfo *pInfo)
411{
412 XrdPollInfo *plp, *nlp;
413
414// Find matching link in the queue
415//
416 PollMutex.Lock();
417 pInfo->inQ = false;
418 plp = 0; nlp = PollQ;
419 while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->Next;}
420
421// If we found the link, remove it. Otherwise complain
422//
423 if (nlp) {if (plp) plp->Next = nlp->Next;
424 else PollQ = nlp->Next;
425 PollMutex.UnLock();
426 }
427 else {PollMutex.UnLock();
428 Log.Emsg("dqLink", "Link not found in Q", pInfo->Link.ID);
429 }
430}
431
432/******************************************************************************/
433/* L o g E v e n t */
434/******************************************************************************/
435
436void XrdPollPoll::LogEvent(int req, int pollfd, int cmdfd)
437{
438 const char *opn, *id1, *id2;
439 char buff[4096];
440 XrdLink *lp;
441
442 if (ReqBuff.req == PipeData::EnFD) opn = "enable";
443 else if (ReqBuff.req == PipeData::DiFD) opn = "disable";
444 else if (ReqBuff.req == PipeData::RmFD) opn = "detach";
445 else opn = "???";
446
447 if (pollfd < 0)
448 {sprintf(buff, "poll %d failed; FD %d", PID, cmdfd);
449 Log.Emsg("Poll", opn, buff, "does not map to a link");
450 return;
451 }
452
453 if ((lp = XrdLinkCtl::fd2link(pollfd))) id1 = lp->ID;
454 else id1 = "unknown";
455 if ((lp = XrdLinkCtl::fd2link(cmdfd))) id2 = lp->ID;
456 else id2 = "unknown";
457 snprintf(buff, sizeof(buff)-1,
458 "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
459 PID, pollfd, id1, opn, cmdfd, id2);
460
461 Log.Emsg("Poll", "cmd/poll mismatch:", buff);
462}
463
464/******************************************************************************/
465/* R e c o v e r */
466/******************************************************************************/
467
468void XrdPollPoll::Recover(int numleft)
469{
470 int i;
471 XrdPollInfo *piP;
472
473// Turn off any unaccounted links
474//
475 for (i = 1; i < PollTNum; i++)
476 if (PollTab[i].revents)
477 {if (!(piP = XrdLinkCtl::fd2PollInfo(PollTab[i].fd)))
478 PollTab[i].fd = -1;
479 else {piP->isEnabled = false;
480 PollTab[i].fd = -PollTab[i].fd;
481 Log.Emsg("Poll","Improper poll event for",piP->Link.ID);
482 }
483 }
484}
485
486/******************************************************************************/
487/* R e s t a r t */
488/******************************************************************************/
489
490void XrdPollPoll::Restart(int ecode)
491{
492 XrdPollInfo *pInfo;
493
494// Issue error message
495//
496 TRACE(POLL, PID <<'-' <<TID <<" Poll error " <<ecode);
497 Log.Emsg("Poll", errno, "poll");
498
499// For any outstanding link here, close the link and detach it
500//
501 PollMutex.Lock();
502 while((pInfo = PollQ))
503 {PollQ = pInfo->Next;
504 pInfo->PollEnt->fd = -1;
505 Finish(*pInfo, "Unexpected polling error");
506 Sched.Schedule((XrdJob *)&(pInfo->Link));
507 }
508 PollMutex.UnLock();
509}
#define write(a, b, c)
Definition XrdPosix.hh:110
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACEI(act, x)
Definition XrdTrace.hh:66
XrdJob * NextJob
Definition XrdJob.hh:46
static XrdLink * fd2link(int fd)
Definition XrdLinkCtl.hh:72
static XrdPollInfo * fd2PollInfo(int fd)
struct pollfd * PollEnt
XrdPollInfo * Next
XrdLink & Link
int Include(XrdPollInfo &pInfo)
XrdPollPoll(struct pollfd *pp, int numfd)
void doDetach(int pti)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
int numInterrupts
Definition XrdPoll.hh:134
pthread_t TID
Definition XrdPoll.hh:83
int PID
Definition XrdPoll.hh:82
XrdSysMutex PollPipe
Definition XrdPoll.hh:115
int ReqFD
Definition XrdPoll.hh:118
int numEvents
Definition XrdPoll.hh:133
int getRequest()
Definition XrdPoll.cc:232
PipeData ReqBuff
Definition XrdPoll.hh:126
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static XrdPoll * newPoller(int pollid, int numfd)
Definition XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPoll.cc:204
int numEnabled
Definition XrdPoll.hh:132
int CmdFD
Definition XrdPoll.hh:117
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Log
Definition XrdConfig.cc:111
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
union XrdPoll::PipeData::@1 Parms