XRootD
Loading...
Searching...
No Matches
XrdBwmLogger.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d B w m L o g g e r . c c */
4/* */
5/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cctype>
32#include <stddef.h>
33#include <cstdlib>
34#include <cstdio>
35#include <cstring>
36
38#include "XrdSys/XrdSysError.hh"
39#include "XrdOuc/XrdOucProg.hh"
41#include "XrdNet/XrdNetOpts.hh"
44
45/******************************************************************************/
46/* L o c a l C l a s s e s */
47/******************************************************************************/
48
50{
51public:
52
53static const int msgSize = 2048;
54
57int Tlen;
58
60
62};
63
64/******************************************************************************/
65/* E x t e r n a l L i n k a g e s */
66/******************************************************************************/
67
68void *XrdBwmLoggerSend(void *pp)
69{
70 XrdBwmLogger *lP = (XrdBwmLogger *)pp;
71 lP->sendEvents();
72 return (void *)0;
73}
74
75/******************************************************************************/
76/* C o n s t r u c t o r */
77/******************************************************************************/
78
79XrdBwmLogger::XrdBwmLogger(const char *Target)
80{
81
82// Set common variables
83//
84 theTarget = strdup(Target);
85 eDest = 0;
86 theProg = 0;
87 msgFirst = msgLast = msgFree = 0;
88 tid = 0;
89 msgFD = 0;
90 endIT = 0;
91 theEOL= '\n';
92 msgsInQ = 0;
93}
94
95/******************************************************************************/
96/* D e s t r u c t o r */
97/******************************************************************************/
98
100{
101 XrdBwmLoggerMsg *tp;
102
103// Kill the notification thread. This may cause a msg block to be orphaned
104// but, in practice, this object does not really get deleted after being
105// started. So, the problem is moot.
106//
107 endIT = 1;
108 if (tid) XrdSysThread::Kill(tid);
109
110// Release all queued message bocks
111//
112 qMut.Lock();
113 while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
114 if (theTarget) free(theTarget);
115 if (msgFD >= 0) close(msgFD);
116 if (theProg) delete theProg;
117 qMut.UnLock();
118
119// Release all free message blocks
120//
121 fMut.Lock();
122 while ((tp = msgFree)) {msgFree = tp->next; delete tp;}
123 fMut.UnLock();
124}
125
126/******************************************************************************/
127/* E v e n t */
128/******************************************************************************/
129
131{
132 static int warnings = 0;
133 XrdBwmLoggerMsg *tp;
134
135// Get a message block
136//
137 if (!(tp = getMsg()))
138 {if ((++warnings & 0xff) == 1)
139 eDest->Emsg("Notify", "Ran out of logger message objects;",
140 eInfo.Tident, "event not logged.");
141 return;
142 }
143
144// Format the message
145//
146 tp->Tlen = snprintf(tp->Text, XrdBwmLoggerMsg::msgSize,
147 "<stats id=\"bwm\"><tid>%s</tid><lfn>%s</lfn>"
148 "<lcl>%s</lcl><rmt>%s</rmt><flow>%c</flow>"
149 "<at>%lld</at><bt>%lld</bt><ct>%lld</ct>"
150 "<iq>%d</iq><oq>%d</oq><xq>%d</xq>"
151 "<sz>%lld<sz><esec>%d</esec></stats>%c",
152 eInfo.Tident, eInfo.Lfn, eInfo.lclNode, eInfo.rmtNode,
153 eInfo.Flow, (long long) eInfo.ATime,
154 (long long) eInfo.BTime, (long long) eInfo.CTime,
155 eInfo.numqIn, eInfo.numqOut, eInfo.numqXeq, eInfo.Size,
156 eInfo.ESec, theEOL);
157
158// Either log this or put the message on the queue and return
159//
160 tp->next = 0;
161 qMut.Lock();
162 if (msgLast) {msgLast->next = tp; msgLast = tp;}
163 else msgFirst = msgLast = tp;
164 qMut.UnLock();
165 qSem.Post();
166}
167
168/******************************************************************************/
169/* s e n d E v e n t s */
170/******************************************************************************/
171
173{
174 XrdBwmLoggerMsg *tp;
175 const char *theData[2] = {0,0};
176 int theDlen[2] = {0,0};
177
178// This is an endless loop that just gets things off the event queue and
179// send them out. This allows us to only hang a simgle thread should the
180// receiver get blocked, instead of the whole process.
181//
182 while(1)
183 {qSem.Wait();
184 qMut.Lock();
185 if (endIT) break;
186 if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
187 qMut.UnLock();
188 if (tp)
189 {if (!theProg) Feed(tp->Text, tp->Tlen);
190 else {theData[0] = tp->Text; theDlen[0] = tp->Tlen;
191 theProg->Feed(theData, theDlen);
192
193 }
194 retMsg(tp);
195 }
196 }
197 qMut.UnLock();
198}
199
200/******************************************************************************/
201/* S t a r t */
202/******************************************************************************/
203
205{
206 int rc;
207
208// Set the error object pointer
209//
210 eDest = eobj;
211
212// Check if we need to create a socket to a path
213//
214 if (!strcmp("*", theTarget)) {msgFD = -1; theEOL = '\0';}
215 else if (*theTarget == '>')
216 {XrdNetSocket *msgSock;
217 if (!(msgSock = XrdNetSocket::Create(eobj, theTarget+1, 0, 0660,
218 XRDNET_FIFO))) return -1;
219 msgFD = msgSock->Detach();
220 delete msgSock;
221 }
222 else {// Allocate a new program object if we don't have one
223 //
224 if (theProg) return 0;
225 theProg = new XrdOucProg(eobj);
226
227 // Setup the program
228 //
229 if (theProg->Setup(theTarget, eobj)) return -1;
230 if ((rc = theProg->Start()))
231 {eobj->Emsg("Logger", rc, "start event collector"); return -1;}
232 }
233
234// Now start a thread to get messages and send them to the collector
235//
236 if ((rc = XrdSysThread::Run(&tid, XrdBwmLoggerSend, static_cast<void *>(this),
237 0, "Log message sender")))
238 {eobj->Emsg("Logger", rc, "create log message sender thread");
239 return -1;
240 }
241
242// All done
243//
244 return 0;
245}
246
247/******************************************************************************/
248/* P r i v a t e M e t h o d s */
249/******************************************************************************/
250/******************************************************************************/
251/* F e e d */
252/******************************************************************************/
253
254int XrdBwmLogger::Feed(const char *data, int dlen)
255{
256 int retc;
257
258// Send message to the log if need be
259//
260 if (msgFD < 0) {eDest->Say("", data); return 0;}
261
262// Write the data. since this is a udp socket all the data goes or none does
263//
264 do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
265 while (retc < 0 && errno == EINTR);
266 if (retc < 0)
267 {eDest->Emsg("Feed", errno, "write to logger socket", theTarget);
268 return -1;
269 }
270
271// All done
272//
273 return 0;
274}
275
276/******************************************************************************/
277/* g e t M s g */
278/******************************************************************************/
279
280XrdBwmLoggerMsg *XrdBwmLogger::getMsg()
281{
282 XrdBwmLoggerMsg *tp;
283
284// Lock the free queue
285//
286 fMut.Lock();
287
288// Get message object but don't give out too many
289//
290 if (msgsInQ >= maxmInQ) tp = 0;
291 else {if ((tp = msgFree)) msgFree = tp->next;
292 else tp = new XrdBwmLoggerMsg();
293 msgsInQ++;
294 }
295
296// Unlock and return result
297//
298 fMut.UnLock();
299 return tp;
300}
301
302/******************************************************************************/
303/* r e t M s g */
304/******************************************************************************/
305
306void XrdBwmLogger::retMsg(XrdBwmLoggerMsg *tp)
307{
308
309// Lock the free queue, return message, unlock the queue
310//
311 fMut.Lock();
312 tp->next = msgFree;
313 msgFree = tp;
314 msgsInQ--;
315 fMut.UnLock();
316}
void * XrdBwmLoggerSend(void *pp)
#define XRDNET_FIFO
Definition XrdNetOpts.hh:83
#define close(a)
Definition XrdPosix.hh:43
#define write(a, b, c)
Definition XrdPosix.hh:110
char Text[msgSize]
XrdBwmLoggerMsg * next
static const int msgSize
const char * rmtNode
const char * lclNode
XrdBwmLogger(const char *Target)
int Start(XrdSysError *eobj)
const char * Tident
void Event(Info &eInfo)
void sendEvents(void)
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
int Start(void)
int Feed(const char *data[], const int dlen[])
Definition XrdOucProg.cc:63
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static int Kill(pthread_t tid)