JPPS compress arrived data. It can work with old uncompress data too.
authorJiří Filipovič <fila@ics.muni.cz>
Thu, 11 Oct 2007 15:40:58 +0000 (15:40 +0000)
committerJiří Filipovič <fila@ics.muni.cz>
Thu, 11 Oct 2007 15:40:58 +0000 (15:40 +0000)
org.glite.jp.primary/src/new_ftp_backend.c
org.glite.jp.primary/src/soap_ops.c

index ed30a44..816a05c 100644 (file)
@@ -12,6 +12,7 @@
 #include <dirent.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <zlib.h>
 
 #include "glite/lbu/trio.h"
 #include "glite/lbu/escape.h"
@@ -43,8 +44,13 @@ struct ftpbe_config {
 static struct ftpbe_config *config = NULL;
 
 struct fhandle_rec {
-       int fd;
-       int fd_append;
+       gzFile  fd_gz;
+       char*   filename;
+       int     filemode;
+       char*   filedata;
+       int     offset;
+       int     eof;
+       int     modified;
 };
 typedef struct fhandle_rec *fhandle;
 
@@ -947,26 +953,66 @@ int glite_jppsbe_open_file(
                goto error_out;
        }
 
-       handle->fd = open(fname, mode, S_IRUSR | S_IWUSR);
-       if (handle->fd < 0) {
-               err.code = errno;
-               err.desc = "Cannot open requested file";
-               free(handle);
-               goto error_out;
+       int error = 0;
+       int created = 0;
+       if (mode % 4 == O_RDONLY)
+               handle->fd_gz = gzopen(fname, "r");
+       else if (mode % 4 == O_WRONLY)
+               handle->fd_gz = gzopen(fname, "r+");
+       else if (mode % 4 == O_RDWR){
+               handle->fd_gz = gzopen(fname, "r+");
+               if ((handle->fd_gz == NULL) && (mode & O_CREAT)){
+                       handle->fd_gz = gzopen(fname, "w+");
+                       created = 1; // when the file is created, gzread returns -2 
+               }
        }
-       handle->fd_append = open(fname, mode | O_APPEND, S_IRUSR | S_IWUSR);
-       if (handle->fd_append < 0) {
-               err.code = errno;
-               err.desc = "Cannot open requested file for append";
-               close(handle->fd);
-               free(handle);
-               goto error_out;
+       if (handle->fd_gz == NULL){
+               gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                err.desc = "Cannot open requested file";
+                free(handle);
+                error = 1;
+                goto error_out;
+        }
+
+       handle->offset = 0;
+       handle->eof = 0;
+
+       if (! created){
+               const int READ_STEP = 8192;
+               //handle->filedata = malloc(sizeof(*handle->filedata)*READ_STEP);
+               int diff = 0;
+               char buf[READ_STEP];
+               do{
+                       diff = gzread(handle->fd_gz, buf/*handle->filedata + handle->eof*/, READ_STEP);
+                       if (diff < 0){
+                               gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                               err.desc = "Error reading file";
+                               free(handle->filedata);
+                               error = 1;
+                               goto error_out;
+                       }
+                       handle->eof += diff;
+                       handle->filedata = realloc(handle->filedata, sizeof(*handle->filedata)*handle->eof);
+                       memcpy(handle->filedata + handle->eof - diff, buf, sizeof(*buf)*diff);
+               } while(diff == READ_STEP);
        }
+       
+       /*if (gzclose(fd_gz)){
+               gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                err.desc = "Error closing file descriptor";
+               free(handle->filedata);
+                goto error_out;
+       }*/
+
+       handle->filename = strdup(fname);
+       handle->filemode = mode;
+        handle->modified = 0;
+
        *handle_out = (void*) handle;
 
 error_out:
        free(fname);
-       if (err.code) { 
+       if (error) { 
                return glite_jp_stack_error(ctx,&err);
        } else { 
                return 0;
@@ -986,15 +1032,28 @@ int glite_jppsbe_close_file(
        memset(&err,0,sizeof err);
        err.source = __FUNCTION__;
 
-       if (close(((fhandle)handle)->fd_append) < 0) {
-               err.code = errno;
-               err.desc = "Error closing file descriptor (fd_append)";
-               goto error_out;
+       if (gzclose(((fhandle)handle)->fd_gz)){
+               gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                err.desc = "Error closing file descriptor";
+                goto error_out;
        }
-       if (close(((fhandle)handle)->fd) < 0) {
-               err.code = errno;
-               err.desc = "Error closing file descriptor";
-               goto error_out;
+
+       if (((fhandle)handle)->modified){
+               if ((((fhandle)handle)->fd_gz = gzopen(((fhandle)handle)->filename, "w")) == NULL){
+                       gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                       err.desc = "Error opening file for write changes";
+                       goto error_out;
+               }
+               if (gzwrite(((fhandle)handle)->fd_gz, ((fhandle)handle)->filedata, ((fhandle)handle)->eof) < 0){
+                       gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                        err.desc = "Error writing changes";
+                        goto error_out;
+               }
+               if (gzclose(((fhandle)handle)->fd_gz)){
+                       gzerror(((fhandle)handle)->fd_gz, &(err.code));
+                       err.desc = "Error closing file descriptor";
+                       goto error_out;
+               }
        }
 
 error_out:
@@ -1015,40 +1074,48 @@ int glite_jppsbe_file_attrs(glite_jp_context_t ctx, void *handle, struct stat *b
         memset(&err,0,sizeof err);
         err.source = __FUNCTION__;
 
-       if (! fstat(((fhandle)handle)->fd, buf)) {
+       if (! stat(((fhandle)handle)->filename, buf)) {
                err.code = errno;
                err.desc = "Error calling fstat";
-               return -1;
+               return glite_jp_stack_error(ctx,&err);
        }
 
        return 0;
 }
 
 int glite_jppsbe_pread(
-       glite_jp_context_t ctx,
-       void *handle,
-       void *buf,
-       size_t nbytes,
-       off_t offset,
-       ssize_t *nbytes_ret
+        glite_jp_context_t ctx,
+        void *handle,
+        void *buf,
+        size_t nbytes,
+        off_t offset,
+        ssize_t *nbytes_ret
 )
 {
-       ssize_t ret;
        glite_jp_error_t err;
 
-       assert(handle != NULL);
-       assert(buf != NULL);
+        assert(handle != NULL);
+        assert(buf != NULL);
 
        glite_jp_clear_error(ctx);
-       memset(&err,0,sizeof err);
-       err.source = __FUNCTION__;
+        memset(&err,0,sizeof err);
+        err.source = __FUNCTION__;
 
-       if ((ret = pread(((fhandle)handle)->fd, buf, nbytes, offset)) < 0) {
-               err.code = errno;
-               err.desc = "Error in pread()";
-               return glite_jp_stack_error(ctx,&err);
+       if (((fhandle)handle)->filename == NULL){
+               err.code = 0;
+               err.desc = "Cannot read, file not open";
+                return glite_jp_stack_error(ctx,&err);
        }
-       *nbytes_ret = ret;
+
+       int to_read;
+       if (offset + nbytes > ((fhandle)handle)->eof)
+               to_read = ((fhandle)handle)->eof - offset;
+       else
+               to_read = nbytes;
+       memcpy(buf, ((fhandle)handle)->filedata + offset, to_read);
+       ((fhandle)handle)->offset = offset + to_read;
+
+       *nbytes_ret = to_read;
 
        return 0;
 }
@@ -1070,15 +1137,73 @@ int glite_jppsbe_pwrite(
        memset(&err,0,sizeof err);
        err.source = __FUNCTION__;
 
-       if (pwrite(((fhandle)handle)->fd, buf, nbytes, offset) < 0) {
-               err.code = errno;
-               err.desc = "Error in pwrite()";
+       if (((fhandle)handle)->filename == NULL){
+                err.code = 0;
+                err.desc = "Cannot write, file not open";
+                return glite_jp_stack_error(ctx,&err);
+        }
+
+       if (((fhandle)handle)->filemode % 4 == 0){
+               err.desc = "Cannot write to readonly file";
                return glite_jp_stack_error(ctx,&err);
        }
+       
+       if (offset + nbytes > ((fhandle)handle)->eof){
+               ((fhandle)handle)->filedata = realloc(((fhandle)handle)->filedata, offset + nbytes);
+               ((fhandle)handle)->eof = offset + nbytes;
+       }
+
+       memcpy(((fhandle)handle)->filedata + offset, buf, nbytes);
+
+       ((fhandle)handle)->modified = 1;
 
        return 0;
 }
 
+int glite_jppsbe_compress_and_remove_file(
+       glite_jp_context_t ctx,
+        const char *job,
+        const char *class,
+        const char *name
+){
+       glite_jp_error_t err;
+        glite_jp_clear_error(ctx);
+        memset(&err,0,sizeof err);
+        err.source = __FUNCTION__;
+
+       char *src, *dest;
+       get_job_fname(ctx, job, class, name, &src);
+
+       dest = malloc(sizeof(*dest)*(strlen(src)+strlen(".gz")+1));
+       sprintf(dest, "%s.gz", src);
+
+       char buf[8192];
+       FILE* s = fopen(src, "r");
+       gzFile d = gzopen(dest, "w");
+       size_t l;
+       while ((l = fread(buf, sizeof(*buf), 8192, s)) > 0)
+               gzwrite(d, buf, sizeof(*buf)*l);
+       gzclose(d);
+       fclose(s);
+
+       char *ju, *ju_path;
+       if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) {
+                err.code = ctx->error->code;
+                err.desc = "Cannot obtain jobid unique path/name";
+               goto error_out;
+        }
+
+       rename(dest, src);
+
+error_out:
+        if (err.code) {
+                return glite_jp_stack_error(ctx,&err);
+        } else {
+                return 0;
+        }
+
+}
+
 int glite_jppsbe_append(
        glite_jp_context_t ctx,
        void *handle,
@@ -1095,11 +1220,11 @@ int glite_jppsbe_append(
        memset(&err,0,sizeof err);
        err.source = __FUNCTION__;
 
-       if (write(((fhandle)handle)->fd_append, buf, nbytes) < 0) {
-               err.code = errno;
-               err.desc = "Error in write()";
-               return glite_jp_stack_error(ctx,&err);
-       }
+        ((fhandle)handle)->filedata = realloc(((fhandle)handle)->filedata, ((fhandle)handle)->eof + nbytes);
+       memcpy(((fhandle)handle)->filedata + ((fhandle)handle)->eof, buf, nbytes);
+        
+       ((fhandle)handle)->eof += nbytes;
+       ((fhandle)handle)->modified = 1;
 
        return 0;
 }
index 92cc8f2..8c4ed06 100644 (file)
@@ -1,6 +1,7 @@
 #include <stdio.h>
 #include <fcntl.h>
 #include <assert.h>
+#include <sys/stat.h>
 
 #undef SOAP_FMAC1
 #define SOAP_FMAC1     static
@@ -38,6 +39,8 @@
 
 #define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user)
 
+#define SIZE_TO_COMPRESS 1024
+
 int glite_jpps_srv_init(glite_jp_context_t ctx)
 {
        glite_jp_soap_env_ctx = &my_soap_env_ctx;
@@ -172,6 +175,16 @@ SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__CommitUpload(
                                        }
                 }
 
+       char *fname;
+       //XXX ignore error
+       if (!glite_jppsbe_open_file(ctx,job,class, name, O_RDONLY, &beh)){
+               struct stat fattr;
+               glite_jppsbe_file_attrs(ctx, beh, &fattr);
+               glite_jppsbe_close_file(ctx, beh);
+               if (fattr.st_size > SIZE_TO_COMPRESS)
+                       glite_jppsbe_compress_and_remove_file(ctx,job,class, name);
+       }
+
        free(job); free(class); free(name);
 
        return SOAP_OK;