From: František Dvořák Date: Thu, 5 Dec 2013 19:39:32 +0000 (+0100) Subject: Prepare for RFN support: X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=2d681a35f84430d6116fcb89b7bf9f9310d37d6e;p=dmlite-plugins-vfs.git Prepare for RFN support: - maintain RFN tree - files with catalog entry name as content - sanity checks for "foreign" RFN: - avoid overwriting (=no duplicitis of RFN) - check content before deleting (=not deleting RFN of other catalog entry) - update RFN tree in rename() and unlink() calls - cleanup empty directories --- diff --git a/src/Vfs.h b/src/Vfs.h index 5888334..68deab8 100644 --- a/src/Vfs.h +++ b/src/Vfs.h @@ -14,6 +14,7 @@ #define VFS_FILE ".#vfs." #define VFS_FILE_LENGTH 6 +#define VFS_REPLICAS_TREE ".#vfs.replicas" #ifdef DEBUG #ifdef __GNUC__ diff --git a/src/VfsNs.cpp b/src/VfsNs.cpp index fe05072..8e50fe0 100644 --- a/src/VfsNs.cpp +++ b/src/VfsNs.cpp @@ -432,7 +432,7 @@ bool VfsCatalog::accessReplica(const std::string& replica, int mode) throw (DmEx void VfsCatalog::addReplica(const Replica& replica) throw (DmException) { std::map::const_iterator it; - std::string path, attr1, attr2, attrval1, attrval2; + std::string path, lpath, attr1, attr2, attrval1, attrval2; ExtendedStat meta; Extensible xattrs; int64_t replicaid; @@ -445,6 +445,7 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException) if (it == inodes.end()) vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid); path = it->second; + lpath = getLocalPath(path); meta = this->vfsExtendedStat(path, true, true, true); if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0) @@ -469,19 +470,21 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException) if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) { rep = replicaDeserialize(value, ""); if (rep.rfn.compare(replica.rfn) == 0) { - vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str()); + vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str()); } } } + rfnCreate(path, replica.rfn); replicaSerialize(replica, attrval1, attrval2); - vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_CREATE); + vfsSetXattr(path, lpath, attr1, attrval1, ATTR_CREATE); if (attrval2.empty()) { - if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0); + if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0); } else { - vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0); + vfsSetXattr(path, lpath, attr2, attrval2, 0); } + debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str()); } @@ -490,7 +493,7 @@ void VfsCatalog::addReplica(const Replica& replica) throw (DmException) void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException) { std::map::const_iterator it; - std::string path; + std::string path, attr1, attr2; ExtendedStat meta; char buf[20]; @@ -504,12 +507,20 @@ void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException) meta = this->vfsExtendedStat(path, true, true, true); if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0) - vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str()); + vfsThrow(EACCES, "not enough permissions for '%s' to delete replica '%s'", clientName.c_str(), path.c_str()); snprintf(buf, sizeof buf, "%lu", replica.replicaid); - if (meta.hasField(std::string(VFS_XATTR "replica.") + buf)) { - vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "replica.") + buf, 0); - vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "repattrs.") + buf, 0); + attr1 = std::string(VFS_XATTR "replica.") + buf; + attr2 = std::string(VFS_XATTR "repattrs.") + buf; + if (meta.hasField(attr1)) { + Replica rep; + + // get RFN rather from the catalog + rep = replicaDeserialize(meta.getString(attr1), meta.getString(attr2, "")); + rfnDelete(path, rep.rfn); + + vfsRemoveXattr(path, getLocalPath(path), attr1, 0); + vfsRemoveXattr(path, getLocalPath(path), attr2, 0); } else { vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str()); } @@ -608,6 +619,7 @@ std::string VfsCatalog::readLink(const std::string& path) throw (DmException) void VfsCatalog::unlink(const std::string& path) throw (DmException) { std::string parentPath, name, lpath; + ExtendedStat file; if (vfsCheckPermissions(path, S_IWRITE)) vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str()); @@ -621,16 +633,32 @@ void VfsCatalog::unlink(const std::string& path) throw (DmException) lpath = getLocalPath(path); + // not follow symlinks (remove them instead) + file = this->vfsSimpleStat(name, path, lpath, false, true, true); + // Sticky bit set ==> only directory or file owner can delete if ((parent.stat.st_mode & S_ISVTX) == S_ISVTX) { - // not follow symlinks (remove them instead) - ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true, false); if (getUid(this->secCtx_) != file.stat.st_uid && getUid(this->secCtx_) != parent.stat.st_uid) { vfsThrow(EACCES, "not enough permissions for '%s' to unlink '%s' (sticky bit set)", clientName.c_str(), path.c_str()); } } + // delete replica entries + if (S_ISREG(file.stat.st_mode)) { + for (Extensible::const_iterator it = file.begin(); it != file.end(); it++) { + std::string key, value; + Replica rep; + + key = it->first; + if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) { + value = Extensible::anyToString(it->second); + rep = replicaDeserialize(value, ""); + rfnDelete(path, rep.rfn); + } + } + } + wrapCall(::unlink(lpath.c_str())); } @@ -1267,6 +1295,7 @@ void VfsCatalog::makeDir(const std::string& path, mode_t mode) throw (DmExceptio void VfsCatalog::rename(const std::string& oldPath, const std::string& newPath) throw (DmException) { std::string oldParentPath, newParentPath, oldName, newName; + ExtendedStat old; if (vfsCheckPermissions(oldPath, S_IWRITE) || vfsCheckPermissions(newPath, S_IWRITE)) @@ -1282,15 +1311,33 @@ void VfsCatalog::rename(const std::string& oldPath, const std::string& newPath) if (checkPermissions(this->secCtx_, newParent.acl, newParent.stat, S_IWRITE) != 0) vfsThrow(EACCES, "not enough permissions for '%s' on destination '%s'", clientName.c_str(), newPath.c_str()); + old = this->vfsExtendedStat(oldPath, true, true, true); + // Check sticky if (oldParent.stat.st_mode & S_ISVTX) { - ExtendedStat old = this->vfsExtendedStat(oldPath); if (getUid(this->secCtx_) != oldParent.stat.st_uid && getUid(this->secCtx_) != old.stat.st_uid && checkPermissions(this->secCtx_, old.acl, old.stat, S_IWRITE) != 0) vfsThrow(EACCES, "not enough permissions for '%s' (sticky bit set on parent '%s')", clientName.c_str(), oldParentPath.c_str()); } + // update replica entries + if (S_ISREG(old.stat.st_mode)) { + for (Extensible::const_iterator it = old.begin(); it != old.end(); it++) { + std::string key, value; + Replica rep; + + key = it->first; + if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) { + value = Extensible::anyToString(it->second); + rep = replicaDeserialize(value, ""); + rfnDelete(oldPath, rep.rfn); + rfnCreate(newPath, rep.rfn); + } + } + } + + // XXX: test-rename should probably accept both ENOTEMPTY and EEXIST instead // of changing it here? try { @@ -1383,7 +1430,7 @@ Replica VfsCatalog::getReplicaByRFN(const std::string& rfn) throw (DmException) void VfsCatalog::updateReplica(const Replica& replica) throw (DmException) { std::map::const_iterator it; - std::string path, attr1, attr2, attrval1,attrval2; + std::string path, lpath, attr1, attr2, attrval1,attrval2; ExtendedStat meta; char buf[20]; @@ -1394,6 +1441,7 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException) if (it == inodes.end()) vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid); path = it->second; + lpath = getLocalPath(path); meta = this->vfsExtendedStat(path, true, true, true); if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0) @@ -1405,7 +1453,9 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException) vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str()); attr2 = std::string(VFS_XATTR "repattrs.") + buf; - // check duplicity of RFN + // browse replicas: + // - check duplicity of RFN + // - remove previous replica entry for (Extensible::const_iterator it = meta.begin(); it != meta.end(); it++) { std::string key, value; Replica rep; @@ -1415,21 +1465,27 @@ void VfsCatalog::updateReplica(const Replica& replica) throw (DmException) value = Extensible::anyToString(it->second); if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) { repid = atoll(key.c_str() + VFS_XATTR_LENGTH + 8); - // skip the currently updated replica (RFN may be the same) - if (repid == replica.replicaid) continue; rep = replicaDeserialize(value, ""); - if (rep.rfn.compare(replica.rfn) == 0) { - vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str()); + if (repid == replica.replicaid) { + // currently updated replica + // ==> remove its entry + rfnDelete(path, rep.rfn); + } else { + // check duplicity + if (rep.rfn.compare(replica.rfn) == 0) { + vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str()); + } } } } + rfnCreate(path, replica.rfn); replicaSerialize(replica, attrval1, attrval2); - vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_REPLACE); + vfsSetXattr(path, lpath, attr1, attrval1, ATTR_REPLACE); if (attrval2.empty()) { - if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0); + if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0); } else { - vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0); + vfsSetXattr(path, lpath, attr2, attrval2, 0); } debug("updated '%s' to '%s'", replica.rfn.c_str(), path.c_str()); } @@ -1791,6 +1847,7 @@ void VfsCatalog::replicaSerialize(const Replica& replica, std::string& attr1, st } + Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::string& attr2) { Replica replica; int len, ret; @@ -1799,6 +1856,8 @@ Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::stri const char *buf = attr1.c_str(); char *p; + replica = Replica(); + ret = sscanf(buf, "%c %c %lu %lu %lu %lu %200s %n", &status, &type, &replica.nbaccesses, &replica.atime, &replica.ptime, &replica.ltime, server, &len); if (ret != 7 && ret != 8) vfsThrowErrno("deserializing of '%s' failed", buf); @@ -1816,3 +1875,166 @@ Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::stri return replica; } + + + +void VfsCatalog::rfnCreate(const std::string& path, const std::string& rfn) throw(DmException) { + Url url; + std::string curdir, file, lpath; + std::vector components; + struct stat fstat; + FILE *f; + + // replicas root directory + curdir = this->prefix_ + "/" + VFS_REPLICAS_TREE; + if (mkdir(curdir.c_str(), 0755) == 0) + debug("created replicas root directory '%s'", curdir.c_str()); + + // directory for each server (conflicts with proper names are OK and unlikely) + url = Url(rfn); + if (!url.domain.empty()) { + curdir += "/" + url.domain; + if (mkdir(curdir.c_str(), 0755) == 0) + debug("created replicas server directory '%s' for '%s'", curdir.c_str(), url.domain.c_str()); + } + + // RFN tree + components = Url::splitPath(url.path); + file = components.back(); + components.pop_back(); + if (components.size() >= 1 && components[0] != "/") components[0] = "/" + components[0]; + for (size_t i = 0; i < components.size(); i++) { + // the first component always '/', let's use it as separator + if (i < 2) curdir += components[i]; + else curdir = curdir + "/" + components[i]; + // ignore all errors - just go from "/" toward deeper levels + if (mkdir(curdir.c_str(), 0755) == 0) + debug("created directory '%s'", curdir.c_str()); + } + + lpath = curdir + "/" + file; + + // never rewrite - when updating, the old entry should be already removed + if (stat(lpath.c_str(), &fstat) == 0) + vfsThrow(EEXIST, "replica entry '%s' already exists", rfn.c_str()); + + // info about replica + if ((f = fopen(lpath.c_str(), "w")) == NULL) + wrapCall(f, "could not create replica entry '%s' for '%s'", rfn.c_str(), path.c_str()); + try { + wrapCall(fputs(path.c_str(), f), "could not write information about replica '%s' for '%s'", rfn.c_str(), path.c_str()); + } catch (DmException e) { + fclose(f); + throw; + } + wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str()); + debug("created entry '%s' -> '%s'", rfn.c_str(), path.c_str()); +} + + + +void VfsCatalog::rfnDelete(const std::string& path, const std::string& rfn) throw(DmException) { + Url url; + std::string lpath, curdir; + FILE *f; + std::vector components; + bool wasNonEmpty; + + // local path to replica entry + url = Url(rfn); + lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE; + if (!url.domain.empty()) + lpath += "/" + url.domain; + if (!url.path.empty() && url.path[0] != '/') lpath += "/"; + lpath += url.path; + // RFN could be a directory ==> entry is just the file + if (lpath[lpath.size() - 1] == '/') + lpath.resize(lpath.size() - 1); + + // check if we can remove replica information + if ((f = fopen(lpath.c_str(), "r")) == NULL) { + // + // be robust: + // - when entry doesn't exist, log the problem + // - throw up only on other errors + // + if (errno == ENOENT || errno == ENOTDIR) { + log(LOG_ERR, "replica entry '%s' doesn't exist, deleting replica from '%s' anyway", rfn.c_str(), path.c_str()); + } else { + vfsThrowErrno("could not open replica entry '%s' for '%s'", rfn.c_str(), path.c_str()); + } + + return; + } + try { + fgets(this->buffer, sizeof this->buffer, f); + wrapCall(ferror(f), "error reading replica entry '%s' for '%s'", rfn.c_str(), path.c_str()); + } catch (DmException e) { + fclose(f); + throw; + } + wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str()); + + // we should not delete foreign entries + if (path.compare(this->buffer) != 0) + vfsThrow(EPERM, "can't delete replica entry '%s' for '%s', belongs to '%s'", rfn.c_str(), path.c_str(), this->buffer); + + if (::unlink(lpath.c_str()) == 0) + debug("deleted entry '%s' -> '%s'", rfn.c_str(), path.c_str()); + + // cleanup directories + try { + lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE; + curdir = ""; + if (!url.domain.empty()) + curdir += "/" + url.domain; + if (!url.path.empty() && url.path[0] != '/') curdir += "/"; + components = Url::splitPath(curdir + url.path); + + wasNonEmpty = false; + do { + components.pop_back(); + lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE + Url::joinPath(components); + + if (isEmptyDir(lpath)) { + wrapCall(rmdir(lpath.c_str()), "could not remove replica entries subdirectory '%s'", lpath.c_str()); + debug("deleted replica entry subdirectory '%s'", lpath.c_str()); + } else + wasNonEmpty = true; + } while (!wasNonEmpty && components.size() > 2); + } catch (DmException e) { + // not critical, but log a warning +#ifndef DEBUG + syslog(LOG_WARN, "%s", e.what()); +#endif + } +} + + + +/// +/// test emptiness of directory +/// +/// On errors, just log the problem and consider the directory non-empty. +/// +bool VfsCatalog::isEmptyDir(const std::string& lpath) throw (DmException) { + DIR *dir; + struct dirent entry, *rentry; + int ret; + + wrapCall((dir = opendir(lpath.c_str())), "could not open open replica entries subdirectory '%s'", lpath.c_str()); + + rentry = NULL; + while ((ret = readdir_r(dir, &entry, &rentry)) == 0 && rentry) { + if (strcmp(entry.d_name, ".") != 0 && strcmp(entry.d_name, "..") != 0) { + closedir(dir); + return false; + } + } + closedir(dir); + + if (ret) + vfsThrow(ret, "error reading replica entries subdirectory '%s'", lpath.c_str()); + + return true; +} diff --git a/src/VfsNs.h b/src/VfsNs.h index ea1ec72..7338211 100644 --- a/src/VfsNs.h +++ b/src/VfsNs.h @@ -126,6 +126,9 @@ namespace dmlite { void replicaSerialize(const Replica& replica, std::string& attr1, std::string& attr2); Replica replicaDeserialize(const std::string& attr1, const std::string& attr2); + void rfnCreate(const std::string& path, const std::string& rfn) throw(DmException); + void rfnDelete(const std::string& path, const std::string& rfn) throw(DmException); + bool isEmptyDir(const std::string& lpath) throw (DmException); StackInstance* si_; const SecurityContext* secCtx_;