#define VFS_XATTR_LENGTH 5
#define VFS_XATTR_TYPE_PERMS 1
#define VFS_XATTR_TYPE_NONPERMS 2
-#define VFS_XATTR_TYPE_USER 4
-#define VFS_XATTR_TYPE_ALL 7
+#define VFS_XATTR_TYPE_REPLICA 4
+#define VFS_XATTR_TYPE_USER 8
+#define VFS_XATTR_TYPE_ALL 15
#define IS_VFS_XATTR(NAME) (strncmp((NAME), VFS_XATTR, VFS_XATTR_LENGTH) == 0)
#define IS_VFS_FILE(NAME) (strncmp((NAME), VFS_FILE, VFS_FILE_LENGTH) == 0)
#define VFS_UID_NONE xStat.stat.st_uid
#define VFS_GID_NONE xStat.stat.st_gid
+#define INODE_CACHE_SIZE 256
+
+
using namespace dmlite;
///
-/// Update ExtendedStat fields related to permissions.
+/// Update ExtendedStat fields related to permissions or replicas.
/// Requires user extended attributes retrieved with VFS_XATTR_TYPE_PERMS or
/// VFS_XATTR_TYPE_ALL.
///
/// attributes).
///
void VfsCatalog::vfsUpdateStat(ExtendedStat &xStat, Extensible xattrs) throw (DmException) {
- std::string modeStr;
+ std::string modeStr, key;
mode_t mode;
char *parse;
+ Extensible::const_iterator it;
if (xattrs.hasField(VFS_XATTR "acl"))
xStat.acl = Acl(xattrs.getString(VFS_XATTR "acl"));
xStat.stat.st_mode = mode;
}
}
+
+ // copy replicas
+ for (it = xattrs.begin(); it != xattrs.end(); it++) {
+ key = it->first;
+ if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0)
+ xStat[key] = xattrs[key];
+ }
}
/// @param follow follow symlinks (to use stat() or lstat())
/// @param perms get information related to permissions
//
-ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms) throw (DmException) {
+ExtendedStat VfsCatalog::vfsSimpleStat(const std::string& name, const std::string& path, const std::string& lpath, bool follow, bool perms, bool replicas) throw (DmException) {
ExtendedStat xStat;
struct stat fstat;
Extensible xattrs;
xStat.status = ExtendedStat::kOnline;
xStat.acl = Acl();
- if (perms) {
+ if (perms || replicas) {
try {
- xattrs = vfsGetXattrs(path, lpath, true, VFS_XATTR_TYPE_PERMS);
+ xattrs = vfsGetXattrs(path, lpath, true, (perms ? VFS_XATTR_TYPE_PERMS : 0) | (replicas ? VFS_XATTR_TYPE_REPLICA : 0));
} catch (DmException& e) {
// ignore - default owner and no ACLs
}
/// @param follow follow symlinks
/// @param parms get information related to permissions
///
-ExtendedStat VfsCatalog::vfsExtendedStat(const std::string& path, bool follow, bool perms) throw (DmException) {
+ExtendedStat VfsCatalog::vfsExtendedStat(const std::string& path, bool follow, bool perms, bool replicas) throw (DmException) {
ExtendedStat meta;
std::vector<std::string> components;
std::string dir;
if (i < 2) dir += components[i];
else dir = dir + "/" + components[i];
- meta = vfsSimpleStat(components[i], dir, getLocalPath(dir), true, true);
+ meta = vfsSimpleStat(components[i], dir, getLocalPath(dir), true, true, false);
if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IEXEC) != 0)
vfsThrow(EACCES, "not enough permissions for '%s' to list '%s'", clientName.c_str(), meta.name.c_str());
}
- return vfsSimpleStat(components.back(), path, getLocalPath(path), follow, perms);
+ return vfsSimpleStat(components.back(), path, getLocalPath(path), follow, perms, replicas);
}
if (vfsCheckPermissions(path, S_IREAD))
vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
- meta = vfsExtendedStat(path, follow, false);
+ meta = vfsExtendedStat(path, follow, false, false);
//
// User extended attributes are not supported on symlinks. We need the same
meta.stat.st_nlink = count;
}
#endif
-
+
+ vfsUpdateInode(meta, path);
+
return meta;
}
bool VfsCatalog::accessReplica(const std::string& replica, int mode) throw (DmException)
{
- return this->access(replica, mode);
+ vfsThrow(ENOSYS, "searching by replica file names not supported");
+ return false;
}
void VfsCatalog::addReplica(const Replica& replica) throw (DmException)
{
- vfsThrow(EACCES, "write mode not supported");
+ std::map<ino_t,std::string>::const_iterator it;
+ std::string path, attr;
+ ExtendedStat meta;
+ Extensible xattrs;
+ int64_t replicaid;
+ char buf[20];
+
+ if (vfsCheckPermissions("", S_IWRITE))
+ vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
+
+ it = inodes.find(replica.fileid);
+ if (it == inodes.end())
+ vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
+ path = it->second;
+
+ meta = this->vfsExtendedStat(path, true, true, true);
+ if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
+ vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+
+ replicaid = 0;
+ do {
+ replicaid++;
+ snprintf(buf, sizeof buf, "%ld", replicaid);
+ attr = std::string(VFS_XATTR "replica.") + buf;
+ } while (meta.hasField(attr));
+
+ vfsSetXattr(path, getLocalPath(path), attr, replica.rfn, ATTR_CREATE);
+ debug("added '%s' to '%s'", replica.rfn.c_str(), path.c_str());
}
void VfsCatalog::deleteReplica(const Replica& replica) throw (DmException)
{
- vfsThrow(EACCES, "write mode not supported");
+ std::map<ino_t,std::string>::const_iterator it;
+ std::string path;
+ ExtendedStat meta;
+ char buf[VFS_XATTR_LENGTH + 8 + 20 + 1];
+
+ if (vfsCheckPermissions("", S_IWRITE))
+ vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
+
+ it = inodes.find(replica.fileid);
+ if (it == inodes.end())
+ vfsThrow(EINVAL, "fileid %lu not found in the catalog cache", replica.fileid);
+ path = it->second;
+
+ meta = this->vfsExtendedStat(path, true, true, true);
+ if (checkPermissions(this->secCtx_, meta.acl, meta.stat, S_IWRITE) != 0)
+ vfsThrow(EACCES, "not enough permissions for '%s' to add replica '%s'", clientName.c_str(), path.c_str());
+
+ snprintf(buf, sizeof buf, VFS_XATTR "replica." "%ld", replica.replicaid);
+ if (meta.hasField(buf)) {
+ vfsRemoveXattr(path, getLocalPath(path), buf, 0);
+ } else {
+ vfsThrow(DMLITE_NO_SUCH_REPLICA, "replica %lu of '%s' doesn't exist", replica.replicaid, path.c_str());
+ }
}
{
Replica replica;
ExtendedStat xStat;
+ std::vector<Replica> repList;
+ Extensible::const_iterator it;
+ std::string key, value;
+ long int replicaid;
+ Url url;
+ char buf[20];
if (vfsCheckPermissions(path, S_IREAD))
vfsThrow(EACCES, "not enough permissions for '%s'", clientName.c_str());
- xStat = this->vfsExtendedStat(path, true, true);
+ xStat = this->vfsExtendedStat(path, true, true, true);
if (checkPermissions(this->secCtx_, xStat.acl, xStat.stat, S_IREAD) != 0)
vfsThrow(EACCES, "not enough permissions for '%s' to read '%s'", clientName.c_str(), path.c_str());
if (S_ISDIR(xStat.stat.st_mode))
vfsThrow(EISDIR, "directories do not have replicas");
-
- replica.replicaid = 0;
- replica.atime = xStat.stat.st_atime;
- replica.fileid = xStat.stat.st_ino;
- replica.nbaccesses = 0;
- replica.ptime = 0;
- replica.ltime = 0;
- replica.type = Replica::kPermanent;
- replica.status = Replica::kAvailable;
- replica.server = this->hostName_;
- replica["pool"] = std::string("vfs");
-
- std::string wd = this->getWorkingDir();
- if (wd.empty() || path[0] == '/')
- replica.rfn = path;
- else
- replica.rfn = wd + "/" + path;
- return std::vector<Replica>(1, replica);
+ for (it = xStat.begin(); it != xStat.end(); it++) {
+ key = it->first;
+ if (key.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") == 0) {
+ value = xStat.getString(key);
+
+ if (sscanf(key.c_str(), VFS_XATTR "replica.%ld", &replicaid) != 1)
+ vfsThrow(EINVAL, "can't parse replica attribute name '%s'", key.c_str());
+
+ url = Url(value);
+ if (url.port) snprintf(buf, sizeof buf, "%u", url.port);
+ else buf[0] = '\0';
+
+ replica = Replica();
+
+ replica.replicaid = replicaid;
+ replica.atime = xStat.stat.st_atime;
+ replica.fileid = xStat.stat.st_ino;
+ replica.nbaccesses = 0;
+ replica.ptime = 0;
+ replica.ltime = 0;
+ replica.type = Replica::kPermanent;
+ replica.status = Replica::kAvailable;
+ replica.server = url.domain + (buf[0] ? std::string(":") + buf : "");
+ replica.rfn = value;
+ replica["pool"] = std::string("vfs");
+
+ repList.push_back(replica);
+ }
+ }
+
+ return repList;
}
// Sticky bit set ==> only directory or file owner can delete
if ((parent.stat.st_mode & S_ISVTX) == S_ISVTX) {
// not follow symlinks (remove them instead)
- ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true);
+ ExtendedStat file = this->vfsSimpleStat(name, path, lpath, false, true, false);
if (getUid(this->secCtx_) != file.stat.st_uid &&
getUid(this->secCtx_) != parent.stat.st_uid) {
vfsThrow(EACCES, "not enough permissions for '%s' to unlink '%s' (sticky bit set)", clientName.c_str(), path.c_str());
try {
// follow symlinks
- file = this->vfsSimpleStat(name, path, lpath, true, true);
+ file = this->vfsSimpleStat(name, path, lpath, true, true, true);
} catch (DmException e) {
code = DMLITE_ERRNO(e.code());
if (code != ENOENT) throw;
if (!newAcl.empty())
vfsSetAcl(path, lpath, newAcl);
} else {
+ Extensible::const_iterator it;
+
// Truncate
if (S_ISDIR(file.stat.st_mode))
vfsThrow(EISDIR, "'%s' is directory, can not truncate", path.c_str());
if (getUid(this->secCtx_) != file.stat.st_uid &&
checkPermissions(this->secCtx_, file.acl, file.stat, S_IWRITE) != 0) {
vfsThrow(EACCES, "not enough permissions for '%s' to truncate '%s'", clientName.c_str(), path.c_str());
- // TODO: check replicas
}
+ // But check replicas first
+ for (it = file.begin(); it != file.end() && it->first.compare(0, VFS_XATTR_LENGTH + 8, VFS_XATTR "replica.") != 0; it++);
+ if (it != file.end())
+ vfsThrow(EACCES, "can't truncate '%s' with replicas", path.c_str());
+ // FIXME: setSize()
wrapCall(truncate(lpath.c_str(), 0));
}
}
}
vfsUpdateExtendedStat(privateDir->stat, xattrs);
+ vfsUpdateInode(privateDir->stat, privateDir->path + '/' + ent->d_name);
+
return &(privateDir->stat);
}
strcmp(entry->a_name + VFS_XATTR_LENGTH, "owner") == 0)
{
attrType = VFS_XATTR_TYPE_PERMS;
+ } else if (strncmp(entry->a_name + VFS_XATTR_LENGTH, "replica", 7) == 0) {
+ attrType = VFS_XATTR_TYPE_REPLICA;
} else {
attrType = VFS_XATTR_TYPE_NONPERMS;
}
else
vfsSetXattr(path, lpath, VFS_XATTR "acl", acl.serialize(), 0);
}
+
+
+
+void VfsCatalog::vfsUpdateInode(const ExtendedStat& meta, const std::string& path) {
+ std::string abspath;
+
+ // update inode cache
+ if (S_ISREG(meta.stat.st_mode)) {
+ // absolutize
+ if (path[0] != '/') abspath = this->getWorkingDir() + "/" + path;
+ else abspath = path;
+ // remove least recently used item if needed
+ if (inodes.size() >= INODE_CACHE_SIZE) {
+ inodes.erase(inodes_lru.front());
+ inodes_lru.pop_front();
+ }
+ // add to inode cache
+ inodes[meta.stat.st_ino] = abspath;
+ inodes_lru.remove(meta.stat.st_ino);
+ inodes_lru.push_back(meta.stat.st_ino);
+ //debug("inode %lu: %s", meta.stat.st_ino, abspath.c_str());
+ }
+}