From bfe633b931be16f1a537272d7d1cdba9ed6b47a6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Zden=C4=9Bk=20Salvet?= Date: Fri, 22 Oct 2004 06:52:49 +0000 Subject: [PATCH] Finish the implementation of the FTP backend. --- org.glite.jp.primary/src/backend.h | 3 +- org.glite.jp.primary/src/ftp_backend.c | 1562 +++++++++++++++++++++++++++++++- org.glite.jp.primary/src/tags.c | 171 ++++ 3 files changed, 1716 insertions(+), 20 deletions(-) diff --git a/org.glite.jp.primary/src/backend.h b/org.glite.jp.primary/src/backend.h index 8dde271..bb143e9 100644 --- a/org.glite.jp.primary/src/backend.h +++ b/org.glite.jp.primary/src/backend.h @@ -66,7 +66,8 @@ int glite_jppsbe_pread( void *handle, void *buf, size_t nbytes, - off_t offset + off_t offset, + ssize_t *nbytes_ret ); int glite_jppsbe_pwrite( diff --git a/org.glite.jp.primary/src/ftp_backend.c b/org.glite.jp.primary/src/ftp_backend.c index d076125..f25c514 100644 --- a/org.glite.jp.primary/src/ftp_backend.c +++ b/org.glite.jp.primary/src/ftp_backend.c @@ -1,20 +1,209 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include "glite/jp/types.h" #include "glite/jp/context.h" +#include "glite/jp/strmd5.h" +#include "tags.h" #include "backend.h" +#define UPLOAD_SUFFIX ".upload" +#define LOCK_SUFFIX ".lock" + +struct ftpbe_config { + char *internal_path; + char *external_path; + char *gridmap; + char *logname; +}; + +static struct ftpbe_config *config = NULL; + +struct fhandle_rec { + int fd; + int fd_append; +}; +typedef struct fhandle_rec *fhandle; + +static struct option ftpbe_opts[] = { + { "ftp-internal-path", 1, NULL, 'I' }, + { "ftp-external-path", 1, NULL, 'E' }, + { "ftp-gridmap", 1, NULL, 'G' }, + { NULL, 0, NULL, 0 } +}; + +static struct { + glite_jp_fileclass_t type; + char * fname; + } class_to_fname_tab[] = { + { GLITE_JP_FILECLASS_INPUT, "input" }, + { GLITE_JP_FILECLASS_OUTPUT, "output" }, + { GLITE_JP_FILECLASS_LBLOG, "lblog" }, + { GLITE_JP_FILECLASS_TAGS, "tags" }, + { GLITE_JP_FILECLASS_UNDEF, NULL } + }; + +static char *class_to_fname(glite_jp_fileclass_t type) +{ + int i; + + for (i = 0; class_to_fname_tab[i].type != GLITE_JP_FILECLASS_UNDEF; i++) + if (type == class_to_fname_tab[i].type) + return class_to_fname_tab[i].fname; + + return NULL; +} + +static glite_jp_fileclass_t fname_to_class(char* fname) +{ + int i; + + for (i = 0; class_to_fname_tab[i].type != GLITE_JP_FILECLASS_UNDEF; i++) + if (!strcmp(fname, class_to_fname_tab[i].fname)) + return class_to_fname_tab[i].type; + + return GLITE_JP_FILECLASS_UNDEF; +} + +static int config_check( + glite_jp_context_t ctx, + struct ftpbe_config *config) +{ + return config == NULL || + config->internal_path == NULL || + config->external_path == NULL || + config->gridmap == NULL || + config->logname == NULL; + + /* XXX check reality */ +} + +static int jobid_unique_pathname(glite_jp_context_t ctx, const char *job, + char **unique, char **ju_path, int get_path) +{ + char *p; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + p = strrchr(job, '/'); + if (!p) { + err.code = EINVAL; + err.desc = "Malformed jobid"; + return glite_jp_stack_error(ctx,&err); + } + /* XXX thorough checks */ + if (!(*unique = strdup(p+1))) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + if (get_path) { + if (!(*ju_path = strdup(p+1))) { + free(*unique); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + *(*ju_path + 10) = '\0'; + } + return 0; +} + +static int mkdirpath(const char* path, int prefixlen) +{ + char *wpath, *p; + int goout, ret; + + wpath = strdup(path); + if (!wpath) { + errno = ENOMEM; + return -1; + } + + p = wpath + prefixlen; + goout = 0; + while (!goout) { + while (*p == '/') p++; + while (*p != '/' && *p != '\0') p++; + goout = (*p == '\0'); + *p = '\0'; + ret = mkdir(wpath, S_IRUSR | S_IWUSR | S_IXUSR); + if (ret < 0 && errno != EEXIST) break; + *p = '/'; + } + free(wpath); + return ret; +} + +static long regtime_trunc(long tv_sec) +{ + return tv_sec / (86400*7); +} + +static long regtime_ceil(long tv_sec) +{ + return (tv_sec % (86400*7)) ? tv_sec/(86400*7)+1 : tv_sec/(86400*7) ; +} + +/********************************************************************************/ int glite_jppsbe_init( glite_jp_context_t ctx, int *argc, char *argv[] ) { + glite_jp_error_t err; + int opt; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + config = (struct ftpbe_config *) calloc(1, sizeof *config); + if (!config) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + config->logname = getlogin(); + + while ((opt = getopt_long(*argc, argv, "I:E:G:", ftpbe_opts, NULL)) != EOF) { + switch (opt) { + case 'I': config->internal_path = optarg; break; + case 'E': config->external_path = optarg; break; + case 'G': config->gridmap = optarg; break; + default: break; + } + } + + if (config_check(ctx, config)) { + err.code = EINVAL; + err.desc = "Invalid FTP backend configuration"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; } int glite_jppsbe_init_slave( glite_jp_context_t ctx ) { + /* Nothing to do */ } int glite_jppsbe_register_job( @@ -23,8 +212,152 @@ int glite_jppsbe_register_job( const char *owner ) { + glite_jp_error_t err; + char *int_dir = NULL; + char *int_fname = NULL; + char *data_dir = NULL; + char *data_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *ownerhash = NULL; + FILE *regfile = NULL; + struct timeval reg_tv; + long reg_tv_trunc; + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job != NULL); + assert(owner != NULL); + + gettimeofday(®_tv, NULL); + reg_tv_trunc = regtime_trunc(reg_tv.tv_sec); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_dir, "%s/regs/%s", + config->internal_path, ju_path) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + if (mkdirpath(int_dir, strlen(config->internal_path)) < 0) { + free(int_dir); + err.code = errno; + err.desc = "Cannot mkdir jobs's reg directory"; + return glite_jp_stack_error(ctx,&err); + } + free(int_dir); + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + free(int_dir); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + regfile = fopen(int_fname, "w"); + if (regfile == NULL) { + err.code = errno; + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + + ownerhash = str2md5(owner); /* static buffer */ + + if (fprintf(regfile, "%d %ld.%06ld %s %s %d %s\n", 1, + (long)reg_tv.tv_sec, (long)reg_tv.tv_usec, job, + ownerhash, strlen(owner), owner) < 1 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot write jobs's reg info file"; + goto error_out; + } + if (fclose(regfile) != 0 ) { + err.code = errno; + err.desc = "Cannot close(write) jobs's reg info file"; + goto error_out; + } + + if (asprintf(&data_dir, "%s/data/%s/%d/%s", + config->internal_path, ownerhash, regtime_trunc(reg_tv.tv_sec), ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (asprintf(&data_fname, "%s/_info", data_dir) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (mkdirpath(data_dir, strlen(config->internal_path)) < 0) { + err.code = errno; + err.desc = "Cannot mkdir jobs's data directory"; + goto error_out; + } + + if (link(int_fname, data_fname) < 0) { + err.code = errno; + err.desc = "Cannot link job's reg and data info files"; + goto error_out; + } + +error_out: + free(int_dir); + if (err.code && int_fname) unlink(int_fname); + free(int_fname); + if (err.code && data_fname) unlink(data_fname); + free(data_fname); + if (err.code && data_dir) rmdir(data_dir); + free(data_dir); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int add_to_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + gridmap = fopen(config->gridmap, "a"); + if (!gridmap) { + err.code = errno; + err.desc = "Cannot open gridmap file"; + return glite_jp_stack_error(ctx,&err); + } + if (fprintf(gridmap, "\"%s\" %s\n", dn, config->logname) < 6 || + ferror(gridmap)) { + err.code = EIO; + err.desc = "Cannot write to gridmap file"; + fclose(gridmap); + return glite_jp_stack_error(ctx,&err); + } + fclose(gridmap); + return 0; +} + +static int remove_from_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + char *temp_name = NULL; + FILE *temp_file = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + /* XXX */ return 0; } @@ -37,9 +370,123 @@ int glite_jppsbe_start_upload( time_t *commit_before_inout ) { + char *int_fname = NULL; + char *lock_fname = NULL; + FILE *lockfile = NULL; + FILE *regfile = NULL; + char *fname = NULL; + char *data_dir = NULL; + char *data_lock = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *peername = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(destination_out!=NULL); + + fname = class_to_fname(class); + assert(fname!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + peername = glite_jp_peer_name(ctx); + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s ", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + /* XXX authorization */ + + if (asprintf(&data_dir, "%s/data/%s/%d/%s", + config->internal_path, ownerhash, regtime_trunc(reg_time), ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (asprintf(&lock_fname, "%s/%s" LOCK_SUFFIX, + data_dir, fname) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (commit_before_inout != NULL) + *commit_before_inout = (time_t) LONG_MAX; /* XXX no timeout enforced */ + + lockfile = fopen(lock_fname, "w"); + if (lockfile == NULL) { + err.code = errno; + err.desc = "Cannot open uploads's lock file"; + goto error_out; + } + + if (fprintf(lockfile, "%ld %d %s\n", (long)*commit_before_inout, + peername ? peername : 0, + peername ? peername : "") < 1 || ferror(regfile)) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot write upload's lock file"; + goto error_out; + } + if (fclose(lockfile) != 0 ) { + err.code = errno; + err.desc = "Cannot close(write) upload's lock file"; + goto error_out; + } + + if (asprintf(destination_out, "%s/data/%s/%d/%s/%s" UPLOAD_SUFFIX, + config->external_path, ownerhash, regtime_trunc(reg_time), ju, fname) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (add_to_gridmap(ctx, peername)) { + err.code = EIO; + err.desc = "Cannot add peer DN to ftp server authorization file"; + goto error_out; + } + +error_out: + free(int_fname); + free(data_dir); + if (err.code && data_lock) unlink(data_lock); + free(data_lock); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } int glite_jppsbe_commit_upload( @@ -47,9 +494,111 @@ int glite_jppsbe_commit_upload( const char *destination ) { + size_t dest_len; + size_t suff_len; + size_t extp_len; + long commit_before; + int lockpeerlen; + char *lockpeername = NULL; + char *peername = NULL; + char *dest_rw = NULL; + char *dest_rw_suff = NULL; + char *dest_rw_lock = NULL; + FILE *lockfile = NULL; + glite_jp_error_t err; + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(destination != NULL); + + suff_len = strlen(UPLOAD_SUFFIX); + dest_len = strlen(destination); + extp_len = strlen(config->external_path); + + if (dest_len < suff_len || + strcmp(UPLOAD_SUFFIX, destination + (dest_len - suff_len)) || + strncmp(destination, config->external_path, extp_len)) { + err.code = EINVAL; + err.desc = "Forged destination path"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&dest_rw_suff, "%s%s", config->internal_path, + destination + extp_len) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + dest_rw = strdup(dest_rw_suff); + if (!dest_rw) { + err.code = ENOMEM; + goto error_out; + } + *(dest_rw + (strlen(dest_rw_suff) - suff_len)) = '\0'; + + if (asprintf(&dest_rw_lock, "%s" LOCK_SUFFIX, dest_rw) == -1) { + err.code = ENOMEM; + goto error_out; + } + + lockfile = fopen(dest_rw_lock, "r"); + if (lockfile == NULL) { + err.code = errno; + err.desc = "Cannot open upload's lock file"; + goto error_out; + } + if (fscanf(lockfile, "%ld %d ", &commit_before, &lockpeerlen) < 2 || ferror(lockfile)) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot read upload's lock file"; + goto error_out; + } + if (lockpeerlen) { + lockpeername = (char*) calloc(1, lockpeerlen+1); + if (!lockpeername) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(lockpeername, lockpeerlen+1, lockfile) == NULL) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot read upload's lock file"; + goto error_out; + } + } + fclose(lockfile); + + peername = glite_jp_peer_name(ctx); + if (lockpeername && (!peername || strcmp(lockpeername, peername))) { + err.code = EPERM; + err.desc = "Upload started by client of different identity"; + goto error_out; + } + + if (rename(dest_rw_suff, dest_rw) < 0) { + err.code = errno; + err.desc = "Cannot move upload file to the final place"; + goto error_out; + } + + if (unlink(dest_rw_lock) < 0) { + err.code = errno; + err.desc = "Cannot unlink upload's lock file"; + goto error_out; + } + +error_out: + free(dest_rw); + free(dest_rw_suff); + free(dest_rw_lock); + free(peername); + free(lockpeername); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } int glite_jppsbe_destination_info( @@ -59,9 +608,92 @@ int glite_jppsbe_destination_info( glite_jp_fileclass_t *class ) { + size_t dest_len; + size_t suff_len; + size_t extp_len; + char *dest_rw = NULL; + char *dest_rw_suff = NULL; + char *dest_rw_info = NULL; + FILE *infofile = NULL; + char *classname = NULL; + char jobstr[256+1]; + glite_jp_error_t err; + + assert(destination != NULL); + assert(job != NULL); + assert(class != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + suff_len = strlen(UPLOAD_SUFFIX); + dest_len = strlen(destination); + extp_len = strlen(config->external_path); + + if (dest_len < suff_len || + strcmp(UPLOAD_SUFFIX, destination + (dest_len - suff_len)) || + strncmp(destination, config->external_path, extp_len)) { + err.code = EINVAL; + err.desc = "Forged destination path"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&dest_rw_suff, "%s%s", config->internal_path, + destination + extp_len) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + dest_rw = strdup(dest_rw_suff); + if (!dest_rw) { + err.code = ENOMEM; + goto error_out; + } + *(dest_rw + (strlen(dest_rw_suff) - suff_len)) = '\0'; + + classname = strrchr(dest_rw,'/'); + if (classname == NULL) { + err.code = EINVAL; + err.desc = "Forged destination path"; + goto error_out; + } + *classname++ ='\0'; + *class = fname_to_class(classname); + if (!class == GLITE_JP_FILECLASS_UNDEF) { + err.code = EINVAL; + err.desc = "Forged destination path"; + goto error_out; + } + + if (asprintf(&dest_rw_info, "%s/_info", dest_rw) == -1) { + err.code = ENOMEM; + goto error_out; + } + + infofile = fopen(dest_rw_info, "r"); + if (infofile == NULL) { + err.code = errno; + err.desc = "Cannot open _info file"; + goto error_out; + } + if (fscanf(infofile, "%*d %*ld.%*ld %256s ", jobstr) < 1 || ferror(infofile)) { + fclose(infofile); + err.code = errno; + err.desc = "Cannot read _info file"; + goto error_out; + } + *job = strdup(jobstr); + fclose(infofile); + +error_out: + free(dest_rw); + free(dest_rw_suff); + free(dest_rw_info); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } @@ -72,9 +704,143 @@ int glite_jppsbe_get_job_url( char **url_out ) { + FILE *regfile = NULL; + char *int_fname = NULL; + char *fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(class != GLITE_JP_FILECLASS_UNDEF); + assert(url_out != NULL); + + fname = class_to_fname(class); + assert(fname!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + if (asprintf(url_out, "%s/data/%s/%d/%s/%s", + config->external_path, ownerhash, regtime_trunc(reg_time), ju, fname) == -1) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + free(int_fname); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_fname( + glite_jp_context_t ctx, + const char *job, + glite_jp_fileclass_t class, + char **fname_out +) +{ + FILE *regfile = NULL; + char *fname = NULL; + char *int_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(class != GLITE_JP_FILECLASS_UNDEF); + assert(fname_out != NULL); + + fname = class_to_fname(class); + assert(fname!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + if (asprintf(fname_out, "%s/data/%s/%d/%s/%s", + config->internal_path, ownerhash, regtime_trunc(reg_time), ju, fname) == -1) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + free(int_fname); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } int glite_jppsbe_open_file( @@ -85,9 +851,52 @@ int glite_jppsbe_open_file( void **handle_out ) { + fhandle handle = NULL; + char* fname = NULL; + glite_jp_error_t err; + + assert(handle_out != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (get_job_fname(ctx, job, class, &fname)) { + err.code = ctx->error->code; + err.desc = "Cannot construct internal filename"; + return glite_jp_stack_error(ctx,&err); + } + + handle = (fhandle) calloc(1,sizeof(*handle)); + if (handle == NULL) { + err.code = ENOMEM; + goto error_out; + } + + handle->fd = open(fname, mode, S_IRUSR | S_IWUSR); + if (handle->fd < 0) { + err.code = errno; + err.desc = "Cannot open requested file"; + free(handle); + goto error_out; + } + handle->fd_append = open(fname, mode | O_APPEND, S_IRUSR | S_IWUSR); + if (handle->fd_append < 0) { + err.code = errno; + err.desc = "Cannot open requested file for append"; + close(handle->fd); + free(handle); + goto error_out; + } + *handle_out = (void*) handle; + +error_out: + free(fname); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } int glite_jppsbe_close_file( @@ -95,9 +904,32 @@ int glite_jppsbe_close_file( void *handle ) { + glite_jp_error_t err; + + assert(handle != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); - return 0; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (close(((fhandle)handle)->fd_append) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor (fd_append)"; + goto error_out; + } + if (close(((fhandle)handle)->fd) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor"; + goto error_out; + } + +error_out: + free(handle); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } } int glite_jppsbe_pread( @@ -105,11 +937,27 @@ int glite_jppsbe_pread( void *handle, void *buf, size_t nbytes, - off_t offset + off_t offset, + ssize_t *nbytes_ret ) { + ssize_t ret; + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if ((ret = pread(((fhandle)handle)->fd, buf, nbytes, offset)) < 0) { + err.code = errno; + err.desc = "Error in pread()"; + return glite_jp_stack_error(ctx,&err); + } + *nbytes_ret = ret; + return 0; } @@ -121,8 +969,21 @@ int glite_jppsbe_pwrite( off_t offset ) { + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (pwrite(((fhandle)handle)->fd, buf, nbytes, offset) < 0) { + err.code = errno; + err.desc = "Error in pwrite()"; + return glite_jp_stack_error(ctx,&err); + } + return 0; } @@ -133,22 +994,519 @@ int glite_jppsbe_append( size_t nbytes ) { + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (write(((fhandle)handle)->fd_append, buf, nbytes) < 0) { + err.code = errno; + err.desc = "Error in write()"; + return glite_jp_stack_error(ctx,&err); + } + return 0; } +static int get_job_info( + glite_jp_context_t ctx, + const char *job, + char **owner, + struct timeval *tv_reg +) +{ + char *ju = NULL; + char *ju_path = NULL; + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char *int_fname = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %*s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, &ownerlen) < 4 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + free(int_fname); + free(ju); + free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_info_int( + glite_jp_context_t ctx, + const char *int_fname, + char **jobid, + char **owner, + struct timeval *tv_reg +) +{ + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char jobid_buf[256]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, jobid_buf, &ownerlen) < 5 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + *jobid = strdup(jobid_buf); + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + int glite_jppsbe_get_job_metadata( glite_jp_context_t ctx, const char *job, glite_jp_attrval_t attrs_inout[] ) { + int got_info = 0; + struct timeval tv_reg; + char *owner = NULL; + int got_tags = 0; + void *tags_handle = NULL; + glite_jp_tagval_t* tags = NULL; + int i,j; + glite_jp_error_t err; + + assert(job != NULL); + assert(attrs_inout != NULL); + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; attrs_inout[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TIME: + if (!got_info) { + if (get_job_info(ctx, job, &owner, &tv_reg)) { + err.code = ctx->error->code; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + got_info = 1; + } + break; + case GLITE_JP_ATTR_TAG: + if (!got_tags) { + if (glite_jppsbe_open_file(ctx, job, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + got_tags = 1; + } + break; + default: + err.code = EINVAL; + err.desc = "Invalid attribute type"; + goto error_out; + break; + } + + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + attrs_inout[i].value.s = strdup(owner); + if (!attrs_inout[i].value.s) { + err.code = ENOMEM; + err.desc = "Cannot copy owner string"; + goto error_out; + } + break; + case GLITE_JP_ATTR_TIME: + attrs_inout[i].value.time = tv_reg; + break; + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, attrs_inout[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &attrs_inout[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) attrs_inout[i].value.tag.name = NULL; + break; + default: + break; + } + } + +error_out: + free(owner); + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); + + if (err.code) { + while (i > 0) { + i--; + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + free(attrs_inout[i].value.s); + break; + case GLITE_JP_ATTR_TAG: + free(attrs_inout[i].value.tag.name); + free(attrs_inout[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} +static int compare_timeval(struct timeval a, struct timeval b) +{ + if (a.tv_sec < b.tv_sec) return -1; + if (a.tv_sec > b.tv_sec) return 1; + if (a.tv_usec < b.tv_usec) return -1; + if (a.tv_usec > b.tv_usec) return 1; return 0; } +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +); + +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + char *time_dirname = NULL; + DIR *time_dirp = NULL; + struct dirent *jobent; + char *info_fname = NULL; + char *jobid = NULL; + char *owner = NULL; + struct timeval tv_reg; + void *tags_handle = NULL; + int matching; + int i, j; + glite_jp_tagval_t* tags = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (asprintf(&time_dirname, "%s/data/%s/%d", config->internal_path, + ownerhash, regtime_tr) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + time_dirp = opendir(time_dirname); + if (!time_dirp) { + free(time_dirname); + return 0; /* found nothing */ + } + while ((jobent = readdir(time_dirp)) != NULL) { + if (!strcmp(jobent->d_name, ".")) continue; + if (!strcmp(jobent->d_name, "..")) continue; + if (asprintf(&info_fname, "%s/%s/_info", time_dirname, + jobent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (get_job_info_int(ctx, info_fname, &jobid, &owner, &tv_reg)) { + err.code = EIO; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + if (q_tags || md_tags) { + if (glite_jppsbe_open_file(ctx, jobid, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + tags_handle = NULL; + } + + matching = 1; + for (i = 0; matching && query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (query[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + if (query[i].value.s == NULL || + strcmp(query[i].value.s, owner)) matching = 0; + break; + case GLITE_JP_ATTR_TIME: + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_LESS: + matching = compare_timeval(tv_reg, query[i].value.time) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = compare_timeval(tv_reg, query[i].value.time) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = compare_timeval(tv_reg, query[i].value.time) >= 0 + && compare_timeval(tv_reg, query[i].value2.time) <= 0; + break; + } + break; + case GLITE_JP_ATTR_TAG: + if (!tags) { + matching = 0; + break; + } + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, query[i].attr.name)) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_LESS: + matching = strcmp(tags[j].value, query[i].value.s) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = strcmp(tags[j].value, query[i].value.s) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = strcmp(tags[j].value, query[i].value.s) >= 0 \ + && strcmp(tags[j].value, query[i].value2.s) <= 0 ; + break; + default: + break; + } + } + } + break; + default: + break; + } + } + if (!matching) { + free(info_fname); info_fname = NULL; + free(jobid); jobid = NULL; + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); tags = NULL; + continue; + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + metadata[i].value.s = owner; + break; + case GLITE_JP_ATTR_TIME: + metadata[i].value.time = tv_reg; + break; + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, metadata[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &metadata[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) { + metadata[i].value.tag.name = NULL; + metadata[i].value.tag.value = NULL; + } + break; + default: + break; + } + } + (*callback)(ctx, jobid, metadata); + free(jobid); jobid = NULL; + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + } + +error_out: + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + if (tags_handle) glite_jppsbe_close_file(ctx, tags_handle); + free(info_fname); + free(owner); + free(jobid); + closedir(time_dirp); + free(time_dirname); + if (err.code) { + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else + return 0; +} + int glite_jppsbe_query( glite_jp_context_t ctx, const glite_jp_query_rec_t query[], @@ -160,7 +1518,173 @@ int glite_jppsbe_query( ) ) { + /* XXX clone metadata */ + int i; + char *q_exact_owner = NULL; + char *ownerhash = NULL; + long q_min_time = 0; + long q_max_time = LONG_MAX; + long q_min_time_tr; + long q_max_time_tr; + int q_with_tags = 0; + int md_info = 0; + int md_tags = 0; + char *owner_dirname = NULL; + DIR *owner_dirp = NULL; + struct dirent *ttimeent; + char *data_dirname = NULL; + DIR *data_dirp = NULL; + struct dirent *ownerent; + long ttime = 0; + glite_jp_attrval_t *metadata_templ = NULL; + glite_jp_error_t err; + glite_jp_clear_error(ctx); - puts(__FUNCTION__); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + if (query[i].attr.type == GLITE_JP_ATTR_OWNER && query[i].op == GLITE_JP_QUERYOP_EQUAL) { + q_exact_owner = query[i].value.s; + } + if (query[i].attr.type == GLITE_JP_ATTR_TIME) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + q_min_time = query[i].value.time.tv_sec; + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_LESS: + if (q_max_time > query[i].value.time.tv_sec + 1) + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_WITHIN: + if (q_max_time > query[i].value2.time.tv_sec + 1) + q_max_time = query[i].value2.time.tv_sec + 1; + /* fallthrough */ + case GLITE_JP_QUERYOP_GREATER: + if (q_min_time < query[i].value.time.tv_sec) + q_min_time = query[i].value.time.tv_sec; + break; + default: + err.code = EINVAL; + err.desc = "Invalid query op"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + if (query[i].attr.type == GLITE_JP_ATTR_TAG) + q_with_tags = 1; + + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TIME: + md_info = 1; + break; + case GLITE_JP_ATTR_TAG: + md_tags = 1; + break; + default: + err.code = EINVAL; + err.desc = "Invalid attribute type in metadata parameter"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + metadata_templ = (glite_jp_attrval_t *) calloc(i + 1, sizeof(glite_jp_attrval_t)); + if (!metadata_templ) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + memcpy(metadata_templ, metadata, (i + 1) * sizeof(glite_jp_attrval_t)); + + q_min_time_tr = regtime_trunc(q_min_time); + q_max_time_tr = regtime_ceil(q_max_time); + + if (q_exact_owner) { + ownerhash = str2md5(q_exact_owner); /* static buffer */ + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, ownerhash) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + free(metadata_templ); + return 0; /* found nothing */ + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerhash, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + } else { /* !q_exact_owner */ + if (asprintf(&data_dirname, "%s/data", config->internal_path) == -1) { + err.code = ENOMEM; + goto error_out; + } + data_dirp = opendir(data_dirname); + if (!data_dirp) { + err.code = EIO; + err.desc = "Cannot open data directory"; + goto error_out; + } + while ((ownerent = readdir(data_dirp)) != NULL) { + if (!strcmp(ownerent->d_name, ".")) continue; + if (!strcmp(ownerent->d_name, "..")) continue; + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, + ownerent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + err.code = EIO; + err.desc = "Cannot open owner data directory"; + goto error_out; + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerent->d_name, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + closedir(owner_dirp); owner_dirp = NULL; + } + closedir(data_dirp); data_dirp = NULL; + } return 0; + +error_out: + if (owner_dirp) closedir(owner_dirp); + if (data_dirp) closedir(data_dirp); + free(data_dirname); + free(metadata_templ); + return glite_jp_stack_error(ctx,&err); } + +/* XXX: +- no primary authorization yet +- no concurrency control yet +- partial success in pwrite,append +- "unique" part of jobid is assumed to be unique across bookkeeping servers +- repository versioning not fully implemented yet +*/ diff --git a/org.glite.jp.primary/src/tags.c b/org.glite.jp.primary/src/tags.c index 25474f4..1f11b4d 100644 --- a/org.glite.jp.primary/src/tags.c +++ b/org.glite.jp.primary/src/tags.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "tags.h" @@ -60,3 +61,173 @@ int glite_jpps_tag_append( return 0; } +int glite_jpps_tagval_copy( + glite_jp_context_t ctx, + glite_jp_tagval_t *from, + glite_jp_tagval_t *to +) +{ + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + to->name = strdup(from->name); + if (!to->name) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + to->sequence = from->sequence; + to->timestamp = from->timestamp; + to->binary = from->binary; + to->size = from->size; + to->value = (char *) malloc(to->size); + if (!to->value) { + free(to->name); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + memcpy(from->value, to->value, to->size); + + return 0; +} + +int glite_jpps_tag_read( + glite_jp_context_t ctx, + void *handle, + off_t offset, + glite_jp_tagval_t *tagvalue, + size_t *shift +) +{ + char hdr[HEADER_SIZE+1]; + unsigned int nlen; + unsigned long vlen; + char binary; + unsigned sequence; + unsigned timestamp; + char * name = NULL; + char * value = NULL; + ssize_t ret; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + hdr[HEADER_SIZE] = '\0'; + if (glite_jppsbe_pread(ctx, handle, hdr, HEADER_SIZE, offset, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag header"; + goto error_out; + } + if (ret == 0) { + err.code = ENOENT; + err.desc = "No more tags in the file"; + goto error_out; + } + /* #define HEADER "JP#TAG# %05u %012lu %c %05u %012lu#" */ + if (sscanf(hdr, HEADER, &nlen, &vlen, &binary, &sequence, ×tamp) < 5) { + err.code = EILSEQ; + err.desc = "Incorrect tag header format"; + goto error_out; + } + name = (char*) malloc(nlen + 1); + if (!name) { + err.code = ENOMEM; + goto error_out; + } + name[nlen] = '\0'; + value = (char*) malloc(vlen + 1); + if (!value) { + err.code = ENOMEM; + goto error_out; + } + value[vlen] = '\0'; + if (glite_jppsbe_pread(ctx, handle, name, nlen, offset + HEADER_SIZE, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag name"; + goto error_out; + } + if (glite_jppsbe_pread(ctx, handle, value, vlen, offset + HEADER_SIZE + nlen, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag value"; + goto error_out; + } + + tagvalue->name = name; + tagvalue->sequence = sequence; + tagvalue->timestamp = timestamp; + tagvalue->binary = (binary == 'B') ? 1 : 0; + tagvalue->size = vlen; + tagvalue->value = value; + + *shift = HEADER_SIZE + nlen + vlen; + + return 0; +error_out: + free(name); + free(value); + return glite_jp_stack_error(ctx,&err); +} + +/* +int glite_jpps_tag_read(glite_jp_context_t, void *, off_t, glite_jp_tagval_t *, size_t); +int glite_jpps_tag_readall(glite_jp_context_t, void *, glite_jp_tagval_t **); +*/ + +int glite_jpps_tag_readall( + glite_jp_context_t ctx, + void *handle, + glite_jp_tagval_t **tags_out +) +{ + glite_jp_tagval_t * tags = NULL; + void * newspace; + int ntags = 0; + int ntagspace = 0; + off_t offset = 0; + int ret; + size_t shift; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + ntagspace = 1; + tags = (glite_jp_tagval_t *) calloc(ntagspace + 1, sizeof(*tags)); + if (!tags) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + while (!(ret = glite_jpps_tag_read(ctx, handle, offset, &tags[ntags], &shift))) { + offset += shift; + ntags++; + if (ntagspace <= ntags) { + ntagspace += 1; + newspace = realloc(tags, (ntagspace + 1) * sizeof(*tags)); + if (!newspace) { + err.code = ENOMEM; + goto error_out; + } + tags = (glite_jp_tagval_t *) newspace; + } + } + if (ret == ENOENT) { + *tags_out = tags; + return 0; + } else { + err.code = EIO; + err.desc = "Error reading tag value"; + } + +error_out: + for (; ntags-- ;) { + free(tags[ntags].name); + free(tags[ntags].value); + } + free(tags); + return glite_jp_stack_error(ctx,&err); +} -- 1.8.2.3