From 571aeee853359bd8a9a65a331db1cc4adda5bc61 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20Filipovi=C4=8D?= Date: Thu, 11 Oct 2007 15:40:58 +0000 Subject: [PATCH] JPPS compress arrived data. It can work with old uncompress data too. --- org.glite.jp.primary/src/new_ftp_backend.c | 225 ++++++++++++++++++++++------- org.glite.jp.primary/src/soap_ops.c | 13 ++ 2 files changed, 188 insertions(+), 50 deletions(-) diff --git a/org.glite.jp.primary/src/new_ftp_backend.c b/org.glite.jp.primary/src/new_ftp_backend.c index ed30a44..816a05c 100644 --- a/org.glite.jp.primary/src/new_ftp_backend.c +++ b/org.glite.jp.primary/src/new_ftp_backend.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "glite/lbu/trio.h" #include "glite/lbu/escape.h" @@ -43,8 +44,13 @@ struct ftpbe_config { static struct ftpbe_config *config = NULL; struct fhandle_rec { - int fd; - int fd_append; + gzFile fd_gz; + char* filename; + int filemode; + char* filedata; + int offset; + int eof; + int modified; }; typedef struct fhandle_rec *fhandle; @@ -947,26 +953,66 @@ int glite_jppsbe_open_file( goto error_out; } - handle->fd = open(fname, mode, S_IRUSR | S_IWUSR); - if (handle->fd < 0) { - err.code = errno; - err.desc = "Cannot open requested file"; - free(handle); - goto error_out; + int error = 0; + int created = 0; + if (mode % 4 == O_RDONLY) + handle->fd_gz = gzopen(fname, "r"); + else if (mode % 4 == O_WRONLY) + handle->fd_gz = gzopen(fname, "r+"); + else if (mode % 4 == O_RDWR){ + handle->fd_gz = gzopen(fname, "r+"); + if ((handle->fd_gz == NULL) && (mode & O_CREAT)){ + handle->fd_gz = gzopen(fname, "w+"); + created = 1; // when the file is created, gzread returns -2 + } } - handle->fd_append = open(fname, mode | O_APPEND, S_IRUSR | S_IWUSR); - if (handle->fd_append < 0) { - err.code = errno; - err.desc = "Cannot open requested file for append"; - close(handle->fd); - free(handle); - goto error_out; + if (handle->fd_gz == NULL){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Cannot open requested file"; + free(handle); + error = 1; + goto error_out; + } + + handle->offset = 0; + handle->eof = 0; + + if (! created){ + const int READ_STEP = 8192; + //handle->filedata = malloc(sizeof(*handle->filedata)*READ_STEP); + int diff = 0; + char buf[READ_STEP]; + do{ + diff = gzread(handle->fd_gz, buf/*handle->filedata + handle->eof*/, READ_STEP); + if (diff < 0){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error reading file"; + free(handle->filedata); + error = 1; + goto error_out; + } + handle->eof += diff; + handle->filedata = realloc(handle->filedata, sizeof(*handle->filedata)*handle->eof); + memcpy(handle->filedata + handle->eof - diff, buf, sizeof(*buf)*diff); + } while(diff == READ_STEP); } + + /*if (gzclose(fd_gz)){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error closing file descriptor"; + free(handle->filedata); + goto error_out; + }*/ + + handle->filename = strdup(fname); + handle->filemode = mode; + handle->modified = 0; + *handle_out = (void*) handle; error_out: free(fname); - if (err.code) { + if (error) { return glite_jp_stack_error(ctx,&err); } else { return 0; @@ -986,15 +1032,28 @@ int glite_jppsbe_close_file( memset(&err,0,sizeof err); err.source = __FUNCTION__; - if (close(((fhandle)handle)->fd_append) < 0) { - err.code = errno; - err.desc = "Error closing file descriptor (fd_append)"; - goto error_out; + if (gzclose(((fhandle)handle)->fd_gz)){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error closing file descriptor"; + goto error_out; } - if (close(((fhandle)handle)->fd) < 0) { - err.code = errno; - err.desc = "Error closing file descriptor"; - goto error_out; + + if (((fhandle)handle)->modified){ + if ((((fhandle)handle)->fd_gz = gzopen(((fhandle)handle)->filename, "w")) == NULL){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error opening file for write changes"; + goto error_out; + } + if (gzwrite(((fhandle)handle)->fd_gz, ((fhandle)handle)->filedata, ((fhandle)handle)->eof) < 0){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error writing changes"; + goto error_out; + } + if (gzclose(((fhandle)handle)->fd_gz)){ + gzerror(((fhandle)handle)->fd_gz, &(err.code)); + err.desc = "Error closing file descriptor"; + goto error_out; + } } error_out: @@ -1015,40 +1074,48 @@ int glite_jppsbe_file_attrs(glite_jp_context_t ctx, void *handle, struct stat *b memset(&err,0,sizeof err); err.source = __FUNCTION__; - if (! fstat(((fhandle)handle)->fd, buf)) { + if (! stat(((fhandle)handle)->filename, buf)) { err.code = errno; err.desc = "Error calling fstat"; - return -1; + return glite_jp_stack_error(ctx,&err); } return 0; } int glite_jppsbe_pread( - glite_jp_context_t ctx, - void *handle, - void *buf, - size_t nbytes, - off_t offset, - ssize_t *nbytes_ret + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset, + ssize_t *nbytes_ret ) { - ssize_t ret; glite_jp_error_t err; - assert(handle != NULL); - assert(buf != NULL); + assert(handle != NULL); + assert(buf != NULL); glite_jp_clear_error(ctx); - memset(&err,0,sizeof err); - err.source = __FUNCTION__; + memset(&err,0,sizeof err); + err.source = __FUNCTION__; - if ((ret = pread(((fhandle)handle)->fd, buf, nbytes, offset)) < 0) { - err.code = errno; - err.desc = "Error in pread()"; - return glite_jp_stack_error(ctx,&err); + if (((fhandle)handle)->filename == NULL){ + err.code = 0; + err.desc = "Cannot read, file not open"; + return glite_jp_stack_error(ctx,&err); } - *nbytes_ret = ret; + + int to_read; + if (offset + nbytes > ((fhandle)handle)->eof) + to_read = ((fhandle)handle)->eof - offset; + else + to_read = nbytes; + memcpy(buf, ((fhandle)handle)->filedata + offset, to_read); + ((fhandle)handle)->offset = offset + to_read; + + *nbytes_ret = to_read; return 0; } @@ -1070,15 +1137,73 @@ int glite_jppsbe_pwrite( memset(&err,0,sizeof err); err.source = __FUNCTION__; - if (pwrite(((fhandle)handle)->fd, buf, nbytes, offset) < 0) { - err.code = errno; - err.desc = "Error in pwrite()"; + if (((fhandle)handle)->filename == NULL){ + err.code = 0; + err.desc = "Cannot write, file not open"; + return glite_jp_stack_error(ctx,&err); + } + + if (((fhandle)handle)->filemode % 4 == 0){ + err.desc = "Cannot write to readonly file"; return glite_jp_stack_error(ctx,&err); } + + if (offset + nbytes > ((fhandle)handle)->eof){ + ((fhandle)handle)->filedata = realloc(((fhandle)handle)->filedata, offset + nbytes); + ((fhandle)handle)->eof = offset + nbytes; + } + + memcpy(((fhandle)handle)->filedata + offset, buf, nbytes); + + ((fhandle)handle)->modified = 1; return 0; } +int glite_jppsbe_compress_and_remove_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name +){ + glite_jp_error_t err; + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + char *src, *dest; + get_job_fname(ctx, job, class, name, &src); + + dest = malloc(sizeof(*dest)*(strlen(src)+strlen(".gz")+1)); + sprintf(dest, "%s.gz", src); + + char buf[8192]; + FILE* s = fopen(src, "r"); + gzFile d = gzopen(dest, "w"); + size_t l; + while ((l = fread(buf, sizeof(*buf), 8192, s)) > 0) + gzwrite(d, buf, sizeof(*buf)*l); + gzclose(d); + fclose(s); + + char *ju, *ju_path; + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + goto error_out; + } + + rename(dest, src); + +error_out: + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } + +} + int glite_jppsbe_append( glite_jp_context_t ctx, void *handle, @@ -1095,11 +1220,11 @@ int glite_jppsbe_append( memset(&err,0,sizeof err); err.source = __FUNCTION__; - if (write(((fhandle)handle)->fd_append, buf, nbytes) < 0) { - err.code = errno; - err.desc = "Error in write()"; - return glite_jp_stack_error(ctx,&err); - } + ((fhandle)handle)->filedata = realloc(((fhandle)handle)->filedata, ((fhandle)handle)->eof + nbytes); + memcpy(((fhandle)handle)->filedata + ((fhandle)handle)->eof, buf, nbytes); + + ((fhandle)handle)->eof += nbytes; + ((fhandle)handle)->modified = 1; return 0; } diff --git a/org.glite.jp.primary/src/soap_ops.c b/org.glite.jp.primary/src/soap_ops.c index 92cc8f2..8c4ed06 100644 --- a/org.glite.jp.primary/src/soap_ops.c +++ b/org.glite.jp.primary/src/soap_ops.c @@ -1,6 +1,7 @@ #include #include #include +#include #undef SOAP_FMAC1 #define SOAP_FMAC1 static @@ -38,6 +39,8 @@ #define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user) +#define SIZE_TO_COMPRESS 1024 + int glite_jpps_srv_init(glite_jp_context_t ctx) { glite_jp_soap_env_ctx = &my_soap_env_ctx; @@ -172,6 +175,16 @@ SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__CommitUpload( } } + char *fname; + //XXX ignore error + if (!glite_jppsbe_open_file(ctx,job,class, name, O_RDONLY, &beh)){ + struct stat fattr; + glite_jppsbe_file_attrs(ctx, beh, &fattr); + glite_jppsbe_close_file(ctx, beh); + if (fattr.st_size > SIZE_TO_COMPRESS) + glite_jppsbe_compress_and_remove_file(ctx,job,class, name); + } + free(job); free(class); free(name); return SOAP_OK; -- 1.8.2.3