Initial support for replicas and inode cache.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 1 Dec 2013 20:12:27 +0000 (21:12 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 23 Feb 2014 13:16:20 +0000 (14:16 +0100)
src/VfsNs.cpp
src/VfsNs.h

index f79c10f..10e84bd 100644 (file)
@@ -22,8 +22,9 @@
 #define VFS_XATTR_LENGTH 5
 #define VFS_XATTR_TYPE_PERMS 1
 #define VFS_XATTR_TYPE_NONPERMS 2
-#define VFS_XATTR_TYPE_USER 4
-#define VFS_XATTR_TYPE_ALL 7
+#define VFS_XATTR_TYPE_REPLICA 4
+#define VFS_XATTR_TYPE_USER 8
+#define VFS_XATTR_TYPE_ALL 15
 #define IS_VFS_XATTR(NAME) (strncmp((NAME), VFS_XATTR, VFS_XATTR_LENGTH) == 0)
 #define IS_VFS_FILE(NAME) (strncmp((NAME), VFS_FILE, VFS_FILE_LENGTH) == 0)
 
@@ -31,6 +32,9 @@
 #define VFS_UID_NONE xStat.stat.st_uid
 #define VFS_GID_NONE xStat.stat.st_gid
 
+#define INODE_CACHE_SIZE 256
+
+
 using namespace dmlite;
 
 
@@ -152,7 +156,7 @@ std::string VfsCatalog::getWorkingDir(void) throw (DmException)
 
 
 ///
-/// Update ExtendedStat fields related to permissions.
+/// Update ExtendedStat fields related to permissions or replicas.
 /// Requires user extended attributes retrieved with VFS_XATTR_TYPE_PERMS or
 /// VFS_XATTR_TYPE_ALL.
 ///
@@ -161,9 +165,10 @@ std::string VfsCatalog::getWorkingDir(void) throw (DmException)
 ///                attributes).
 ///
 void VfsCatalog::vfsUpdateStat(ExtendedStat &xStat, Extensible xattrs) throw (DmException) {
-  std::string modeStr;
+  std::string modeStr, key;
   mode_t mode;
   char *parse;
+  Extensible::const_iterator it;
 
   if (xattrs.hasField(VFS_XATTR "acl"))
     xStat.acl = Acl(xattrs.getString(VFS_XATTR "acl"));
@@ -192,6 +197,13 @@ void VfsCatalog::vfsUpdateStat(ExtendedStat &xStat, Extensible xattrs) throw (Dm
       xStat.stat.st_mode = mode;
     }
   }
+
+  // copy replicas
+  for (it = xattrs.begin(); it != xattrs.end(); it++) {
+    key = it->first;
+    if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0)
+      xStat[key] = xattrs[key];
+  }
 }
 
 
@@ -245,7 +257,7 @@ void VfsCatalog::vfsUpdateExtendedStat(ExtendedStat &xStat, Extensible xattrs) t
 /// @param follow  follow symlinks (to use stat() or lstat())
 /// @param perms   get information related to permissions
 //
-ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms) throw (DmException) {
+ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms, bool replicas) throw (DmException) {
   ExtendedStat xStat;
   struct stat  fstat;
   Extensible xattrs;
@@ -261,9 +273,9 @@ ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::strin
   xStat.status  = ExtendedStat::kOnline;
   xStat.acl     = Acl();
 
-  if (perms) {
+  if (perms || replicas) {
     try {
-      xattrs = vfsGetXattrs(path, lpath, true, VFS_XATTR_TYPE_PERMS);
+      xattrs = vfsGetXattrs(path, lpath, true, (perms ? VFS_XATTR_TYPE_PERMS : 0) | (replicas ? VFS_XATTR_TYPE_REPLICA : 0));
     } catch (DmException& e) {
       // ignore - default owner and no ACLs
     }
@@ -285,7 +297,7 @@ ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::strin
 /// @param follow  follow symlinks
 /// @param parms   get information related to permissions
 ///
-ExtendedStat VfsCatalog::vfsExtendedStat(const std::string& path, bool follow, bool perms) throw (DmException) {
+ExtendedStat VfsCatalog::vfsExtendedStat(const std::string& path, bool follow, bool perms, bool replicas) throw (DmException) {
   ExtendedStat meta;
   std::vector<std::string> components;
   std::string dir;
@@ -301,13 +313,13 @@ ExtendedStat VfsCatalog::vfsExtendedStat(const std::string& path, bool follow, b
     if (i < 2) dir += components[i];
     else dir = dir + "/" + components[i];
 
-    meta = vfsSimpleStat(components[i], dir, getLocalPath(dir), true, true);
+    meta = vfsSimpleStat(components[i], dir, getLocalPath(dir), true, true, false);
 
     if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IEXEC) != 0)
       vfsThrow(EACCES, "not enough permissions for '%s' to list '%s'", clientName.c_str(), meta.name.c_str());
   }
 
-  return vfsSimpleStat(components.back(), path, getLocalPath(path), follow, perms);
+  return vfsSimpleStat(components.back(), path, getLocalPath(path), follow, perms, replicas);
 }
 
 
@@ -322,7 +334,7 @@ ExtendedStat VfsCatalog::extendedStat(const std::string& path, bool follow) thro
   if (vfsCheckPermissions(path, S_IREAD))
     vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
 
-  meta = vfsExtendedStat(path, follow, false);
+  meta = vfsExtendedStat(path, follow, false, false);
 
   //
   // User extended attributes are not supported on symlinks. We need the same
@@ -357,7 +369,9 @@ ExtendedStat VfsCatalog::extendedStat(const std::string& path, bool follow) thro
     meta.stat.st_nlink = count;
   }
 #endif
-  
+
+  vfsUpdateInode(meta, path);
+
   return meta;
 }
 
@@ -409,20 +423,70 @@ bool VfsCatalog::access(const std::string& path, int mode) throw (DmException)
 
 bool VfsCatalog::accessReplica(const std::string& replica, int mode) throw (DmException)
 {
-  return this->access(replica, mode);
+  vfsThrow(ENOSYS, "searching by replica file names not supported");
+  return false;
 }
 
 
 void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
 {
-  vfsThrow(EACCES, "write mode not supported");
+  std::map<ino_t,std::string>::const_iterator it;
+  std::string path, attr;
+  ExtendedStat meta;
+  Extensible xattrs;
+  int64_t replicaid;
+  char buf[20];
+
+  if (vfsCheckPermissions("", S_IWRITE))
+    vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
+
+  it = inodes.find(replica.fileid);
+  if (it == inodes.end())
+    vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
+  path = it->second;
+
+  meta = this->vfsExtendedStat(path, true, true, true);
+  if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
+    vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+
+  replicaid = 0;
+  do {
+    replicaid++;
+    snprintf(buf, sizeof buf, "%ld", replicaid);
+    attr = std::string(VFS_XATTR "replica.") + buf;
+  } while (meta.hasField(attr));
+
+  vfsSetXattr(path, getLocalPath(path), attr, replica.rfn, ATTR_CREATE);
+  debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str());
 }
 
 
 
 void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
 {
-  vfsThrow(EACCES, "write mode not supported");
+  std::map<ino_t,std::string>::const_iterator it;
+  std::string path;
+  ExtendedStat meta;
+  char buf[VFS_XATTR_LENGTH + 8 + 20 + 1];
+
+  if (vfsCheckPermissions("", S_IWRITE))
+    vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
+
+  it = inodes.find(replica.fileid);
+  if (it == inodes.end())
+    vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
+  path = it->second;
+
+  meta = this->vfsExtendedStat(path, true, true, true);
+  if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
+    vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+
+  snprintf(buf, sizeof buf, VFS_XATTR "replica." "%ld", replica.replicaid);
+  if (meta.hasField(buf)) {
+    vfsRemoveXattr(path, getLocalPath(path), buf, 0);
+  } else {
+    vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
+  }
 }
 
 
@@ -431,35 +495,54 @@ std::vector<Replica> VfsCatalog::getReplicas(const std::string& path) throw (DmE
 {
   Replica      replica;
   ExtendedStat xStat;
+  std::vector<Replica> repList;
+  Extensible::const_iterator it;
+  std::string key, value;
+  long int replicaid;
+  Url url;
+  char buf[20];
 
   if (vfsCheckPermissions(path, S_IREAD))
     vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
 
-  xStat = this->vfsExtendedStat(path, true, true);
+  xStat = this->vfsExtendedStat(path, true, true, true);
   if (checkPermissions(this->secCtx_, xStat.acl, xStat.stat, S_IREAD) != 0)
     vfsThrow(EACCES, "not enough permissions for '%s' to read '%s'", clientName.c_str(), path.c_str());
 
   if (S_ISDIR(xStat.stat.st_mode))
     vfsThrow(EISDIR, "directories do not have replicas");
-  replica.replicaid  = 0;
-  replica.atime      = xStat.stat.st_atime;
-  replica.fileid     = xStat.stat.st_ino;
-  replica.nbaccesses = 0;
-  replica.ptime      = 0;
-  replica.ltime      = 0;
-  replica.type       = Replica::kPermanent;
-  replica.status     = Replica::kAvailable;
-  replica.server     = this->hostName_;
-  replica["pool"]    = std::string("vfs");
-  
-  std::string wd = this->getWorkingDir();
-  if (wd.empty() || path[0] == '/')
-    replica.rfn = path;
-  else
-    replica.rfn = wd + "/" + path;
 
-  return std::vector<Replica>(1, replica);
+  for (it = xStat.begin(); it != xStat.end(); it++) {
+    key = it->first;
+    if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+      value = xStat.getString(key);
+
+      if (sscanf(key.c_str(), VFS_XATTR "replica.%ld", &replicaid) != 1)
+        vfsThrow(EINVAL, "can't parse replica attribute name '%s'", key.c_str());
+
+      url = Url(value);
+      if (url.port) snprintf(buf, sizeof buf, "%u", url.port);
+      else buf[0] = '\0';
+
+      replica = Replica();
+
+      replica.replicaid  = replicaid;
+      replica.atime      = xStat.stat.st_atime;
+      replica.fileid     = xStat.stat.st_ino;
+      replica.nbaccesses = 0;
+      replica.ptime      = 0;
+      replica.ltime      = 0;
+      replica.type       = Replica::kPermanent;
+      replica.status     = Replica::kAvailable;
+      replica.server     = url.domain + (buf[0] ? std::string(":") + buf : "");
+      replica.rfn       = value;
+      replica["pool"]    = std::string("vfs");
+
+      repList.push_back(replica);
+    }
+  }
+
+  return repList;
 }
 
 
@@ -525,7 +608,7 @@ void VfsCatalog::unlink(const std::string& path) throw (DmException)
   // Sticky bit set ==> only directory or file owner can delete
   if ((parent.stat.st_mode & S_ISVTX) == S_ISVTX) {
     // not follow symlinks (remove them instead)
-    ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true);
+    ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true, false);
     if (getUid(this->secCtx_) != file.stat.st_uid &&
         getUid(this->secCtx_) != parent.stat.st_uid) {
       vfsThrow(EACCES, "not enough permissions for '%s' to unlink '%s' (sticky bit set)", clientName.c_str(), path.c_str());
@@ -560,7 +643,7 @@ void VfsCatalog::create(const std::string& path, mode_t mode) throw (DmException
 
   try {
     // follow symlinks
-    file = this->vfsSimpleStat(name, path, lpath, true, true);
+    file = this->vfsSimpleStat(name, path, lpath, true, true, true);
   } catch (DmException e) {
     code = DMLITE_ERRNO(e.code());
     if (code != ENOENT) throw;
@@ -590,14 +673,20 @@ void VfsCatalog::create(const std::string& path, mode_t mode) throw (DmException
     if (!newAcl.empty())
       vfsSetAcl(path, lpath, newAcl);
   } else {
+    Extensible::const_iterator it;
+
     // Truncate
     if (S_ISDIR(file.stat.st_mode))
       vfsThrow(EISDIR, "'%s' is directory, can not truncate", path.c_str());
     if (getUid(this->secCtx_) != file.stat.st_uid &&
        checkPermissions(this->secCtx_, file.acl, file.stat, S_IWRITE) != 0) {
       vfsThrow(EACCES, "not enough permissions for '%s' to truncate '%s'", clientName.c_str(), path.c_str());
-    // TODO: check replicas
     }
+    // But check replicas first
+    for (it = file.begin(); it != file.end() && it->first.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") != 0; it++);
+    if (it != file.end())
+      vfsThrow(EACCES, "can't truncate '%s' with replicas", path.c_str());
+    // FIXME: setSize()
     wrapCall(truncate(lpath.c_str(), 0));
   }
 }
@@ -1104,6 +1193,8 @@ ExtendedStat* VfsCatalog::readDirx(Directory* dir) throw (DmException)
   }
   vfsUpdateExtendedStat(privateDir->stat, xattrs);
 
+  vfsUpdateInode(privateDir->stat, privateDir->path + '/' + ent->d_name);
+
   return &(privateDir->stat);
 }
 
@@ -1410,6 +1501,8 @@ Extensible VfsCatalog::vfsGetXattrs(const std::string& path, const std::string&
             strcmp(entry->a_name + VFS_XATTR_LENGTH, "owner") == 0)
         {
           attrType = VFS_XATTR_TYPE_PERMS;
+        } else if (strncmp(entry->a_name + VFS_XATTR_LENGTH, "replica", 7) == 0) {
+          attrType = VFS_XATTR_TYPE_REPLICA;
         } else {
           attrType = VFS_XATTR_TYPE_NONPERMS;
         }
@@ -1562,3 +1655,26 @@ void VfsCatalog::vfsSetAcl(const std::string& path, const std::string& lpath, Ac
   else
     vfsSetXattr(path, lpath, VFS_XATTR "acl", acl.serialize(), 0);
 }
+
+
+
+void VfsCatalog::vfsUpdateInode(const ExtendedStat& meta, const std::string& path) {
+  std::string abspath;
+
+  // update inode cache
+  if (S_ISREG(meta.stat.st_mode)) {
+    // absolutize
+    if (path[0] != '/') abspath = this->getWorkingDir() + "/" + path;
+    else abspath = path;
+    // remove least recently used item if needed
+    if (inodes.size() >= INODE_CACHE_SIZE) {
+      inodes.erase(inodes_lru.front());
+      inodes_lru.pop_front();
+    }
+    // add to inode cache
+    inodes[meta.stat.st_ino] = abspath;
+    inodes_lru.remove(meta.stat.st_ino);
+    inodes_lru.push_back(meta.stat.st_ino);
+    //debug("inode %lu: %s", meta.stat.st_ino, abspath.c_str());
+  }
+}
index 06e433a..070141c 100644 (file)
@@ -108,8 +108,8 @@ namespace dmlite {
     const std::string getLocalPath(const std::string &path);
     void vfsUpdateStat(ExtendedStat &xStat, Extensible xattrs) throw (DmException);
     void vfsUpdateExtendedStat(ExtendedStat &xStat, Extensible xattrs) throw (DmException);
-    ExtendedStat vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms) throw (DmException);
-    ExtendedStat vfsExtendedStat(const std::string& path, bool follow = true, bool perms = true) throw (DmException);
+    ExtendedStat vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms, bool replicas) throw (DmException);
+    ExtendedStat vfsExtendedStat(const std::string& path, bool follow = true, bool perms = true, bool replicas = false) throw (DmException);
     PrivateDir* vfsOpenDir(const std::string& lpath, const std::string& path) throw (DmException);
     Extensible vfsGetXattrs(const std::string& path, const std::string& lpath, bool follow, int amount) throw (DmException);
     std::string vfsGetXattr(const std::string& path, const std::string &lpath, const std::string key, int flags);
@@ -122,12 +122,15 @@ namespace dmlite {
     void vfsSetOwner(const std::string& path, const std::string& lpath, uid_t newUid, gid_t newGid, bool followSymLink) throw (DmException);
     void vfsSetOwner(const std::string& path, const std::string& lpath, const ExtendedStat& meta, uid_t newUid, gid_t newGid, bool followSymLink) throw (DmException);
     void vfsSetAcl(const std::string& path, const std::string& lpath, Acl acl) throw (DmException);
+    void vfsUpdateInode(const ExtendedStat& meta, const std::string& lpath);
 
     StackInstance* si_;
     const SecurityContext* secCtx_;
     std::string hostName_;
     std::string prefix_;
     mode_t umask_;
+    std::map<ino_t,std::string> inodes;
+    std::list<ino_t> inodes_lru;
 
    private:
     regex_t *allowRegex, *denyRegex, *allowWriteRegex, *denyWriteRegex;