Write mode support in PoolHandler and PoolManager. PoolManager calls catalog functions.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 15 Oct 2013 11:17:00 +0000 (13:17 +0200)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 15 Oct 2013 11:17:00 +0000 (13:17 +0200)
src/VfsDriver.cpp
src/VfsIO.cpp
src/VfsIO.h
src/VfsPool.cpp
src/VfsPool.h

index 5fed071..56bd514 100644 (file)
@@ -3,6 +3,8 @@
 /// @author Alejandro Álvarez Ayllón <aalvarez@cern.ch>
 #include <dmlite/cpp/dmlite.h>
 #include <dmlite/cpp/utils/urls.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 #include <errno.h>
 #include <stdlib.h>
 #include <string.h>
@@ -181,7 +183,7 @@ Location VfsPoolHandler::whereToRead(const Replica& replica) throw (DmException)
   single.url.path   = "/vfs" + rloc.path;
   single.offset = 0;
   single.size   = fstat.st_size;
-  
+
   single.url.query["token"] = dmlite::generateToken(this->driver_->userId_, single.url.path,
                                                     this->driver_->tokenPasswd_,
                                                     this->driver_->tokenLife_);
@@ -200,12 +202,56 @@ void VfsPoolHandler::removeReplica(const Replica& replica) throw (DmException)
 
 Location VfsPoolHandler::whereToWrite(const std::string& sfn) throw (DmException)
 {
-  throw DmException(EACCES, "Write mode not supported");
+  Chunk single;
+  Url wloc;
+  std::vector<std::string> components;
+  std::string curdir, parentdir;
+  struct stat fstat;
+
+  components = Url::splitPath(sfn);
+  components.pop_back();
+  parentdir = Url::joinPath(components);
+
+  //
+  // check the parent directory and try to create subtree if not available
+  //
+  // Catalog checks permissions in all parent directories, so with disk is in
+  // sync with namespace, the creating of subtree should not be needed.
+  //
+  if (stat(parentdir.c_str(), &fstat) == -1) {
+    components = Url::splitPath(sfn);
+    curdir = "";
+    for (unsigned int i = 0; i < components.size() - 1; i++) {
+      // the first component always '/', let's use it as separator
+      if (i < 2) curdir += components[i];
+      else curdir = curdir + "/" + components[i];
+      // ignore all errors - we stupidly go from '/' toward deeper levels
+      mkdir(curdir.c_str(), 0755);
+    }
+
+    // detailed error from stat() only after the attempt for the subtree
+    wrapCall(stat(curdir.c_str(), &fstat), "parent directory '%s' not available", curdir.c_str());
+  }
+
+  if (!S_ISDIR(fstat.st_mode))
+    vfsThrow(ENOTDIR, "'%s' is not directory", curdir.c_str());
+
+  wloc = Url(sfn);
+
+  single.url.domain   = this->driver_->hostName_;
+  single.url.path   = "/vfs" + wloc.path;
+  single.offset = 0;
+  single.size = 0;
+
+  single.url.query["token"] = dmlite::generateToken(this->driver_->userId_, single.url.path,
+                                                    this->driver_->tokenPasswd_,
+                                                    this->driver_->tokenLife_, true);
+
+  return Location(1, single);
 }
 
 
 
 void VfsPoolHandler::doneWriting(const Location& loc) throw (DmException)
 {
-  throw DmException(EACCES, "Write mode not supported");
 }
index 7cbb886..7ca5295 100644 (file)
@@ -74,10 +74,8 @@ IOHandler* VfsIODriver::createIOHandler(const std::string& pfn,
 
 
 
-void VfsIODriver::doneWriting(const std::string& pfn,
-                              const Extensible& params) throw (DmException)
+void VfsIODriver::doneWriting(const Location &loc) throw (DmException)
 {
-  throw DmException(EACCES, "Write mode not supported");
 }
 
 
index effff34..3b1aa7a 100644 (file)
@@ -23,8 +23,7 @@ namespace dmlite {
     IOHandler* createIOHandler(const std::string& pfn, int flags,
                               const Extensible& extras, mode_t mode) throw (DmException);
 
-    void doneWriting(const std::string& pfn,
-                     const Extensible& params) throw (DmException);
+    void doneWriting(const Location &loc) throw (DmException);
 
    private:
     StackInstance*         si_;
index bf9b4c8..15bd9e5 100644 (file)
@@ -44,8 +44,7 @@ std::string VfsPoolManager::getImplId() const throw()
 
 void VfsPoolManager::setStackInstance(StackInstance* si) throw (DmException)
 {
-  // Nothing
-  this->si_ = si;
+  this->stack_ = si;
 }
 
 
@@ -69,9 +68,9 @@ std::vector<Pool> VfsPoolManager::getPools(PoolAvailability availability) throw
   
   switch (availability)
   {
-    case kForBoth: case kForWrite:
     case kNone:
       break;
+    case kForBoth: case kForWrite:
     default:
       return std::vector<Pool>(1, pool);
   }
@@ -153,5 +152,34 @@ Location VfsPoolManager::whereToRead(ino_t) throw (DmException)
 
 Location VfsPoolManager::whereToWrite(const std::string& path) throw (DmException)
 {
-  throw DmException(EACCES, "Write mode not supported");
+  std::vector<Pool> pools;
+  PoolHandler* handler;
+  Location location;
+  unsigned i;
+
+  pools = this->getPools(PoolManager::kForWrite);
+  if (pools.size() == 0)
+    vfsThrow(ENOSPC, "There are no pools available for writing");
+
+  // Pick a random one
+  i = rand() % pools.size();
+
+  handler = this->stack_->getPoolDriver(pools[i].type)->createPoolHandler(pools[i].name);
+
+  // TODO: Remove replicas if overwrite is true and the file exists,
+  //       get the mode and ACL
+
+  // Create the entry if it does not exist already
+  // Note: the expected behavior is to truncate and keep mode if it does
+  syslog(LOG_DEBUG, "%s: calling catalog->create('%s', 0644)", __func__, path.c_str());
+  this->stack_->getCatalog()->create(path, 0644);
+  // TODO: no support for overwriting
+  //if (!acl.empty())
+  //  this->stack_->getCatalog()->setAcl(path, acl);
+
+  // Delegate it to the PoolHandler
+  location = handler->whereToWrite(path);
+  delete handler;
+
+  return location;
 }
index b5d1243..6ce84d8 100644 (file)
@@ -30,7 +30,7 @@ namespace dmlite {
     Location whereToWrite(const std::string& path) throw (DmException);
 
   private:
-    StackInstance* si_;
+    StackInstance* stack_;
     
     std::string hostName_;