Support full replicas serialization and additional replicas attributes.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 1 Dec 2013 20:12:35 +0000 (21:12 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 23 Feb 2014 13:16:20 +0000 (14:16 +0100)
src/VfsNs.cpp
src/VfsNs.h

index 70dd977..0bf63b9 100644 (file)
@@ -201,7 +201,8 @@ void VfsCatalog::vfsUpdateStat(ExtendedStat &xStat, Extensible xattrs) throw (Dm
   // copy replicas
   for (it = xattrs.begin(); it != xattrs.end(); it++) {
     key = it->first;
-    if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0)
+    if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0 ||
+        key.compare(0, VFS_XATTR_LENGTH + 9, VFS_XATTR "repattrs.") == 0)
       xStat[key] = xattrs[key];
   }
 }
@@ -431,7 +432,7 @@ bool VfsCatalog::accessReplica(const std::string& replica, int mode) throw (DmEx
 void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
 {
   std::map<ino_t,std::string>::const_iterator it;
-  std::string path, attr;
+  std::string path, attr1, attr2, attrval1, attrval2;
   ExtendedStat meta;
   Extensible xattrs;
   int64_t replicaid;
@@ -453,10 +454,18 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
   do {
     replicaid++;
     snprintf(buf, sizeof buf, "%ld", replicaid);
-    attr = std::string(VFS_XATTR "replica.") + buf;
-  } while (meta.hasField(attr));
+    attr1 = std::string(VFS_XATTR "replica.") + buf;
+  } while (meta.hasField(attr1));
+  attr2 = std::string(VFS_XATTR "repattrs.") + buf;
 
-  vfsSetXattr(path, getLocalPath(path), attr, replica.rfn, ATTR_CREATE);
+  replicaSerialize(replica, attrval1, attrval2);
+
+  vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_CREATE);
+  if (attrval2.empty()) {
+    if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+  } else {
+    vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+  }
   debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str());
 }
 
@@ -467,7 +476,7 @@ void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
   std::map<ino_t,std::string>::const_iterator it;
   std::string path;
   ExtendedStat meta;
-  char buf[VFS_XATTR_LENGTH + 8 + 20 + 1];
+  char buf[20];
 
   if (vfsCheckPermissions("", S_IWRITE))
     vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
@@ -481,9 +490,10 @@ void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
   if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
     vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
 
-  snprintf(buf, sizeof buf, VFS_XATTR "replica." "%ld", replica.replicaid);
-  if (meta.hasField(buf)) {
-    vfsRemoveXattr(path, getLocalPath(path), buf, 0);
+  snprintf(buf, sizeof buf, "%lu", replica.replicaid);
+  if (meta.hasField(std::string(VFS_XATTR "replica.") + buf)) {
+    vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "replica.") + buf, 0);
+    vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "repattrs.") + buf, 0);
   } else {
     vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
   }
@@ -497,7 +507,7 @@ std::vector<Replica> VfsCatalog::getReplicas(const std::string& path) throw (DmE
   ExtendedStat xStat;
   std::vector<Replica> repList;
   Extensible::const_iterator it;
-  std::string key, value;
+  std::string key, key2, value, value2;
   long int replicaid;
   Url url;
   char buf[20];
@@ -517,26 +527,16 @@ std::vector<Replica> VfsCatalog::getReplicas(const std::string& path) throw (DmE
     if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
       value = xStat.getString(key);
 
-      if (sscanf(key.c_str(), VFS_XATTR "replica.%ld", &replicaid) != 1)
+      if (sscanf(key.c_str(), VFS_XATTR "replica.%lu", &replicaid) != 1)
         vfsThrow(EINVAL, "can't parse replica attribute name '%s'", key.c_str());
 
-      url = Url(value);
-      if (url.port) snprintf(buf, sizeof buf, "%u", url.port);
-      else buf[0] = '\0';
-
-      replica = Replica();
+      snprintf(buf, sizeof buf, "%lu", replicaid);
+      key2 = std::string(VFS_XATTR "repattrs.") + buf;
+      value2 = xStat.getString(key2, "");
+      replica = replicaDeserialize(value, value2);
 
       replica.replicaid  = replicaid;
-      replica.atime      = xStat.stat.st_atime;
       replica.fileid     = xStat.stat.st_ino;
-      replica.nbaccesses = 0;
-      replica.ptime      = 0;
-      replica.ltime      = 0;
-      replica.type       = Replica::kPermanent;
-      replica.status     = Replica::kAvailable;
-      replica.server     = url.domain + (buf[0] ? std::string(":") + buf : "");
-      replica.rfn       = value;
-      replica["pool"]    = std::string("vfs");
 
       repList.push_back(replica);
     }
@@ -1367,9 +1367,9 @@ Replica VfsCatalog::getReplicaByRFN(const std::string& rfn) throw (DmException)
 void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
 {
   std::map<ino_t,std::string>::const_iterator it;
-  std::string path;
+  std::string path, attr1, attr2, attrval1,attrval2;
   ExtendedStat meta;
-  char buf[VFS_XATTR_LENGTH + 8 + 20 + 1];
+  char buf[20];
 
   if (vfsCheckPermissions("", S_IWRITE))
     vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
@@ -1383,11 +1383,19 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
   if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
     vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
 
-  snprintf(buf, sizeof buf, VFS_XATTR "replica." "%ld", replica.replicaid);
-  if (!meta.hasField(buf))
+  snprintf(buf, sizeof buf, "%ld", replica.replicaid);
+  attr1 = std::string(VFS_XATTR "replica.") + buf;
+  if (!meta.hasField(attr1))
     vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
+  attr2 = std::string(VFS_XATTR "repattrs.") + buf;
 
-  vfsSetXattr(path, getLocalPath(path), buf, replica.rfn, ATTR_REPLACE);
+  replicaSerialize(replica, attrval1, attrval2);
+  vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_REPLACE);
+  if (attrval2.empty()) {
+    if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+  } else {
+    vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+  }
   debug("updated '%s' to '%s'", replica.rfn.c_str(), path.c_str());
 }
 
@@ -1523,7 +1531,9 @@ Extensible VfsCatalog::vfsGetXattrs(const std::string& path, const std::string&
             strcmp(entry->a_name + VFS_XATTR_LENGTH, "owner") == 0)
         {
           attrType = VFS_XATTR_TYPE_PERMS;
-        } else if (strncmp(entry->a_name + VFS_XATTR_LENGTH, "replica", 7) == 0) {
+        } else if (strncmp(entry->a_name + VFS_XATTR_LENGTH, "replica", 7) == 0 ||
+                   strncmp(entry->a_name + VFS_XATTR_LENGTH, "repattrs", 8) == 0)
+        {
           attrType = VFS_XATTR_TYPE_REPLICA;
         } else {
           attrType = VFS_XATTR_TYPE_NONPERMS;
@@ -1700,3 +1710,74 @@ void VfsCatalog::vfsUpdateInode(const ExtendedStat& meta, const std::string& pat
     //debug("inode %lu: %s", meta.stat.st_ino, abspath.c_str());
   }
 }
+
+
+
+void VfsCatalog::replicaSerialize(const Replica& replica, std::string& attr1, std::string& attr2) {
+  char buf[256];
+  char status, type;
+  std::string server;
+  Url url;
+
+  status = replica.status;
+  switch (replica.status) {
+    case Replica::kAvailable:
+    case Replica::kBeingPopulated:
+    case Replica::kToBeDeleted:
+      break;
+    default:
+      status = Replica::kAvailable;
+  }
+
+  type = replica.type;
+  switch (replica.type) {
+    case Replica::kVolatile:
+    case Replica::kPermanent:
+      break;
+    default:
+      type = Replica::kPermanent;
+  }
+
+  // get the server from URL, if not specified
+  if (replica.server.empty()) {
+    url = Url(replica.rfn);
+    server = url.domain;
+  } else {
+    server = replica.server;
+  }
+
+  snprintf(buf, sizeof buf, "%c %c %lu %lu %lu %lu [%s] ", status, type, replica.nbaccesses, replica.atime, replica.ptime, replica.ltime, server.c_str());
+  attr1 = std::string(buf) + replica.rfn;
+
+  if (replica.size())
+    attr2 = replica.serialize();
+  else
+    attr2.clear();
+}
+
+
+Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::string& attr2) {
+  Replica replica;
+  int len, ret;
+  char status, type;
+  char server[201];
+  const char *buf = attr1.c_str();
+  char *p;
+
+  ret = sscanf(buf, "%c %c %lu %lu %lu %lu %200s %n", &status, &type, &replica.nbaccesses, &replica.atime, &replica.ptime, &replica.ltime, server, &len);
+  if (ret != 7 && ret != 8)
+    vfsThrowErrno("deserializing of '%s' failed", buf);
+  replica.status = (Replica::ReplicaStatus)status;
+  replica.type = (Replica::ReplicaType)type;
+
+  replica.rfn = buf + len;
+
+  p = strchr(server, ']');
+  if (p) *p = '\0';
+  replica.server = server + 1;
+
+  if (!attr2.empty())
+    replica.deserialize(attr2);
+
+  return replica;
+}
index 070141c..ea1ec72 100644 (file)
@@ -124,6 +124,9 @@ namespace dmlite {
     void vfsSetAcl(const std::string& path, const std::string& lpath, Acl acl) throw (DmException);
     void vfsUpdateInode(const ExtendedStat& meta, const std::string& lpath);
 
+    void replicaSerialize(const Replica& replica, std::string& attr1, std::string& attr2);
+    Replica replicaDeserialize(const std::string& attr1, const std::string& attr2);
+
     StackInstance* si_;
     const SecurityContext* secCtx_;
     std::string hostName_;