else if (key == "NSPrefix") {
this->nsPrefix_ = value;
}
+ else if (key == "DiskPrefix") {
+ this->diskPrefix_ = value;
+ }
else
throw DmException(DMLITE_CFGERR(DMLITE_UNKNOWN_KEY),
"Unrecognised option " + key);
PoolDriver* VfsFactory::createPoolDriver() throw (DmException)
{
syslog(LOG_DEBUG, "%s", __func__);
- return new VfsPoolDriver(this->hostName_,
+ return new VfsPoolDriver(this->hostName_, this->diskPrefix_,
this->tokenPasswd_, this->tokenUseIp_,
this->tokenLife_);
}
VfsPoolDriver::VfsPoolDriver(const std::string& host,
+ const std::string& prefix,
const std::string& passwd, bool useIp,
unsigned life):
hostName_(host),
tokenPasswd_(passwd), tokenUseIp_(useIp), tokenLife_(life)
{
- // Nothing
+ this->prefix_ = prefix.empty() ? "" : prefix;
}
{
struct statfs fs;
- wrapCall(statfs("/", &fs));
+ wrapCall(statfs(getLocalPath("/").c_str(), &fs));
return fs.f_bsize * fs.f_blocks;
}
uint64_t VfsPoolHandler::getFreeSpace(void) throw (DmException)
{
-struct statfs fs;
+ struct statfs fs;
- wrapCall(statfs("/", &fs));
+ wrapCall(statfs(getLocalPath("/").c_str(), &fs));
return fs.f_bsize * fs.f_bfree;
}
{
struct stat fstat;
- return stat(replica.rfn.c_str(), &fstat) == 0;
+ return stat(getLocalPath(replica.rfn).c_str(), &fstat) == 0;
}
Location VfsPoolHandler::whereToRead(const Replica& replica) throw (DmException)
{
struct stat fstat;
+ std::string path;
- wrapCall(stat(replica.rfn.c_str(), &fstat));
+ if (replica.rfn.empty() || replica.rfn[0] != '/') path = "/" + replica.rfn;
+ else path = replica.rfn;
+
+ wrapCall(stat(getLocalPath(path).c_str(), &fstat), "could not find '%s'", path.c_str());
- Url rloc(replica.rfn);
+ Url rloc(path);
Chunk single;
Chunk single;
Url wloc;
std::vector<std::string> components;
- std::string curdir, parentdir;
+ std::string path, curdir, parentdir;
struct stat fstat;
- components = Url::splitPath(sfn);
+ if (sfn.empty() || sfn[0] != '/') path = "/" + sfn;
+ else path = sfn;
+
+ components = Url::splitPath(path);
components.pop_back();
parentdir = Url::joinPath(components);
// Catalog checks permissions in all parent directories, so with disk is in
// sync with namespace, the creating of subtree should not be needed.
//
- if (stat(parentdir.c_str(), &fstat) == -1) {
- components = Url::splitPath(sfn);
+ if (stat(getLocalPath(parentdir).c_str(), &fstat) == -1) {
+ components = Url::splitPath(path);
curdir = "";
for (unsigned int i = 0; i < components.size() - 1; i++) {
// the first component always '/', let's use it as separator
if (i < 2) curdir += components[i];
else curdir = curdir + "/" + components[i];
// ignore all errors - we stupidly go from '/' toward deeper levels
- mkdir(curdir.c_str(), 0755);
+ mkdir(getLocalPath(curdir).c_str(), 0755);
}
// detailed error from stat() only after the attempt for the subtree
- wrapCall(stat(curdir.c_str(), &fstat), "parent directory '%s' not available", curdir.c_str());
+ wrapCall(stat(getLocalPath(curdir).c_str(), &fstat), "parent directory '%s' not available", curdir.c_str());
}
if (!S_ISDIR(fstat.st_mode))
vfsThrow(ENOTDIR, "'%s' is not directory", curdir.c_str());
- wloc = Url(sfn);
+ wloc = Url(path);
single.url.domain = this->driver_->hostName_;
single.url.path = "/vfs" + wloc.path;
void VfsPoolHandler::doneWriting(const Location& loc) throw (DmException)
{
}
+
+
+
+const std::string VfsPoolHandler::getLocalPath(const std::string &path) {
+ if (path.empty())
+ return this->driver_->prefix_;
+
+ if (path[0] != '/')
+ return this->driver_->prefix_ + "/" + path;
+
+ return this->driver_->prefix_ + path;
+}
Location VfsPoolManager::whereToRead(const std::string& path) throw (DmException)
{
- struct stat fstat;
-
- wrapCall(stat(path.c_str(), &fstat));
-
- Url rloc(path);
-
- Chunk single;
-
- single.url.domain = this->hostName_;
- single.url.path = "/vfs" + rloc.path;
- single.offset = 0;
- single.size = fstat.st_size;
-
- single.url.query["token"] = dmlite::generateToken(this->userId_, single.url.path,
- this->tokenPasswd_,
- this->tokenLife_);
-
- return Location(1, single);
+ PoolHandler* handler;
+ Replica replica;
+ Location location;
+
+ // TODO: Calling vfs pool code directly is just simplification (it is better
+ // to compute path in PoolDriver instead to repeating it here).
+ //
+ // Proper way should be through replicas and pick the random available pool:
+ //
+ // std::vector<Replica> replicas = this->stack_->getCatalog()->getReplicas(path);
+ // Pool pool = this->getPool(replicas[i].getString("pool"));
+ // PoolHandler* handler = this->stack_->getPoolDriver(pool.type)->createPoolHandler(pool.name);
+ // if (handler->replicaIsAvailable(replicas[i])) ...
+ // ...
+ //
+ // We create a stub replica instead.
+ //
+ handler = this->stack_->getPoolDriver("vfs")->createPoolHandler("vfs");
+ replica.rfn = path;
+ location = handler->whereToRead(replica);
+ delete handler;
+
+ return location;
}