Prepare for RFN support:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Dec 2013 19:39:32 +0000 (20:39 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 23 Feb 2014 13:16:20 +0000 (14:16 +0100)
- maintain RFN tree
- files with catalog entry name as content
- sanity checks for "foreign" RFN:
  - avoid overwriting (=no duplicitis of RFN)
  - check content before deleting (=not deleting RFN of other catalog entry)
- update RFN tree in rename() and unlink() calls
- cleanup empty directories

src/Vfs.h
src/VfsNs.cpp
src/VfsNs.h

index 5888334..68deab8 100644 (file)
--- a/src/Vfs.h
+++ b/src/Vfs.h
@@ -14,6 +14,7 @@
 
 #define VFS_FILE ".#vfs."
 #define VFS_FILE_LENGTH 6
+#define VFS_REPLICAS_TREE ".#vfs.replicas"
 
 #ifdef DEBUG
   #ifdef __GNUC__
index fe05072..8e50fe0 100644 (file)
@@ -432,7 +432,7 @@ bool VfsCatalog::accessReplica(const std::string& replica, int mode) throw (DmEx
 void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
 {
   std::map<ino_t,std::string>::const_iterator it;
-  std::string path, attr1, attr2, attrval1, attrval2;
+  std::string path, lpath, attr1, attr2, attrval1, attrval2;
   ExtendedStat meta;
   Extensible xattrs;
   int64_t replicaid;
@@ -445,6 +445,7 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
   if (it == inodes.end())
     vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
   path = it->second;
+  lpath = getLocalPath(path);
 
   meta = this->vfsExtendedStat(path, true, true, true);
   if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
@@ -469,19 +470,21 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
     if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
       rep = replicaDeserialize(value, "");
       if (rep.rfn.compare(replica.rfn) == 0) {
-        vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+        vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
       }
     }
   }
 
+  rfnCreate(path, replica.rfn);
   replicaSerialize(replica, attrval1, attrval2);
 
-  vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_CREATE);
+  vfsSetXattr(path, lpath, attr1, attrval1, ATTR_CREATE);
   if (attrval2.empty()) {
-    if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+    if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0);
   } else {
-    vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+    vfsSetXattr(path, lpath, attr2, attrval2, 0);
   }
+
   debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str());
 }
 
@@ -490,7 +493,7 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
 void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
 {
   std::map<ino_t,std::string>::const_iterator it;
-  std::string path;
+  std::string path, attr1, attr2;
   ExtendedStat meta;
   char buf[20];
 
@@ -504,12 +507,20 @@ void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
 
   meta = this->vfsExtendedStat(path, true, true, true);
   if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
-    vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+    vfsThrow(EACCES, "not enough permissions for '%s' to delete replica '%s'", clientName.c_str(), path.c_str());
 
   snprintf(buf, sizeof buf, "%lu", replica.replicaid);
-  if (meta.hasField(std::string(VFS_XATTR "replica.") + buf)) {
-    vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "replica.") + buf, 0);
-    vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "repattrs.") + buf, 0);
+  attr1 = std::string(VFS_XATTR "replica.") + buf;
+  attr2 = std::string(VFS_XATTR "repattrs.") + buf;
+  if (meta.hasField(attr1)) {
+    Replica rep;
+
+    // get RFN rather from the catalog
+    rep = replicaDeserialize(meta.getString(attr1), meta.getString(attr2, ""));
+    rfnDelete(path, rep.rfn);
+
+    vfsRemoveXattr(path, getLocalPath(path), attr1, 0);
+    vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
   } else {
     vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
   }
@@ -608,6 +619,7 @@ std::string VfsCatalog::readLink(const std::string& path) throw (DmException)
 void VfsCatalog::unlink(const std::string& path) throw (DmException)
 {
   std::string parentPath, name, lpath;
+  ExtendedStat file;
 
   if (vfsCheckPermissions(path, S_IWRITE))
     vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
@@ -621,16 +633,32 @@ void VfsCatalog::unlink(const std::string& path) throw (DmException)
 
   lpath = getLocalPath(path);
 
+  // not follow symlinks (remove them instead)
+  file = this->vfsSimpleStat(name, path, lpath, false, true, true);
+
   // Sticky bit set ==> only directory or file owner can delete
   if ((parent.stat.st_mode & S_ISVTX) == S_ISVTX) {
-    // not follow symlinks (remove them instead)
-    ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true, false);
     if (getUid(this->secCtx_) != file.stat.st_uid &&
         getUid(this->secCtx_) != parent.stat.st_uid) {
       vfsThrow(EACCES, "not enough permissions for '%s' to unlink '%s' (sticky bit set)", clientName.c_str(), path.c_str());
     }
   }
 
+  // delete replica entries
+  if (S_ISREG(file.stat.st_mode)) {
+    for (Extensible::const_iterator it = file.begin(); it != file.end(); it++) {
+      std::string key, value;
+      Replica rep;
+
+      key = it->first;
+      if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+        value = Extensible::anyToString(it->second);
+        rep = replicaDeserialize(value, "");
+        rfnDelete(path, rep.rfn);
+      }
+    }
+  }
+
   wrapCall(::unlink(lpath.c_str()));
 }
 
@@ -1267,6 +1295,7 @@ void VfsCatalog::makeDir(const std::string& path, mode_t mode) throw (DmExceptio
 void VfsCatalog::rename(const std::string& oldPath, const std::string& newPath) throw (DmException)
 {
   std::string oldParentPath, newParentPath, oldName, newName;
+  ExtendedStat old;
 
   if (vfsCheckPermissions(oldPath, S_IWRITE) ||
       vfsCheckPermissions(newPath, S_IWRITE))
@@ -1282,15 +1311,33 @@ void VfsCatalog::rename(const std::string& oldPath, const std::string& newPath)
   if (checkPermissions(this->secCtx_, newParent.acl, newParent.stat, S_IWRITE) != 0)
     vfsThrow(EACCES, "not enough permissions for '%s' on destination '%s'", clientName.c_str(), newPath.c_str());
 
+  old = this->vfsExtendedStat(oldPath, true, true, true);
+
   // Check sticky
   if (oldParent.stat.st_mode & S_ISVTX) {
-    ExtendedStat old = this->vfsExtendedStat(oldPath);
     if (getUid(this->secCtx_) != oldParent.stat.st_uid &&
         getUid(this->secCtx_) != old.stat.st_uid &&
         checkPermissions(this->secCtx_, old.acl, old.stat, S_IWRITE) != 0)
        vfsThrow(EACCES, "not enough permissions for '%s' (sticky bit set on parent '%s')", clientName.c_str(), oldParentPath.c_str());
   }
 
+  // update replica entries
+  if (S_ISREG(old.stat.st_mode)) {
+    for (Extensible::const_iterator it = old.begin(); it != old.end(); it++) {
+      std::string key, value;
+      Replica rep;
+
+      key = it->first;
+      if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+        value = Extensible::anyToString(it->second);
+        rep = replicaDeserialize(value, "");
+        rfnDelete(oldPath, rep.rfn);
+        rfnCreate(newPath, rep.rfn);
+      }
+    }
+  }
+
+
   // XXX: test-rename should probably accept both ENOTEMPTY and EEXIST instead
   //      of changing it here?
   try {
@@ -1383,7 +1430,7 @@ Replica VfsCatalog::getReplicaByRFN(const std::string& rfn) throw (DmException)
 void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
 {
   std::map<ino_t,std::string>::const_iterator it;
-  std::string path, attr1, attr2, attrval1,attrval2;
+  std::string path, lpath, attr1, attr2, attrval1,attrval2;
   ExtendedStat meta;
   char buf[20];
 
@@ -1394,6 +1441,7 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
   if (it == inodes.end())
     vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
   path = it->second;
+  lpath = getLocalPath(path);
 
   meta = this->vfsExtendedStat(path, true, true, true);
   if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
@@ -1405,7 +1453,9 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
     vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
   attr2 = std::string(VFS_XATTR "repattrs.") + buf;
 
-  // check duplicity of RFN
+  // browse replicas:
+  // - check duplicity of RFN
+  // - remove previous replica entry
   for (Extensible::const_iterator it = meta.begin(); it != meta.end(); it++) {
     std::string key, value;
     Replica rep;
@@ -1415,21 +1465,27 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
     value = Extensible::anyToString(it->second);
     if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
       repid = atoll(key.c_str() + VFS_XATTR_LENGTH + 8);
-      // skip the currently updated replica (RFN may be the same)
-      if (repid == replica.replicaid) continue;
       rep = replicaDeserialize(value, "");
-      if (rep.rfn.compare(replica.rfn) == 0) {
-        vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+      if (repid == replica.replicaid) {
+        // currently updated replica
+        // ==> remove its entry
+        rfnDelete(path, rep.rfn);
+      } else {
+        // check duplicity
+        if (rep.rfn.compare(replica.rfn) == 0) {
+          vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+        }
       }
     }
   }
 
+  rfnCreate(path, replica.rfn);
   replicaSerialize(replica, attrval1, attrval2);
-  vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_REPLACE);
+  vfsSetXattr(path, lpath, attr1, attrval1, ATTR_REPLACE);
   if (attrval2.empty()) {
-    if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+    if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0);
   } else {
-    vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+    vfsSetXattr(path, lpath, attr2, attrval2, 0);
   }
   debug("updated '%s' to '%s'", replica.rfn.c_str(), path.c_str());
 }
@@ -1791,6 +1847,7 @@ void VfsCatalog::replicaSerialize(const Replica& replica, std::string& attr1, st
 }
 
 
+
 Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::string& attr2) {
   Replica replica;
   int len, ret;
@@ -1799,6 +1856,8 @@ Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::stri
   const char *buf = attr1.c_str();
   char *p;
 
+  replica = Replica();
+
   ret = sscanf(buf, "%c %c %lu %lu %lu %lu %200s %n", &status, &type, &replica.nbaccesses, &replica.atime, &replica.ptime, &replica.ltime, server, &len);
   if (ret != 7 && ret != 8)
     vfsThrowErrno("deserializing of '%s' failed", buf);
@@ -1816,3 +1875,166 @@ Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::stri
 
   return replica;
 }
+
+
+
+void VfsCatalog::rfnCreate(const std::string& path, const std::string& rfn) throw(DmException) {
+  Url url;
+  std::string curdir, file, lpath;
+  std::vector<std::string> components;
+  struct stat fstat;
+  FILE *f;
+
+  // replicas root directory
+  curdir = this->prefix_ + "/" + VFS_REPLICAS_TREE;
+  if (mkdir(curdir.c_str(), 0755) == 0)
+    debug("created replicas root directory '%s'", curdir.c_str());
+
+  // directory for each server (conflicts with proper names are OK and unlikely)
+  url = Url(rfn);
+  if (!url.domain.empty()) {
+    curdir += "/" + url.domain;
+    if (mkdir(curdir.c_str(), 0755) == 0)
+      debug("created replicas server directory '%s' for '%s'", curdir.c_str(), url.domain.c_str());
+  }
+
+  // RFN tree
+  components = Url::splitPath(url.path);
+  file = components.back();
+  components.pop_back();
+  if (components.size() >= 1 && components[0] != "/") components[0] = "/" + components[0];
+  for (size_t i = 0; i < components.size(); i++) {
+    // the first component always '/', let's use it as separator
+    if (i < 2) curdir += components[i];
+    else curdir = curdir + "/" + components[i];
+    // ignore all errors - just go from "/" toward deeper levels
+    if (mkdir(curdir.c_str(), 0755) == 0)
+      debug("created directory '%s'", curdir.c_str());
+  }
+
+  lpath = curdir + "/" + file;
+
+  // never rewrite - when updating, the old entry should be already removed
+  if (stat(lpath.c_str(), &fstat) == 0)
+    vfsThrow(EEXIST, "replica entry '%s' already exists", rfn.c_str());
+
+  // info about replica
+  if ((f = fopen(lpath.c_str(), "w")) == NULL)
+    wrapCall(f, "could not create replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+  try {
+    wrapCall(fputs(path.c_str(), f), "could not write information about replica '%s' for '%s'", rfn.c_str(), path.c_str());
+  } catch (DmException e) {
+    fclose(f);
+    throw;
+  }
+  wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str());
+  debug("created entry '%s' -> '%s'", rfn.c_str(), path.c_str());
+}
+
+
+
+void VfsCatalog::rfnDelete(const std::string& path, const std::string& rfn) throw(DmException) {
+  Url url;
+  std::string lpath, curdir;
+  FILE *f;
+  std::vector<std::string> components;
+  bool wasNonEmpty;
+
+  // local path to replica entry
+  url = Url(rfn);
+  lpath =  this->prefix_ + "/" + VFS_REPLICAS_TREE;
+  if (!url.domain.empty())
+    lpath += "/" + url.domain;
+  if (!url.path.empty() && url.path[0] != '/') lpath += "/";
+  lpath += url.path;
+  // RFN could be a directory ==> entry is just the file
+  if (lpath[lpath.size() - 1] == '/')
+    lpath.resize(lpath.size() - 1);
+
+  // check if we can remove replica information
+  if ((f = fopen(lpath.c_str(), "r")) == NULL) {
+    //
+    // be robust:
+    // - when entry doesn't exist, log the problem
+    // - throw up only on other errors
+    //
+    if (errno == ENOENT || errno == ENOTDIR) {
+      log(LOG_ERR, "replica entry '%s' doesn't exist, deleting replica from '%s' anyway", rfn.c_str(), path.c_str());
+    } else {
+      vfsThrowErrno("could not open replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+    }
+
+    return;
+  }
+  try {
+    fgets(this->buffer, sizeof this->buffer, f);
+    wrapCall(ferror(f), "error reading replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+  } catch (DmException e) {
+    fclose(f);
+    throw;
+  }
+  wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str());
+
+  // we should not delete foreign entries
+  if (path.compare(this->buffer) != 0)
+    vfsThrow(EPERM, "can't delete replica entry '%s' for '%s', belongs to '%s'", rfn.c_str(), path.c_str(), this->buffer);
+
+  if (::unlink(lpath.c_str()) == 0)
+    debug("deleted entry '%s' -> '%s'", rfn.c_str(), path.c_str());
+
+  // cleanup directories
+  try {
+    lpath =  this->prefix_ + "/" + VFS_REPLICAS_TREE;
+    curdir = "";
+    if (!url.domain.empty())
+      curdir += "/" + url.domain;
+    if (!url.path.empty() && url.path[0] != '/') curdir += "/";
+    components = Url::splitPath(curdir + url.path);
+
+    wasNonEmpty = false;
+    do {
+      components.pop_back();
+      lpath =  this->prefix_ + "/" + VFS_REPLICAS_TREE + Url::joinPath(components);
+
+      if (isEmptyDir(lpath)) {
+        wrapCall(rmdir(lpath.c_str()), "could not remove replica entries subdirectory '%s'", lpath.c_str());
+        debug("deleted replica entry subdirectory '%s'", lpath.c_str());
+      } else
+        wasNonEmpty = true;
+    } while (!wasNonEmpty && components.size() > 2);
+  } catch (DmException e) {
+    // not critical, but log a warning
+#ifndef DEBUG
+    syslog(LOG_WARN, "%s", e.what());
+#endif
+  }
+}
+
+
+
+///
+/// test emptiness of directory
+///
+/// On errors, just log the problem and consider the directory non-empty.
+///
+bool VfsCatalog::isEmptyDir(const std::string& lpath) throw (DmException) {
+  DIR *dir;
+  struct dirent entry, *rentry;
+  int ret;
+
+  wrapCall((dir = opendir(lpath.c_str())), "could not open open replica entries subdirectory '%s'", lpath.c_str());
+
+  rentry = NULL;
+  while ((ret = readdir_r(dir, &entry, &rentry)) == 0 && rentry) {
+    if (strcmp(entry.d_name, ".") != 0 && strcmp(entry.d_name, "..") != 0) {
+      closedir(dir);
+      return false;
+    }
+  }
+  closedir(dir);
+
+  if (ret)
+    vfsThrow(ret, "error reading replica entries subdirectory '%s'", lpath.c_str());
+
+  return true;
+}
index ea1ec72..7338211 100644 (file)
@@ -126,6 +126,9 @@ namespace dmlite {
 
     void replicaSerialize(const Replica& replica, std::string& attr1, std::string& attr2);
     Replica replicaDeserialize(const std::string& attr1, const std::string& attr2);
+    void rfnCreate(const std::string& path, const std::string& rfn) throw(DmException);
+    void rfnDelete(const std::string& path, const std::string& rfn) throw(DmException);
+    bool isEmptyDir(const std::string& lpath) throw (DmException);
 
     StackInstance* si_;
     const SecurityContext* secCtx_;