void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
{
std::map<ino_t,std::string>::const_iterator it;
- std::string path, attr1, attr2, attrval1, attrval2;
+ std::string path, lpath, attr1, attr2, attrval1, attrval2;
ExtendedStat meta;
Extensible xattrs;
int64_t replicaid;
if (it == inodes.end())
vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
path = it->second;
+ lpath = getLocalPath(path);
meta = this->vfsExtendedStat(path, true, true, true);
if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
rep = replicaDeserialize(value, "");
if (rep.rfn.compare(replica.rfn) == 0) {
- vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+ vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
}
}
}
+ rfnCreate(path, replica.rfn);
replicaSerialize(replica, attrval1, attrval2);
- vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_CREATE);
+ vfsSetXattr(path, lpath, attr1, attrval1, ATTR_CREATE);
if (attrval2.empty()) {
- if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+ if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0);
} else {
- vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+ vfsSetXattr(path, lpath, attr2, attrval2, 0);
}
+
debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str());
}
void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
{
std::map<ino_t,std::string>::const_iterator it;
- std::string path;
+ std::string path, attr1, attr2;
ExtendedStat meta;
char buf[20];
meta = this->vfsExtendedStat(path, true, true, true);
if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
- vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+ vfsThrow(EACCES, "not enough permissions for '%s' to delete replica '%s'", clientName.c_str(), path.c_str());
snprintf(buf, sizeof buf, "%lu", replica.replicaid);
- if (meta.hasField(std::string(VFS_XATTR "replica.") + buf)) {
- vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "replica.") + buf, 0);
- vfsRemoveXattr(path, getLocalPath(path), std::string(VFS_XATTR "repattrs.") + buf, 0);
+ attr1 = std::string(VFS_XATTR "replica.") + buf;
+ attr2 = std::string(VFS_XATTR "repattrs.") + buf;
+ if (meta.hasField(attr1)) {
+ Replica rep;
+
+ // get RFN rather from the catalog
+ rep = replicaDeserialize(meta.getString(attr1), meta.getString(attr2, ""));
+ rfnDelete(path, rep.rfn);
+
+ vfsRemoveXattr(path, getLocalPath(path), attr1, 0);
+ vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
} else {
vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
}
void VfsCatalog::unlink(const std::string& path) throw (DmException)
{
std::string parentPath, name, lpath;
+ ExtendedStat file;
if (vfsCheckPermissions(path, S_IWRITE))
vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
lpath = getLocalPath(path);
+ // not follow symlinks (remove them instead)
+ file = this->vfsSimpleStat(name, path, lpath, false, true, true);
+
// Sticky bit set ==> only directory or file owner can delete
if ((parent.stat.st_mode & S_ISVTX) == S_ISVTX) {
- // not follow symlinks (remove them instead)
- ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true, false);
if (getUid(this->secCtx_) != file.stat.st_uid &&
getUid(this->secCtx_) != parent.stat.st_uid) {
vfsThrow(EACCES, "not enough permissions for '%s' to unlink '%s' (sticky bit set)", clientName.c_str(), path.c_str());
}
}
+ // delete replica entries
+ if (S_ISREG(file.stat.st_mode)) {
+ for (Extensible::const_iterator it = file.begin(); it != file.end(); it++) {
+ std::string key, value;
+ Replica rep;
+
+ key = it->first;
+ if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+ value = Extensible::anyToString(it->second);
+ rep = replicaDeserialize(value, "");
+ rfnDelete(path, rep.rfn);
+ }
+ }
+ }
+
wrapCall(::unlink(lpath.c_str()));
}
void VfsCatalog::rename(const std::string& oldPath, const std::string& newPath) throw (DmException)
{
std::string oldParentPath, newParentPath, oldName, newName;
+ ExtendedStat old;
if (vfsCheckPermissions(oldPath, S_IWRITE) ||
vfsCheckPermissions(newPath, S_IWRITE))
if (checkPermissions(this->secCtx_, newParent.acl, newParent.stat, S_IWRITE) != 0)
vfsThrow(EACCES, "not enough permissions for '%s' on destination '%s'", clientName.c_str(), newPath.c_str());
+ old = this->vfsExtendedStat(oldPath, true, true, true);
+
// Check sticky
if (oldParent.stat.st_mode & S_ISVTX) {
- ExtendedStat old = this->vfsExtendedStat(oldPath);
if (getUid(this->secCtx_) != oldParent.stat.st_uid &&
getUid(this->secCtx_) != old.stat.st_uid &&
checkPermissions(this->secCtx_, old.acl, old.stat, S_IWRITE) != 0)
vfsThrow(EACCES, "not enough permissions for '%s' (sticky bit set on parent '%s')", clientName.c_str(), oldParentPath.c_str());
}
+ // update replica entries
+ if (S_ISREG(old.stat.st_mode)) {
+ for (Extensible::const_iterator it = old.begin(); it != old.end(); it++) {
+ std::string key, value;
+ Replica rep;
+
+ key = it->first;
+ if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+ value = Extensible::anyToString(it->second);
+ rep = replicaDeserialize(value, "");
+ rfnDelete(oldPath, rep.rfn);
+ rfnCreate(newPath, rep.rfn);
+ }
+ }
+ }
+
+
// XXX: test-rename should probably accept both ENOTEMPTY and EEXIST instead
// of changing it here?
try {
void VfsCatalog::updateReplica(const Replica& replica) throw (DmException)
{
std::map<ino_t,std::string>::const_iterator it;
- std::string path, attr1, attr2, attrval1,attrval2;
+ std::string path, lpath, attr1, attr2, attrval1,attrval2;
ExtendedStat meta;
char buf[20];
if (it == inodes.end())
vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
path = it->second;
+ lpath = getLocalPath(path);
meta = this->vfsExtendedStat(path, true, true, true);
if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
attr2 = std::string(VFS_XATTR "repattrs.") + buf;
- // check duplicity of RFN
+ // browse replicas:
+ // - check duplicity of RFN
+ // - remove previous replica entry
for (Extensible::const_iterator it = meta.begin(); it != meta.end(); it++) {
std::string key, value;
Replica rep;
value = Extensible::anyToString(it->second);
if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
repid = atoll(key.c_str() + VFS_XATTR_LENGTH + 8);
- // skip the currently updated replica (RFN may be the same)
- if (repid == replica.replicaid) continue;
rep = replicaDeserialize(value, "");
- if (rep.rfn.compare(replica.rfn) == 0) {
- vfsThrow(EEXIST, "RFN '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+ if (repid == replica.replicaid) {
+ // currently updated replica
+ // ==> remove its entry
+ rfnDelete(path, rep.rfn);
+ } else {
+ // check duplicity
+ if (rep.rfn.compare(replica.rfn) == 0) {
+ vfsThrow(EEXIST, "replica file name '%s' already exists on '%s'", replica.rfn.c_str(), path.c_str());
+ }
}
}
}
+ rfnCreate(path, replica.rfn);
replicaSerialize(replica, attrval1, attrval2);
- vfsSetXattr(path, getLocalPath(path), attr1, attrval1, ATTR_REPLACE);
+ vfsSetXattr(path, lpath, attr1, attrval1, ATTR_REPLACE);
if (attrval2.empty()) {
- if (meta.hasField(attr2)) vfsRemoveXattr(path, getLocalPath(path), attr2, 0);
+ if (meta.hasField(attr2)) vfsRemoveXattr(path, lpath, attr2, 0);
} else {
- vfsSetXattr(path, getLocalPath(path), attr2, attrval2, 0);
+ vfsSetXattr(path, lpath, attr2, attrval2, 0);
}
debug("updated '%s' to '%s'", replica.rfn.c_str(), path.c_str());
}
}
+
Replica VfsCatalog::replicaDeserialize(const std::string& attr1, const std::string& attr2) {
Replica replica;
int len, ret;
const char *buf = attr1.c_str();
char *p;
+ replica = Replica();
+
ret = sscanf(buf, "%c %c %lu %lu %lu %lu %200s %n", &status, &type, &replica.nbaccesses, &replica.atime, &replica.ptime, &replica.ltime, server, &len);
if (ret != 7 && ret != 8)
vfsThrowErrno("deserializing of '%s' failed", buf);
return replica;
}
+
+
+
+void VfsCatalog::rfnCreate(const std::string& path, const std::string& rfn) throw(DmException) {
+ Url url;
+ std::string curdir, file, lpath;
+ std::vector<std::string> components;
+ struct stat fstat;
+ FILE *f;
+
+ // replicas root directory
+ curdir = this->prefix_ + "/" + VFS_REPLICAS_TREE;
+ if (mkdir(curdir.c_str(), 0755) == 0)
+ debug("created replicas root directory '%s'", curdir.c_str());
+
+ // directory for each server (conflicts with proper names are OK and unlikely)
+ url = Url(rfn);
+ if (!url.domain.empty()) {
+ curdir += "/" + url.domain;
+ if (mkdir(curdir.c_str(), 0755) == 0)
+ debug("created replicas server directory '%s' for '%s'", curdir.c_str(), url.domain.c_str());
+ }
+
+ // RFN tree
+ components = Url::splitPath(url.path);
+ file = components.back();
+ components.pop_back();
+ if (components.size() >= 1 && components[0] != "/") components[0] = "/" + components[0];
+ for (size_t i = 0; i < components.size(); i++) {
+ // the first component always '/', let's use it as separator
+ if (i < 2) curdir += components[i];
+ else curdir = curdir + "/" + components[i];
+ // ignore all errors - just go from "/" toward deeper levels
+ if (mkdir(curdir.c_str(), 0755) == 0)
+ debug("created directory '%s'", curdir.c_str());
+ }
+
+ lpath = curdir + "/" + file;
+
+ // never rewrite - when updating, the old entry should be already removed
+ if (stat(lpath.c_str(), &fstat) == 0)
+ vfsThrow(EEXIST, "replica entry '%s' already exists", rfn.c_str());
+
+ // info about replica
+ if ((f = fopen(lpath.c_str(), "w")) == NULL)
+ wrapCall(f, "could not create replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+ try {
+ wrapCall(fputs(path.c_str(), f), "could not write information about replica '%s' for '%s'", rfn.c_str(), path.c_str());
+ } catch (DmException e) {
+ fclose(f);
+ throw;
+ }
+ wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str());
+ debug("created entry '%s' -> '%s'", rfn.c_str(), path.c_str());
+}
+
+
+
+void VfsCatalog::rfnDelete(const std::string& path, const std::string& rfn) throw(DmException) {
+ Url url;
+ std::string lpath, curdir;
+ FILE *f;
+ std::vector<std::string> components;
+ bool wasNonEmpty;
+
+ // local path to replica entry
+ url = Url(rfn);
+ lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE;
+ if (!url.domain.empty())
+ lpath += "/" + url.domain;
+ if (!url.path.empty() && url.path[0] != '/') lpath += "/";
+ lpath += url.path;
+ // RFN could be a directory ==> entry is just the file
+ if (lpath[lpath.size() - 1] == '/')
+ lpath.resize(lpath.size() - 1);
+
+ // check if we can remove replica information
+ if ((f = fopen(lpath.c_str(), "r")) == NULL) {
+ //
+ // be robust:
+ // - when entry doesn't exist, log the problem
+ // - throw up only on other errors
+ //
+ if (errno == ENOENT || errno == ENOTDIR) {
+ log(LOG_ERR, "replica entry '%s' doesn't exist, deleting replica from '%s' anyway", rfn.c_str(), path.c_str());
+ } else {
+ vfsThrowErrno("could not open replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+ }
+
+ return;
+ }
+ try {
+ fgets(this->buffer, sizeof this->buffer, f);
+ wrapCall(ferror(f), "error reading replica entry '%s' for '%s'", rfn.c_str(), path.c_str());
+ } catch (DmException e) {
+ fclose(f);
+ throw;
+ }
+ wrapCall(fclose(f), "could not close replica entry for '%s'", rfn.c_str());
+
+ // we should not delete foreign entries
+ if (path.compare(this->buffer) != 0)
+ vfsThrow(EPERM, "can't delete replica entry '%s' for '%s', belongs to '%s'", rfn.c_str(), path.c_str(), this->buffer);
+
+ if (::unlink(lpath.c_str()) == 0)
+ debug("deleted entry '%s' -> '%s'", rfn.c_str(), path.c_str());
+
+ // cleanup directories
+ try {
+ lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE;
+ curdir = "";
+ if (!url.domain.empty())
+ curdir += "/" + url.domain;
+ if (!url.path.empty() && url.path[0] != '/') curdir += "/";
+ components = Url::splitPath(curdir + url.path);
+
+ wasNonEmpty = false;
+ do {
+ components.pop_back();
+ lpath = this->prefix_ + "/" + VFS_REPLICAS_TREE + Url::joinPath(components);
+
+ if (isEmptyDir(lpath)) {
+ wrapCall(rmdir(lpath.c_str()), "could not remove replica entries subdirectory '%s'", lpath.c_str());
+ debug("deleted replica entry subdirectory '%s'", lpath.c_str());
+ } else
+ wasNonEmpty = true;
+ } while (!wasNonEmpty && components.size() > 2);
+ } catch (DmException e) {
+ // not critical, but log a warning
+#ifndef DEBUG
+ syslog(LOG_WARN, "%s", e.what());
+#endif
+ }
+}
+
+
+
+///
+/// test emptiness of directory
+///
+/// On errors, just log the problem and consider the directory non-empty.
+///
+bool VfsCatalog::isEmptyDir(const std::string& lpath) throw (DmException) {
+ DIR *dir;
+ struct dirent entry, *rentry;
+ int ret;
+
+ wrapCall((dir = opendir(lpath.c_str())), "could not open open replica entries subdirectory '%s'", lpath.c_str());
+
+ rentry = NULL;
+ while ((ret = readdir_r(dir, &entry, &rentry)) == 0 && rentry) {
+ if (strcmp(entry.d_name, ".") != 0 && strcmp(entry.d_name, "..") != 0) {
+ closedir(dir);
+ return false;
+ }
+ }
+ closedir(dir);
+
+ if (ret)
+ vfsThrow(ret, "error reading replica entries subdirectory '%s'", lpath.c_str());
+
+ return true;
+}