XRootD
Loading...
Searching...
No Matches
XrdSsiShMam.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S s i S h M a m . c c */
4/* */
5/* (c) 2015 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <fcntl.h>
31#include <cstdint>
32#include <cstdio>
33#include <cstdlib>
34#include <cstring>
35#include <ctime>
36#include <unistd.h>
37#include <sys/file.h>
38#include <sys/mman.h>
39#include <sys/stat.h>
40#include <sys/types.h>
41#include <zlib.h>
42#include <iostream>
43
44#include "XrdSsi/XrdSsiShMam.hh"
45#include "XrdSys/XrdSysE2T.hh"
46
47
48/* Gentoo removed OF from their copy of zconf.h but we need it here.
49 See https://bugs.gentoo.org/show_bug.cgi?id=383179 for the sad history.
50 This patch modelled after https://trac.osgeo.org/gdal/changeset/24622
51*/
52#ifndef OF
53#define OF(args) args
54#endif
55
56/******************************************************************************/
57/* S h a r e d M e m o r y I n f o r m a t i o n S t r u c t u r e */
58/******************************************************************************/
59
namespace
{
// Header kept at offset zero of the mapped segment (see SHMINFO below). It
// records the geometry of the index and item areas so that any process that
// maps the file can locate them.
//
struct ShmInfo
      {int    verNum;     // Always 1st four bytes
       int    index;      // Offset of index
       int    slots;      // Number of slots in index
       int    slotsUsed;  // Number of entries in use
       int    itemCount;  // Number of items in this map
       int    typeSz;     // Size of the data payload
       int    itemSz;     // Size of each item
       int    keyPos;     // Position of key in item
       int    freeItem;   // Offset to item on the free list
       int    freeCount;  // Number of items on the free list
       int    infoSz;     // Size of header also original lowFree
       int    lowFree;    // Offset to low  memory that is free
       int    highUse;    // Offset to high memory that is used
       char   reUse;      // When non-zero items can be reused (r/o locking)
       char   multW;      // When non-zero multiple writers are allowed
       char   rsvd1;
       char   rsvd2;
       int    maxKeys;    // Maximum number of keys
       int    maxKeySz;   // Longest allowed key (not including null byte)
       int    hashID;     // The name of the hash
       char   typeID[64]; // Name of the type stored here
       char   myName[64]; // Name of the implementation
      };

// The macros below rely on names that are in scope at the point of use
// (shmBase, keyPos and MemItem). Each expansion and each argument is fully
// parenthesized so the macros can be embedded in larger expressions.
//

// Access field x of the header at the start of the mapped segment
//
#define SHMINFO(x) (((ShmInfo *)shmBase)->x)

// Convert a segment offset into a pointer of the given type
//
#define SHMADDR(type, offs) ((type *)(shmBase + (offs)))

// Convert an address inside the segment into its offset
//
#define SHMOFFS(addr) ((char *)(addr) - shmBase)

// Address of the key of item x. keyPos is already measured from the start of
// the item (Create() sets it to sizeof(MemItem) plus the payload size), so the
// item header size must not be added a second time; doing so would push a
// maximum length key past the end of the item.
//
#define ITEM_KEY(x) ((char *)(x) + keyPos)

// Address of the data payload of item x (it directly follows the item header)
//
#define ITEM_VAL(x) ((char *)(x) + sizeof(MemItem))

// Segment offset of the data payload of item x
//
#define ITEM_VOF(x) ((char *)(x) + sizeof(MemItem) - shmBase)

int PageMask = ~(sysconf(_SC_PAGESIZE)-1);
int PageSize =   sysconf(_SC_PAGESIZE);
}
101
102/******************************************************************************/
103/* L o c a l C l a s s e s */
104/******************************************************************************/
105
106namespace
107{
// State of one enumeration in progress. The object owns both the file
// descriptor and the item buffer and releases them when it is destroyed.
//
class EnumJar
{public:
char *buff;   // Buffer of bsz bytes allocated by the constructor
int   fd;     // Descriptor to close on destruction (ignored when negative)
int   iNum;   // Enumeration position, starts at zero

      EnumJar(int xfd, int bsz)
             : buff(new char[bsz]), fd(xfd), iNum(0) {}
     ~EnumJar() {if (fd >= 0) close(fd);
                 delete [] buff;
                }

// A copy would close the descriptor and free the buffer a second time, so
// copying is not allowed.
//
      EnumJar(const EnumJar &) = delete;
      EnumJar &operator=(const EnumJar &) = delete;
};
119
120class FileHelper
121{
122public:
123bool autoClose;
124
125 FileHelper(XrdSsiShMam *mp) : autoClose(false), shMamP(mp) {}
126 ~FileHelper() {if (autoClose)
127 {int rc = errno; shMamP->Detach(); errno = rc;}
128 }
129private:
130XrdSsiShMam *shMamP;
131};
132
133class MutexHelper
134{
135public:
136pthread_rwlock_t *mtxP;
137
138 MutexHelper(pthread_rwlock_t *mtx, XrdSsiShMam::LockType isrw)
139 : mtxP(mtx)
140 {if (mtx)
141 {if (isrw) pthread_rwlock_wrlock(mtx);
142 else pthread_rwlock_rdlock(mtx);
143 }
144 }
145
146 ~MutexHelper() {if (mtxP) pthread_rwlock_unlock(mtxP);}
147};
148}
149
150/******************************************************************************/
151/* F i l e D e s c r i p t o r H a n d l i n g */
152/******************************************************************************/
153
154namespace
155{
#if ( defined(__linux__) || defined(__GNU__) ) && defined(O_CLOEXEC) && defined(F_DUPFD_CLOEXEC)

// Here close-on-exec can be requested as part of the dup/open call itself.
//

// Duplicate oldfd; the new descriptor has close-on-exec set.
//
inline int ShMam_Dup(int oldfd)
{
   const int dupFD = fcntl(oldfd, F_DUPFD_CLOEXEC, 0);
   return dupFD;
}

// Open path with the given flags plus close-on-exec.
//
inline int ShMam_Open(const char *path, int flags)
{
   const int newFD = open(path, O_CLOEXEC | flags);
   return newFD;
}

// Same as above but with a creation mode.
//
inline int ShMam_Open(const char *path, int flags, mode_t mode)
{
   const int newFD = open(path, O_CLOEXEC | flags, mode);
   return newFD;
}

#else

// Fallback: set close-on-exec with a separate fcntl() once the descriptor
// exists. A negative (failed) descriptor is passed through untouched.
//
inline int ShMam_CloExec(int fd)
{
   if (fd >= 0) fcntl(fd, F_SETFD, FD_CLOEXEC);
   return fd;
}

// Duplicate oldfd; the new descriptor has close-on-exec set.
//
inline int ShMam_Dup(int oldfd)
{
   return ShMam_CloExec(dup(oldfd));
}

// Open path with the given flags, then set close-on-exec.
//
inline int ShMam_Open(const char *path, int flags)
{
   return ShMam_CloExec(open(path, flags));
}

// Same as above but with a creation mode.
//
inline int ShMam_Open(const char *path, int flags, mode_t mode)
{
   return ShMam_CloExec(open(path, flags, mode));
}

#endif
184
// Force the data of the file behind fd to stable storage. fdatasync() is used
// when the platform advertises synchronized I/O, fsync() otherwise.
// Returns true when the call succeeded.
//
inline bool ShMam_Flush(int fd)
{
#if _POSIX_SYNCHRONIZED_IO > 0
   const int rc = fdatasync(fd);
#else
   const int rc = fsync(fd);
#endif
   return !rc;
}
193/*
194inline bool ShMam_Flush(void *memP, int sOpt)
195{
196 if (msync((void *)((uintptr_t)memP & PageMask), PageSize, sOpt))
197 return true;
198 std::cerr <<"ShMam: msync() failed; " <<XrdSysE2T(errno) <<std::endl;
199 return false;
200}
201*/
202/*
203inline bool ShMam_Flush(void *memP, int mLen, int sOpt)
204{ uintptr_t memB = ((uintptr_t)memP) & PageMask;
205 uintptr_t memE = ((uintptr_t)memP) + mLen;
206 int rc;
207 if ((rc = msync((void *)memB, memE-memB, sOpt)))
208 std::cerr <<"ShMam: msync() failed; " <<XrdSysE2T(errno) <<std::endl;
209 return rc == 0;
210}
211*/
212}
213
214/******************************************************************************/
215/* C o n s t r u c t o r */
216/******************************************************************************/
217
219{
220
221// Initialize common stuff
222//
223 shmTemp = 0;
224 shmSize = 0;
225 shmBase = 0;
226 shmFD =-1;
227 timeOut =-1;
228 lkCount = 0;
229 syncLast = 0;
230 syncOpt = 0;
231 syncQWR = 0;
232 syncQSZ = 0;
233 syncOn = false;
234 syncBase = false;
235 isRW = false;
236 lockRO = true;
237 lockRW = true;
238 reUse = false;
239 useAtomic = true;
240
241// Initialize r/w mutexes
242//
243 pthread_mutex_init(&lkMutex, NULL);
244 pthread_rwlock_init(&myMutex, NULL);
245}
246
247/******************************************************************************/
248/* A d d I t e m */
249/******************************************************************************/
250
251bool XrdSsiShMam::AddItem(void *newdata, void *olddata, const char *key,
252 int hash, bool replace)
253{
254 XLockHelper lockInfo(this, RWLock);
255 MemItem *theItem, *prvItem, *newItem;
256 int hEnt, kLen, iOff, retEno = 0;
257
258// Make sure we can allocate a new item
259//
260 if (!shmSize) {errno = ENOTCONN; return false;}
261 if (!isRW) {errno = EROFS; return false;}
262
263// Verify key length
264//
265 kLen = strlen(key);
266 if (kLen > maxKLen) {errno = ENAMETOOLONG; return false;}
267
268// Check if we need to remap this memory (atomic tests is not needed here).
269// We need to do this prior to file locking as the requirements may change.
270//
271 if (verNum != SHMINFO(verNum)) ReMap(RWLock);
272
273// Lock the file if we have multiple writers or recycling items
274//
275 if (lockRW && !lockInfo.FLock()) return false;
276
277// First try to find the item
278//
279 hEnt = Find(theItem, prvItem, key, hash);
280
281// If we found it then see if we can replace it. If so and we can reuse the
282// the item, then just update the data portion. Otherwise, we need to get a
283// new item and replace the existing item.
284//
285 if (hEnt)
286 {if (olddata) memcpy(olddata, ITEM_VAL(theItem), shmTypeSz);
287 if (!replace) {errno = EEXIST; return false;}
288 if (reUse)
289 {memcpy(ITEM_VAL(theItem), newdata, shmTypeSz);
290 if (syncOn) Updated(ITEM_VOF(theItem), shmTypeSz);
291 errno = EEXIST;
292 return true;
293 }
294 retEno = EEXIST;
295 }
296
297// Get a new item
298//
299 if (!(newItem = NewItem())) {errno = ENOSPC; return false;}
300
301// Construct the new item
302//
303 newItem->hash = hash;
304 memcpy(ITEM_VAL(newItem), newdata, shmTypeSz);
305 strcpy(ITEM_KEY(newItem), key);
306
307// If we are replacing an item then We need to bridge over the item we are
308// replacing in a way that doesn't make the item disappear for other readers.
309// Otherwise, we can patch in the new item either on the last item in the chain
310// or directly off the table. Note that releasing the lock creates a memory
311// fence. To understand why this this works consider the relationship between:
312// hEnt prvItem The state of the table
313// 0 0 Not found because index table slot is zero
314// 0 !0 Not found in a chain of items, prvItem is the last one
315// !0 0 Was found and is the first or only item in the chain
316// !0 !0 Was found and is in the middle or end of the chain
317//
318//
319 if (hEnt) Atomic_SET(newItem->next, theItem->next); // Atomic
320 else {hEnt = (unsigned int)hash % shmSlots;
321 if (hEnt == 0) hEnt = 1;
322 SHMINFO(itemCount)++;
323 }
324
325 iOff = SHMOFFS(newItem);
326 if (prvItem) Atomic_SET_STRICT(prvItem->next, iOff); // Atomic
327 else {SHMINFO(slotsUsed)++;
328 Atomic_SET_STRICT(shmIndex[hEnt],iOff); // Atomic
329 if (syncOn) Updated(SHMOFFS(&shmIndex[hEnt]));
330 }
331
332// Indicate which things we changed if we have syncing
333//
334 if (syncOn)
335 {Updated(0);
336 Updated(SHMOFFS(newItem));
337 if (prvItem) Updated(SHMOFFS(prvItem));
338 }
339
340// All done, return result
341//
342 errno = retEno;
343 return true;
344}
345
346/******************************************************************************/
347/* A t t a c h */
348/******************************************************************************/
349
350bool XrdSsiShMam::Attach(int tout, bool isrw)
351{
352 FileHelper fileHelp(this);
353 XLockHelper lockInfo(this, (isrw ? RWLock : ROLock));
354 struct stat Stat1, Stat2;
355 int mMode, oMode;
356 union {int *intP; Atomic(int) *antP;} xntP;
357
358// Compute open and mmap options
359//
360 if (isrw)
361 {oMode = O_RDWR;
362 mMode = PROT_READ|PROT_WRITE;
363 isRW = true;
364 } else {
365 oMode = O_RDONLY;
366 mMode = PROT_READ;
367 isRW = false;
368 }
369
370// Attempt to open the file
371//
372 timeOut = tout;
373 if (tout < 0) tout = 0x7fffffff;
374 while((shmFD = ShMam_Open(shmPath, oMode)) < 0 && tout >= 0)
375 {if (errno != ENOENT) return false;
376 if (!tout) break;
377 Snooze(3);
378 tout -= 3;
379 }
380
381// Test if we timed out
382//
383 if (tout <= 0) {errno = ETIMEDOUT; return false;}
384 fileHelp.autoClose = true;
385
386// Lock this file as we don't want it changing on us for now
387//
388 if (!lockInfo.FLock()) return false;
389
390// Get the stat information for this file
391//
392 if (fstat(shmFD, &Stat1)) return false;
393
394// The file is open, try to memory map it
395//
396 shmBase = (char *)mmap(0, Stat1.st_size, mMode, MAP_SHARED, shmFD, 0);
397 if (shmBase == MAP_FAILED) return false;
398 shmSize = Stat1.st_size;
399
400// Make sure we have a valid hash name
401//
402 if (!shmHash) memcpy(&shmHash, "c32 ", sizeof(int));
403
404// Verify tha the objects in this mapping are compatible with this object
405//
406 if (SHMINFO(typeSz) != shmTypeSz || strcmp(shmType, SHMINFO(typeID))
407 || strcmp(shmImpl, SHMINFO(myName)) || shmHash != SHMINFO(hashID))
408 {errno = EDOM; return false;}
409
410// Copy out the information we can use locally
411//
412 verNum = SHMINFO(verNum);
413 keyPos = SHMINFO(keyPos);
414 maxKLen = SHMINFO(maxKeySz);
415 xntP.intP = SHMADDR(int, SHMINFO(index)); shmIndex = xntP.antP;
416 shmSlots = SHMINFO(slots);
417 shmItemSz = SHMINFO(itemSz);
418 shmInfoSz = SHMINFO(infoSz);
419
420// Now, there is a loophole here as the file could have been exported while
421// we were trying to attach it. If this happened, the inode would change.
422// We test for this now. If it changed, tell the caller to try again.
423//
424 if (stat(shmPath, &Stat2)
425 || Stat1.st_dev != Stat2.st_dev || Stat1.st_ino != Stat2.st_ino)
426 {errno = EAGAIN; return false;}
427 accMode = Stat2.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO);
428
429// Set locking based on how the table was created
430//
431 SetLocking(isrw);
432 fileHelp.autoClose = false;
433 return true;
434}
435
436/******************************************************************************/
437/* C r e a t e */
438/******************************************************************************/
439
441{
442 static const int minInfoSz = 256;
443 static const int okMode = S_IRWXU|S_IRWXG|S_IROTH;
444 static const int crMode = S_IRWXU|S_IRWXG|S_IROTH;
445 FileHelper fileHelp(this);
446 ShmInfo theInfo;
447 int n, maxEnts, totSz, indexSz;
448 union {int *intP; Atomic(int) *antP;} xntP;
449
450// Validate parameter list values
451//
452 if (parms.indexSz <= 0 || parms.maxKeys <= 0 || parms.maxKLen <= 0)
453 {errno = EINVAL; return false;}
454 if (parms.mode & ~okMode || ((parms.mode & crMode) != crMode))
455 {errno = EACCES; return false;}
456
457// We need the reuse and multw options later so calclulate them now
458//
459 reUse = (parms.reUse <= 0 ? false : true);
460 multW = (parms.multW <= 0 ? false : true);
461
462// Clear the memory segment information we will be constructing
463//
464 memset(&theInfo, 0, sizeof(theInfo));
465
466// Calculate the info header size (we round up to 1K)
467//
468 shmInfoSz = (sizeof(ShmInfo)+minInfoSz-1)/minInfoSz*minInfoSz;
469 theInfo.lowFree = theInfo.infoSz = shmInfoSz;
470
471// Calculate the size of each item (rounded to a doubleword)
472//
473 shmItemSz = (shmTypeSz + parms.maxKLen+1 + sizeof(MemItem) + 7)/8*8;
474 theInfo.itemSz = shmItemSz;
475
476// Calculate total amount we need for the items
477//
478 maxEnts = parms.maxKeys;
479 totSz = shmItemSz * maxEnts;
480 totSz = (totSz+7)/8*8;
481
482// Calculate the amount we need for the index
483//
484 indexSz = parms.indexSz*sizeof(int);
485 indexSz = (indexSz+7)/8*8;
486
487// Compute total size and adjust it to be a multiple of the page size
488//
489 totSz = totSz + indexSz + shmInfoSz;
490 totSz = (totSz/PageSize+1)*PageSize;
491
492// Generate the hashID if not specified
493//
494 if (!shmHash) memcpy(&shmHash, "c32 ", sizeof(int));
495
496// Complete the shared memory segment information structure
497//
498 theInfo.index = totSz-indexSz;
499 theInfo.slots = parms.indexSz;
500 theInfo.typeSz = shmTypeSz;
501 theInfo.highUse = theInfo.index;
502 theInfo.reUse = reUse;
503 theInfo.multW = multW;
504 theInfo.keyPos = keyPos = shmTypeSz + sizeof(MemItem);
505 theInfo.maxKeys = maxEnts;
506 theInfo.maxKeySz = maxKLen = parms.maxKLen;
507 theInfo.hashID = shmHash;
508 strncpy(theInfo.typeID, shmType, sizeof(theInfo.typeID)-1);
509 strncpy(theInfo.myName, shmImpl, sizeof(theInfo.myName)-1);
510
511// Create the new filename of the new file we will create
512//
513 n = strlen(shmPath);
514 shmTemp = (char *)malloc(n+8);
515 sprintf(shmTemp, "%s.new", shmPath);
516
517// Open the file creaing as necessary
518//
519 if ((shmFD = ShMam_Open(shmTemp, O_RDWR|O_CREAT, parms.mode)) < 0)
520 return false;
521 accMode = parms.mode;
522 fileHelp.autoClose = true;
523
524// Verify that no one else is using this file.
525//
526 if (!Lock(true, true)) {errno = EADDRINUSE; return false;}
527
528// Make the file as large as need be
529//
530 if (ftruncate(shmFD, 0) || ftruncate(shmFD, totSz)) return false;
531
532// Map the file as a writable shared segment
533//
534 shmBase = (char *)mmap(0, totSz, PROT_READ|PROT_WRITE, MAP_SHARED, shmFD, 0);
535 if (shmBase == MAP_FAILED) return false;
536 shmSize = totSz;
537 isRW = true;
538
539// Copy the segment information into the segment
540//
541 memcpy(shmBase, &theInfo, sizeof(theInfo));
542 xntP.intP = SHMADDR(int, SHMINFO(index)); shmIndex = xntP.antP;
543 shmSlots = parms.indexSz;
544
545// A created table has, by definition, a single writer until it is exported.
546// So, we simply keep the r/w lock on the file until we export the file. Other
547// threads won't change that and other process will not be able to use the file.
548//
549 lockRO = lockRW = false;
550 fileHelp.autoClose = false;
551 return true;
552}
553
554/******************************************************************************/
555/* D e l I t e m */
556/******************************************************************************/
557
558bool XrdSsiShMam::DelItem(void *data, const char *key, int hash)
559{
560 XLockHelper lockInfo(this, RWLock);
561 MemItem *theItem, *prvItem;
562 int hEnt, iOff;
563
564// Make sure we can delete an item
565//
566 if (!shmSize) {errno = ENOTCONN; return false;}
567 if (!isRW) {errno = EROFS; return false;}
568
569// Check if we need to remap this memory (atomic tests is not needed here)
570//
571 if (verNum != SHMINFO(verNum)) ReMap(RWLock);
572
573// Lock the file if we have multiple writers or recycling items
574// We need to do this prior to file locking as the requirements may change.
575//
576 if (lockRW && !lockInfo.FLock()) return false;
577
578// First try to find the item
579//
580 if (!(hEnt = Find(theItem, prvItem, key, hash)))
581 {if (data) {errno = ENOENT; return false;}
582 return true;
583 }
584
585// Return the contents of the item if the caller wishes that
586//
587 if (data) memcpy(data, ITEM_VAL(theItem), shmTypeSz);
588
589// Delete the item from the index. The update of the count need not be atomic.
590// Also fetching of the next offset need not be atomic as we are the only one.
591//
592 iOff = theItem->next;
593 SHMINFO(itemCount)--;
594 if (prvItem) Atomic_SET_STRICT(prvItem->next, iOff); // Atomic
595 else {if (!iOff) SHMINFO(slotsUsed)--;
596 Atomic_SET_STRICT(shmIndex[hEnt],iOff); // Atomic
597 }
598 RetItem(theItem);
599
600// Indicate the things we updated if need be
601//
602 if (syncOn)
603 {Updated(0);
604 Updated(SHMOFFS(theItem));
605 if (prvItem) Updated(SHMOFFS(prvItem));
606 else Updated(SHMOFFS(&shmIndex[hEnt]));
607 }
608
609// All done
610//
611 return true;
612}
613
614/******************************************************************************/
615/* D e t a c h */
616/******************************************************************************/
617
619{
620// Clean up
621//
622 if (shmFD >= 0) {close(shmFD); shmFD = -1;}
623 if (shmSize) {munmap(shmBase, shmSize); shmSize = 0;}
624 if (shmTemp) {free(shmTemp); shmTemp = 0;}
625 shmIndex = 0;
626}
627
628/******************************************************************************/
629/* E n u m e r a t e */
630/******************************************************************************/
631
633{
634 EnumJar *theJar = (EnumJar *)jar;
635
636// Close off the enumeration
637//
638 if (theJar) {delete theJar; jar = 0;}
639 return true;
640}
641
642/******************************************************************************/
643
644bool XrdSsiShMam::Enumerate(void *&jar, char *&key, void *&val)
645{
646 XLockHelper lockInfo(this, ROLock);
647 EnumJar *theJar = (EnumJar *)jar;
648 MemItem *theItem;
649 long long iTest;
650 int rc, newFD, fence, iOff, hash = 0;
651
652// Make sure we can get an item
653//
654 if (!shmSize) {errno = ENOTCONN; return false;}
655
656// If this is the first call, initialize the jar. First check if we need to
657// remap the segment. We need to do this prior to file locking as the
658// requirements may change. Then create a jar and a shadow copy of the segment.
659//
660 if (!jar)
661 {if (verNum != SHMINFO(verNum)) ReMap(ROLock);
662 if ((newFD = ShMam_Dup(shmFD)) < 0) return false;
663 theJar = new EnumJar(newFD, shmItemSz);
664 jar = theJar;
665 } else if (theJar->iNum < 0)
666 {Enumerate(jar);
667 errno = ENOENT;
668 return false;
669 }
670
671// Lock the file if we have multiple writers or recycling items
672//
673 if (lockRO && !lockInfo.FLock())
674 {rc = errno; Enumerate(jar); errno = rc; return false;}
675
676// Compute the next key we should start the search at but make sure it will not
677// generate an overflow. In the process we fetch the stopping point only once.
678//
679 iTest = (static_cast<long long>(theJar->iNum) * shmItemSz) + shmInfoSz;
680 fence = SHMINFO(lowFree); // Atomic??
681 if (iTest < fence) iOff = static_cast<int>(iTest);
682 else iOff = fence;
683
684// Now start the search. Note that pread() must do a memory fence.
685//
686 theItem = (MemItem *)(theJar->buff);
687 while(iOff < fence)
688 {rc = pread(theJar->fd, theJar->buff, shmItemSz, iOff);
689 if (rc < 0) return false;
690 if (rc != shmItemSz) break;
691 if ((hash = theItem->hash)) break; // Atomic
692 iOff += shmItemSz;
693 }
694
695// Check if we found a key
696//
697 if (!hash) {Enumerate(jar); errno = ENOENT; return false;}
698
699// Return the key and and the associated value
700//
701 key = ITEM_KEY(theItem);
702 val = ITEM_VAL(theItem);
703
704// Compute the contents of the new jar
705//
706 theJar->iNum = (iOff - shmInfoSz)/shmItemSz + 1;
707 return true;
708}
709
710/******************************************************************************/
711/* E x p o r t */
712/******************************************************************************/
713
715{
716 MutexHelper mtHelp(&myMutex, RWLock);
717
718// Make sure we are attached and in R/W mode and exportable
719//
720 if (!shmSize) {errno = ENOTCONN; return false;}
721 if (!shmTemp) {errno = ENOPROTOOPT; return false;}
722 if (!isRW) {errno = EROFS; return false;}
723
724// All that is left is to export the file using the internal interface. Tell
725// the exporter that we don't have the original file locked.
726//
727 return ExportIt(false);
728}
729
730/******************************************************************************/
731/* Private: E x p o r t I t */
732/******************************************************************************/
733
734bool XrdSsiShMam::ExportIt(bool fLocked)
735{
736 int rc, oldFD;
737
738// If data synchronization was wanted, then flush the modified pages to
739// disk before we make this file visible.
740//
741 if (syncOn) Flush();
742
743// Open the original file. If it exists then lock it. We will need to do this
744// locally as the the Lock/Unlock() functions are cognizant of threads and that
745// is not the case here. We are a singleton.
746//
747 if ((oldFD = ShMam_Open(shmPath, O_RDWR)) < 0)
748 {if (errno != ENOENT) return false;}
749 else if (!fLocked)
750 {do {rc = flock(oldFD, LOCK_EX);} while(rc < 0 && errno == EINTR);
751 if (rc) return false;
752 }
753
754// Rename the new file on top of the old one (the fd's remain in tact)
755//
756 if (rename(shmTemp, shmPath)) {if (oldFD) close(oldFD); return false;}
757 free(shmTemp); shmTemp = 0;
758
759// If there was an original file then we must indicate that a new vesion has
760// been exported so current users switch to the new version. This is a lazy
761// version update because we just need readers to eventually realize this.
762//
763 if (oldFD >= 0)
764 {int vnum; bool noGo = false;
765 if (pread(oldFD, &vnum, sizeof(vnum), 0) == (ssize_t)sizeof(vnum))
766 {vnum++;
767 if (pwrite(oldFD, &vnum, sizeof(vnum), 0) != (ssize_t)sizeof(vnum))
768 noGo = true;
769 } else noGo = true;
770 if (noGo) std::cerr <<"SsiShMam: Unable to update version for " <<shmPath
771 <<"; " <<XrdSysE2T(errno) <<std::endl;
772 close(oldFD);
773 }
774
775// We are done. However, before we return make sure the locking requirements
776// are set to reflect a global view as an unexported file had relaxed locking
777// requirements. The close unlocked the original file and now we must unlock
778// our file as create kept the file lock until the export.
779//
780 SetLocking(true);
781 UnLock(true);
782 return true;
783}
784
785/******************************************************************************/
786/* Private: F i n d */
787/******************************************************************************/
788
789int XrdSsiShMam::Find(XrdSsiShMam::MemItem *&theItem,
790 XrdSsiShMam::MemItem *&prvItem,
791 const char *key, int &hash)
792{
793 int hEnt, iOff;
794
795// If no hash was supplied, get one
796//
797 if (!hash) hash = HashVal(key);
798
799// Compute index table entry and atomically fetch the entry
800//
801 hEnt = (unsigned int)hash % shmSlots;
802 if (hEnt == 0) hEnt = 1;
803 iOff = Atomic_GET_STRICT(shmIndex[hEnt]); // Atomic?
804
805// Find the item
806//
807 prvItem = 0;
808 while(iOff)
809 {theItem = SHMADDR(MemItem, iOff);
810 if (hash == theItem->hash && !strcmp(key, ITEM_KEY(theItem)))
811 return hEnt;
812 prvItem = theItem;
813 iOff = Atomic_GET_STRICT(theItem->next); // Atomic?
814 }
815
816// We did not find the item
817//
818 return 0;
819}
820
821/******************************************************************************/
822/* Private: F l u s h */
823/******************************************************************************/
824
825bool XrdSsiShMam::Flush()
826{
827 int rc;
828
829// Do appropriate sync
830//
831#if _POSIX_SYNCHRONIZED_IO > 0
832 rc = fdatasync(shmFD) == 0;
833#else
834 rc = fsync(shmFD) == 0;
835#endif
836
837// If we failed, issue message
838//
839 if (rc)
840 {rc = errno;
841 std::cerr <<"ShMam: msync() failed; " <<XrdSysE2T(errno) <<std::endl;
842 errno = rc; rc = -1;
843 }
844
845// Return result
846//
847 return rc == 0;
848}
849
850/******************************************************************************/
851/* G e t I t e m */
852/******************************************************************************/
853
854bool XrdSsiShMam::GetItem(void *data, const char *key, int hash)
855{
856 XLockHelper lockInfo(this, ROLock);
857 MemItem *theItem, *prvItem;
858 int hEnt;
859
860// Make sure we can get an item
861//
862 if (!shmSize) {errno = ENOTCONN; return false;}
863
864// Check if we need to remap this memory (atomic tests is not needed here)
865// We need to do this prior to file locking as the requirements may change.
866//
867 if (verNum != SHMINFO(verNum)) ReMap(ROLock);
868
869// Lock the file if we have multiple writers or recycling items
870//
871 if (lockRO && !lockInfo.FLock()) return false;
872
873// First try to find the item
874//
875 if (!(hEnt = Find(theItem, prvItem, key, hash)))
876 {errno = ENOENT; return false;}
877
878// Return the contents of the item if the caller wishes that
879//
880 if (data) memcpy(data, ITEM_VAL(theItem), shmTypeSz);
881
882// All done
883//
884 return true;
885}
886
887/******************************************************************************/
888/* Private: H a s h V a l */
889/******************************************************************************/
890
891int XrdSsiShMam::HashVal(const char *key)
892{
893 uLong crc;
894 int hval, klen = strlen(key);
895
896// Get initial crc value
897//
898 crc = crc32(0L, Z_NULL, 0);
899
900// Compute the hash
901//
902 crc = crc32(crc, (const Bytef *)key, klen);
903
904// Cast it to an int (it's weird that zlib want to use a long for this). If the
905// vaue is zero make it 1 as we need to use zero as a missing value.
906//
907 hval = static_cast<int>(crc);
908 return (hval ? hval : 1);
909}
910
911/******************************************************************************/
912/* Private: L o c k */
913/******************************************************************************/
914
915// The caller must have obtained a mutex consistent with the argument passed.
916
917bool XrdSsiShMam::Lock(bool xrw, bool nowait)
918{
919 int rc, act = (xrw ? LOCK_EX : LOCK_SH);
920
921// Make sure we have a file descriptor to lock and is not already locked
922//
923 if (shmFD < 0) {errno = EBADF; return false;}
924
925// We must keep track of r/o locks as there may be many requests but we can
926// only lock the file once for all of them. R/W locks are easier to handle as
927// only one thread can ever have such a lock request. Atomics do not help
928// for R/O locks because they suffer from an unlock control race and also
929// all R/O requestors must wait if the file is locked by another process.
930//
931 if (xrw) lkCount = 1;
932 else {pthread_mutex_lock(&lkMutex);
933 if (lkCount++) {pthread_mutex_unlock(&lkMutex); return true;}
934 }
935
936// Check if we should not wait for the lock
937//
938 if (nowait) act |= LOCK_NB;
939
940// Now obtain the lock
941//
942 do {rc = flock(shmFD, act);} while(rc < 0 && errno == EINTR);
943
944// Decrement lock count if we failed (we were optimistic). Note that we still
945// have the mutex locked if this was a T/O request.
946//
947 if (rc) {if (xrw) lkCount = 0;
948 else lkCount--;
949 }
950
951// Unlock the mutex if we still have it locked and return result
952//
953 if (!xrw) pthread_mutex_unlock(&lkMutex);
954 return rc == 0;
955}
956
957/******************************************************************************/
958/* I n f o */
959/******************************************************************************/
960
961int XrdSsiShMam::Info(const char *vname, char *buff, int blen)
962{
963 MutexHelper mtHelp(&myMutex, ROLock);
964
965// Make sure we can delete an item
966//
967 if (!shmSize) {errno = ENOTCONN; return 0;}
968
969 if (!strcmp(vname, "atomics"))
970 {int n = strlen(Atomic_IMP);
971 strcpy(buff, Atomic_IMP);
972 return n;
973 }
974
975 if (!strcmp(vname, "hash"))
976 {if (!buff || blen < (int)(sizeof(int)+1)) {errno = EMSGSIZE; return -1;}
977 memcpy(buff, &SHMINFO(hashID), sizeof(int)); buff[sizeof(int)] = 0;
978 return strlen(buff);
979 }
980 if (!strcmp(vname, "impl"))
981 {int n = strlen(SHMINFO(myName));
982 if (!buff || blen < n) {errno = EMSGSIZE; return -1;}
983 strcpy(buff, SHMINFO(myName));
984 return n;
985 }
986 if (!strcmp(vname, "flockro")) return lockRO;
987 if (!strcmp(vname, "flockrw")) return lockRW;
988 if (!strcmp(vname, "indexsz")) return shmSlots;
989 if (!strcmp(vname, "indexused")) return SHMINFO(slotsUsed);
990 if (!strcmp(vname, "keys")) return SHMINFO(itemCount); // Atomic
991 if (!strcmp(vname, "keysfree"))
992 return (SHMINFO(highUse) - SHMINFO(lowFree))/shmItemSz
993 + SHMINFO(freeCount);
994 if (!strcmp(vname, "maxkeylen")) return SHMINFO(maxKeySz);
995 if (!strcmp(vname, "multw")) return multW;
996 if (!strcmp(vname, "reuse")) return reUse;
997 if (!strcmp(vname, "type"))
998 {int n = strlen(SHMINFO(typeID));
999 if (!buff || blen < n) {errno = EMSGSIZE; return -1;}
1000 strcpy(buff, SHMINFO(typeID));
1001 return n;
1002 }
1003 if (!strcmp(vname, "typesz")) return SHMINFO(typeSz);
1004
1005// Return variable not supported
1006//
1007 errno = ENOTSUP;
1008 return -1;
1009}
1010
1011/******************************************************************************/
1012/* Private: N e w I t e m */
1013/******************************************************************************/
1014
1015XrdSsiShMam::MemItem *XrdSsiShMam::NewItem()
1016{
1017 MemItem *itemP;
1018 int iOff;
1019
1020// First see if we can get this from the free chain
1021//
1022 if (reUse && SHMINFO(freeItem))
1023 {iOff = SHMINFO(freeItem);
1024 itemP = SHMADDR(MemItem, iOff);
1025 SHMINFO(freeItem) = itemP->next;
1026 SHMINFO(freeCount)--; // Atomic?
1027 } else {
1028 int newFree = SHMINFO(lowFree) + shmItemSz;
1029 if (newFree > SHMINFO(highUse)) itemP = 0;
1030 else {iOff = SHMINFO(lowFree);
1031 itemP = SHMADDR(MemItem, iOff);
1032 SHMINFO(lowFree) = newFree;
1033 }
1034 }
1035
1036// Return result
1037//
1038 return itemP;
1039}
1040
1041/******************************************************************************/
1042/* Private: R e M a p */
1043/******************************************************************************/
1044
1045bool XrdSsiShMam::ReMap(LockType iHave)
1046{
1048
1049// If the caller has a read mutex then we must change it to a r/w mutex as we
1050// may be changing all sorts of variables. It will continue holding this mutex.
1051// Fortunately, remappings do not occur very often in practice.
1052//
1053 if (iHave == ROLock)
1054 {pthread_rwlock_unlock(&myMutex);
1055 pthread_rwlock_wrlock(&myMutex);
1056 }
1057
1058// Check if the version number no longer differs, then just return. This may
1059// happen because a previous thread forced the remapping and everyone was
1060// waiting for that to happen as we hold the r/w mutex.
1061//
1062 if (verNum == SHMINFO(verNum)) return false;
1063
1064// Setup parms
1065//
1066 parms.impl = shmImpl;
1067 parms.path = shmPath;
1068 parms.typeID = shmType;
1069 parms.typeSz = shmTypeSz;
1070 parms.hashID = shmHash;
1071
1072// Attach the new segment. If we fail, then just continue
1073//
1074 XrdSsiShMam newMap(parms);
1075 if (!newMap.Attach(timeOut, isRW)) return false;
1076
1077// Swap the new map with our map
1078//
1079 SwapMap(newMap);
1080 return true;
1081}
1082
1083/******************************************************************************/
1084/* R e s i z e */
1085/******************************************************************************/
1086
1088{
1089 XLockHelper lockInfo(this, RWLock);
1090 XrdSsiShMat::NewParms newParms;
1091 MemItem *theItem;
1092 void *val;
1093 char *key;
1094 int fence, iOff, hash;
1095
1096// Make sure we can delete an item
1097//
1098 if (!shmSize) {errno = ENOTCONN; return false;}
1099 if (!isRW) {errno = EROFS; return false;}
1100
1101// Validate parameter list values
1102//
1103 if (parms.indexSz < 0 || parms.maxKeys < 0 || parms.maxKLen < 0)
1104 {errno = EINVAL; return false;}
1105
1106// A resize is not permitted on an un-exported segment
1107//
1108 if (shmTemp) {errno = EPERM; return false;}
1109
1110// Check if we need to remap this memory (atomic tests is not needed here)
1111//
1112 if (verNum != SHMINFO(verNum)) ReMap(RWLock);
1113
1114// Lock the source file
1115//
1116 if (!lockInfo.FLock()) return false;
1117
1118// Setup parms for the segment object
1119//
1120 newParms.impl = shmImpl;
1121 newParms.path = shmPath;
1122 newParms.typeID = shmType;
1123 newParms.typeSz = shmTypeSz;
1124 newParms.hashID = shmHash;
1125
1126// Create a new segment object (this cannot fail).
1127//
1128 XrdSsiShMam newMap(newParms);
1129
1130// Set the values in the parameter list for those wanting the current setting.
1131//
1132 if (!parms.indexSz) parms.indexSz = shmSlots;
1133 if (!parms.maxKeys) parms.maxKeys = SHMINFO(maxKeys);
1134 if (!parms.maxKLen) parms.maxKLen = maxKLen;
1135 if (parms.reUse < 0) parms.reUse = reUse;
1136 if (parms.multW < 0) parms.multW = multW;
1137
1138// Create the new target file
1139//
1140 parms.mode = accMode;
1141 if (!newMap.Create(parms)) return false;
1142
1143// Compute the offset of the first item and get the offset of the last item.
1144//
1145 fence = SHMINFO(lowFree); // Atomic??
1146 iOff = shmInfoSz;
1147
1148// For each item found in the current map add it to the new map
1149//
1150 while(iOff < fence)
1151 {theItem = SHMADDR(MemItem, iOff);
1152 if ((hash = theItem->hash))
1153 {key = ITEM_KEY(theItem);
1154 val = ITEM_VAL(theItem);
1155 if (!newMap.AddItem(val, 0, key, hash, true)) return false;
1156 }
1157 iOff += shmItemSz;
1158 }
1159
1160// We need to drop the lock on the file otherwise the export will hang
1161//
1162
1163// All went well, so export this the new map using the internal interface as
1164// we already have the source file locked and export normally tries to lock it.
1165//
1166 if (!newMap.ExportIt(true)) return false;
1167
1168// All that we need to do is to swap the map with our map and we are done.
1169//
1170 SwapMap(newMap);
1171 return true;
1172}
1173
1174/******************************************************************************/
1175/* Private: R e t I t e m */
1176/******************************************************************************/
1177
1178void XrdSsiShMam::RetItem(MemItem *iP)
1179{
1180
1181// Zorch the hash so this item cannot be found. This is problematic for
1182// enumerations. They may or may not include this key, but at least it will
1183// consistent at the time the enumeration happens.
1184//
1185 iP->hash = 0; // Atomic?
1186
1187// If reuse is allowed, place the item on the free list
1188//
1189 if (reUse)
1190 {iP->next = SHMINFO(freeItem);
1191 SHMINFO(freeItem) = SHMOFFS(iP);
1192 SHMINFO(freeCount)++; //Atomic??
1193 }
1194}
1195
1196/******************************************************************************/
1197/* Private: S e t L o c k i n g */
1198/******************************************************************************/
1199
1200void XrdSsiShMam::SetLocking(bool isrw)
1201{
1202 (void)isrw;
1203
1204// If we do not have atomics then file locking is mandatory
1205//
1206#ifdef NEED_ATOMIC_MUTEX
1207 lockRO = lockRW = true;
1208#else
1209// A reader must lock the file R/O if objects are being reused
1210//
1211 lockRO = reUse = SHMINFO(reUse);
1212
1213// A writer must lock the file R/W if objects are being reused or the file may
1214// have multiple writers
1215//
1216 multW = SHMINFO(multW);
1217 lockRW = reUse || multW;
1218#endif
1219}
1220
1221/******************************************************************************/
1222/* S n o o z e */
1223/******************************************************************************/
1224
1225void XrdSsiShMam::Snooze(int sec)
1226{
1227 struct timespec naptime, waketime;
1228
1229// Calculate nano sleep time
1230//
1231 naptime.tv_sec = sec;
1232 naptime.tv_nsec = 0;
1233
1234// Wait for a number of seconds
1235//
1236 while(nanosleep(&naptime, &waketime) && EINTR == errno)
1237 {naptime.tv_sec = waketime.tv_sec;
1238 naptime.tv_nsec = waketime.tv_nsec;
1239 }
1240}
1241
1242/******************************************************************************/
1243/* Private: S w a p M a p */
1244/******************************************************************************/
1245
1246void XrdSsiShMam::SwapMap(XrdSsiShMam &newMap)
1247{
1248
1249// Detach the old map
1250//
1251 Detach();
1252
1253// Swap the maps
1254//
1255 shmFD = newMap.shmFD;
1256 newMap.shmFD = -1;
1257 shmSize = newMap.shmSize;
1258 newMap.shmSize = 0;
1259 shmBase = newMap.shmBase;
1260 newMap.shmBase = 0;
1261 shmIndex = newMap.shmIndex;
1262 newMap.shmIndex = 0;
1263 lockRO = newMap.lockRO;
1264 lockRW = newMap.lockRW;
1265 reUse = newMap.reUse;
1266 multW = newMap.multW;
1267 verNum = newMap.verNum;
1268}
1269
1270/******************************************************************************/
1271/* S y n c */
1272/******************************************************************************/
1273
1275{
1276 MutexHelper mtHelp(&myMutex, RWLock);
1277
1278// Make sure we are attached and in R/W mode
1279//
1280 if (!shmSize) {errno = ENOTCONN; return false;}
1281 if (!isRW) {errno = EROFS; return false;}
1282
1283// For now do a flush as this works in Linux. We may need to generalize this
1284// for all platforms using msync, sigh.
1285//
1286 if (!Flush()) return false;
1287
1288// Reset counters
1289//
1290 syncBase = false;
1291 syncLast = 0;
1292 syncQWR = 0;
1293 return true;
1294}
1295
1296/******************************************************************************/
1297
1298bool XrdSsiShMam::Sync(int syncqsz)
1299{
1300 MutexHelper mtHelp(&myMutex, RWLock);
1301
1302// Make sure we are attached and in R/W mode
1303//
1304 if (!shmSize) {errno = ENOTCONN; return false;}
1305 if (!isRW) {errno = EROFS; return false;}
1306 if (syncqsz < 0) {errno = EINVAL; return false;}
1307
1308// Flush out pages if sync it turned on
1309//
1310 if (syncOn && !Flush()) return false;
1311
1312// Set new queue size
1313//
1314 syncQSZ = syncqsz;
1315 return true;
1316}
1317
1318/******************************************************************************/
1319
1320bool XrdSsiShMam::Sync(bool dosync, bool syncdo)
1321{
1322 MutexHelper mtHelp(&myMutex, RWLock);
1323
1324// Make sure we are attached and in R/W mode
1325//
1326 if (!shmSize) {errno = ENOTCONN; return false;}
1327 if (!isRW) {errno = EROFS; return false;}
1328
1329// Flush out pages if sync it turned on
1330//
1331 if (syncOn && !Flush()) return false;
1332
1333// Set new options
1334//
1335 syncOn = dosync;
1336 syncOpt = (syncdo ? MS_SYNC : MS_ASYNC);
1337 return true;
1338}
1339
1340/******************************************************************************/
1341/* Private: U n L o c k */
1342/******************************************************************************/
1343
1344// The caller must have obtained a mutex consistent with the argument passed.
1345
1346void XrdSsiShMam::UnLock(bool isrw)
1347{
1348 int rc;
1349
1350// Make sure we have a file descriptor to unlock
1351//
1352 if (shmFD < 0) return;
1353
1354// If this is a R/W type of lock then we can immediate release it as there
1355// could have been only one writer. Otherwise, we will need to keep track
1356// of the number of R/O locks has dropped to zero before unlocking the file.
1357// Atomics do not help here because of possible thread inversion.
1358//
1359 if (isrw) lkCount = 0;
1360 else {pthread_mutex_lock(&lkMutex);
1361 lkCount--;
1362 if (lkCount) {pthread_mutex_unlock(&lkMutex); return;}
1363 }
1364
1365// Now release the lock
1366//
1367 do {rc = flock(shmFD, LOCK_UN);} while(rc < 0 && errno == EINTR);
1368
1369// If this was a r/o unlock then we have kept the mutex and must unlock it
1370// We kept the mutex to prevent a control race condition.
1371//
1372 if (!isrw) pthread_mutex_unlock(&lkMutex);
1373}
1374
1375/******************************************************************************/
1376/* Private: U p d a t e d */
1377/******************************************************************************/
1378
1379void XrdSsiShMam::Updated(int mOff)
1380{
1381// Check if this refers to the info struct
1382//
1383 if (!mOff)
1384 {if (!syncBase) {syncBase = true; syncQWR++;}
1385 } else {
1386 if (syncLast != (mOff & PageMask))
1387 {syncLast = (mOff & PageMask); syncQWR++;}
1388 }
1389
1390// Check if we need to flush now
1391//
1392 if (syncQWR >= syncQSZ) {ShMam_Flush(shmFD); syncQWR = 0;}
1393}
1394
1395/******************************************************************************/
1396
1397void XrdSsiShMam::Updated(int mOff, int mLen)
1398{
1399 int memB = mOff & PageMask;
1400 int memE = mOff + mLen;
1401
1402// This is a range update. This is not very precise if update the same page
1403// and the we cross the page boundary. But it should be good enough.
1404//
1405 if (memB != syncLast)
1406 {syncQWR++;
1407 if (memB != (memE & PageMask)) syncQWR++;
1408 syncLast = memB;
1409 }
1410
1411// Check if we need to flush now
1412//
1413 if (syncQWR >= syncQSZ) {ShMam_Flush(shmFD); syncQWR = 0;}
1414}
int fcntl(int fd, int cmd,...)
int fdatasync(int fildes)
#define close(a)
Definition XrdPosix.hh:43
#define fsync(a)
Definition XrdPosix.hh:59
#define fstat(a, b)
Definition XrdPosix.hh:57
#define open
Definition XrdPosix.hh:71
#define stat(a, b)
Definition XrdPosix.hh:96
#define rename(a, b)
Definition XrdPosix.hh:87
#define ftruncate(a, b)
Definition XrdPosix.hh:65
#define pwrite(a, b, c, d)
Definition XrdPosix.hh:102
#define pread(a, b, c, d)
Definition XrdPosix.hh:75
#define Atomic(type)
#define Atomic_IMP
#define Atomic_SET(x, y)
#define SHMOFFS(addr)
#define ITEM_VOF(x)
#define SHMINFO(x)
#define ITEM_KEY(x)
#define ITEM_VAL(x)
#define SHMADDR(type, offs)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
XrdSsiShMam(XrdSsiShMat::NewParms &parms)
bool Resize(XrdSsiShMat::CRZParms &parms)
bool Create(XrdSsiShMat::CRZParms &parms)
void Detach()
Detach the map from the shared memory.
bool AddItem(void *newdata, void *olddata, const char *key, int hash, bool replace=false)
bool DelItem(void *data, const char *key, int hash)
int Info(const char *vname, char *buff=0, int blen=0)
bool GetItem(void *data, const char *key, int hash)
bool Attach(int tout, bool isrw=false)
bool Enumerate(void *&jar, char *&key, void *&val)
const char * typeID
The name of the type associated with the key.
const char * impl
Implementation name.
const char * path
The path to the backing file for the table.
int hashID
The hash being used (0 means the default)
int typeSz
Size of the type in bytes.
int maxKeys
Maximum number of keys-value pairs expected in table.
int maxKLen
The maximum acceptable key length.
int mode
Filemode for the newly created file.
int indexSz
Number of four byte hash table entries to create.