XRootD
Loading...
Searching...
No Matches
XrdPfcIOFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <cstdio>
20#include <fcntl.h>
21
22#include "XrdSys/XrdSysError.hh"
25
26#include "XrdPfcIOFile.hh"
27#include "XrdPfcStats.hh"
28#include "XrdPfcTrace.hh"
29
30#include "XrdOuc/XrdOucEnv.hh"
32
33using namespace XrdPfc;
34
35//______________________________________________________________________________
37 IO(io, cache),
38 m_file(0),
39 m_localStat(0)
40{
41 m_file = Cache::GetInstance().GetFile(GetFilename(), this);
42}
43
44//______________________________________________________________________________
46{
47 // called from Detach() if no sync is needed or
48 // from Cache's sync thread
49 TRACEIO(Debug, "~IOFile() " << this);
50
51 delete m_localStat;
52}
53
54//______________________________________________________________________________
55int IOFile::Fstat(struct stat &sbuff)
56{
57 int res = 0;
58 if( ! m_localStat)
59 {
60 res = initCachedStat();
61 if (res) return res;
62 }
63
64 memcpy(&sbuff, m_localStat, sizeof(struct stat));
65 return 0;
66}
67
68//______________________________________________________________________________
69long long IOFile::FSize()
70{
71 return m_file->GetFileSize();
72}
73
74//______________________________________________________________________________
75int IOFile::initCachedStat()
76{
77 // Called indirectly from the constructor.
78
79 static const char* trace_pfx = "initCachedStat ";
80
81 int res = -1;
82 struct stat tmpStat;
83
84 std::string fname = GetFilename();
85 std::string iname = fname + Info::s_infoExtension;
86 if (m_cache.GetOss()->Stat(fname.c_str(), &tmpStat) == XrdOssOK)
87 {
88 XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str());
89 XrdOucEnv myEnv;
90 int res_open;
91 if ((res_open = infoFile->Open(iname.c_str(), O_RDONLY, 0600, myEnv)) == XrdOssOK)
92 {
93 Info info(m_cache.GetTrace());
94 if (info.Read(infoFile, iname.c_str()))
95 {
96 // The filesize from the file itself may be misleading if its download is incomplete; take it from the cinfo.
97 tmpStat.st_size = info.GetFileSize();
98 // We are arguably abusing the mtime to be the creation time of the file; then ctime becomes the
99 // last time additional data was cached.
100 tmpStat.st_mtime = info.GetCreationTime();
101 TRACEIO(Info, trace_pfx << "successfully read size " << tmpStat.st_size << " and creation time " << tmpStat.st_mtime << " from info file");
102 res = 0;
103 }
104 else
105 {
106 // file exist but can't read it
107 TRACEIO(Info, trace_pfx << "info file is incomplete or corrupt");
108 }
109 }
110 else
111 {
112 TRACEIO(Error, trace_pfx << "can't open info file " << XrdSysE2T(-res_open));
113 }
114 infoFile->Close();
115 delete infoFile;
116 }
117
118 if (res)
119 {
120 res = GetInput()->Fstat(tmpStat);
121 TRACEIO(Debug, trace_pfx << "got stat from client res = " << res << ", size = " << tmpStat.st_size);
122 // The mtime / atime / ctime for cached responses come from the file on disk in the cache hit case.
123 // To avoid weirdness when two subsequent stat queries can give wildly divergent times (one from the
124 // origin, one from the cache), set the times to "now" so we effectively only report the *time as the
125 // cache service sees it.
126 tmpStat.st_ctime = tmpStat.st_mtime = tmpStat.st_atime = time(NULL);
127 }
128
129 if (res == 0)
130 {
131 m_localStat = new struct stat;
132 memcpy(m_localStat, &tmpStat, sizeof(struct stat));
133 }
134 return res;
135}
136
137//______________________________________________________________________________
139{
140 IO::Update(iocp);
141 m_file->ioUpdated(this);
142}
143
144//______________________________________________________________________________
146{
148 return m_file->ioActive(this);
149}
150
151//______________________________________________________________________________
153{
154 // Effectively a destructor.
155
156 TRACE(Info, "DetachFinalize() " << this);
157
158 m_file->RequestSyncOfDetachStats();
159 Cache::GetInstance().ReleaseFile(m_file, this);
160
161 delete this;
162}
163
164
165//==============================================================================
166// Read and pgRead - sync / async and helpers
167//==============================================================================
168
169//______________________________________________________________________________
170int IOFile::Read(char *buff, long long off, int size)
171{
173
174 auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
175
176 TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
177
178 rh->m_cond.Lock();
179 int retval = ReadBegin(buff, off, size, rh);
180 if (retval == -EWOULDBLOCK)
181 {
182 rh->m_cond.Wait();
183 retval = rh->m_retval;
184 }
185 rh->m_cond.UnLock();
186
187 return ReadEnd(retval, rh);
188}
189
190//______________________________________________________________________________
191void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size)
192{
193 struct ZHandler : ReadReqRH
194 { using ReadReqRH::ReadReqRH;
195 IOFile *m_io = nullptr;
196
197 void Done(int result) override {
198 m_io->ReadEnd(result, this);
199 }
200 };
201
203
204 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
205 rh->m_io = this;
206
207 TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
208
209 int retval = ReadBegin(buff, off, size, rh);
210 if (retval != -EWOULDBLOCK)
211 {
212 rh->Done(retval);
213 }
214}
215
216//______________________________________________________________________________
217void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size,
218 std::vector<uint32_t> &csvec, uint64_t opts, int *csfix)
219{
220 struct ZHandler : ReadReqRH
221 { using ReadReqRH::ReadReqRH;
222 IOFile *m_io = nullptr;
223 std::function<void (int)> m_lambda {0};
224
225 void Done(int result) override {
226 if (m_lambda) m_lambda(result);
227 m_io->ReadEnd(result, this);
228 }
229 };
230
232
233 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
234 rh->m_io = this;
235
236 TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
237
239 rh->m_lambda = [=, &csvec](int result) {
240 if (result > 0)
241 XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec);
242 };
243
244 int retval = ReadBegin(buff, off, size, rh);
245 if (retval != -EWOULDBLOCK)
246 {
247 rh->Done(retval);
248 }
249}
250
251//______________________________________________________________________________
252int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
253{
254 // protect from reads over the file size
255 if (off >= FSize()) {
256 size = 0;
257 return 0;
258 }
259 if (off < 0) {
260 return -EINVAL;
261 }
262 if (off + size > FSize()) {
263 size = FSize() - off;
264 }
265 rh->m_expected_size = size;
266
267 return m_file->Read(this, buff, off, size, rh);
268}
269
270//______________________________________________________________________________
271int IOFile::ReadEnd(int retval, ReadReqRH *rh)
272{
273 TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size);
274
275 if (retval < 0) {
276 TRACEIO(Warning, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id);
277 } else if (retval < rh->m_expected_size) {
278 TRACEIO(Warning, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id);
279 }
280 if (rh->m_iocb)
281 rh->m_iocb->Done(retval);
282
283 delete rh;
284
286
287 return retval;
288}
289
290
291//==============================================================================
292// ReadV
293//==============================================================================
294
295//______________________________________________________________________________
296int IOFile::ReadV(const XrdOucIOVec *readV, int n)
297{
299
300 auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
301
302 TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
303
304 rh->m_cond.Lock();
305 int retval = ReadVBegin(readV, n, rh);
306 if (retval == -EWOULDBLOCK)
307 {
308 rh->m_cond.Wait();
309 retval = rh->m_retval;
310 }
311 rh->m_cond.UnLock();
312 return ReadVEnd(retval, rh);
313}
314
315//______________________________________________________________________________
316void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
317{
318 struct ZHandler : ReadReqRH
319 { using ReadReqRH::ReadReqRH;
320 IOFile *m_io = nullptr;
321
322 void Done(int result) override { m_io-> ReadVEnd(result, this); }
323 };
324
326
327 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
328 rh->m_io = this;
329
330 TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
331
332 int retval = ReadVBegin(readV, n, rh);
333 if (retval != -EWOULDBLOCK)
334 {
335 rh->Done(retval);
336 }
337}
338
339//______________________________________________________________________________
340int IOFile::ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh)
341{
342 long long file_size = FSize();
343 for (int i = 0; i < n; ++i)
344 {
345 const XrdOucIOVec &vr = readV[i];
346 if (vr.offset < 0 || vr.offset >= file_size ||
347 vr.offset + vr.size > file_size)
348 {
349 return -EINVAL;
350 }
351 rh->m_expected_size += vr.size;
352 }
353 rh->m_n_chunks = n;
354
355 return m_file->ReadV(this, readV, n, rh);
356}
357
358//______________________________________________________________________________
359int IOFile::ReadVEnd(int retval, ReadReqRH *rh)
360{
361 TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id <<
362 " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size);
363
364 if (retval < 0) {
365 TRACEIO(Warning, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval));
366 } else if (retval < rh->m_expected_size) {
367 TRACEIO(Warning, "ReadVEnd() bytes missed " << rh->m_expected_size - retval);
368 }
369 if (rh->m_iocb)
370 rh->m_iocb->Done(retval);
371
372 delete rh;
373
375
376 return retval;
377}
#define XrdOssOK
Definition XrdOss.hh:50
#define TRACEIO(act, x)
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
struct myOpts opts
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fstat(struct stat &sbuff)
static const uint64_t forceCS
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:411
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:398
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:159
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:492
XrdOss * GetOss() const
Definition XrdPfc.hh:385
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
long long GetFileSize()
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Downloads original file into a single file on local disk. Handles read requests as they come along.
void Update(XrdOucCacheIO &iocp) override
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
int Read(char *buff, long long off, int size) override
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0) override
int ReadV(const XrdOucIOVec *readV, int n) override
Pass ReadV request to the corresponding File object.
long long FSize() override
IOFile(XrdOucCacheIO *io, Cache &cache)
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
std::string GetFilename()
Definition XrdPfcIO.hh:56
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
Cache & m_cache
reference to Cache object
Definition XrdPfcIO.hh:52
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:72
const char * RefreshLocation()
Definition XrdPfcIO.hh:57
void Update(XrdOucCacheIO &iocp) override
Definition XrdPfcIO.cc:16
unsigned short ObtainReadSid()
Definition XrdPfcIO.hh:59
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
long long offset
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:65
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:67