From 6e5909237f59300c2a8b31aa4019b3e374ace0b9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Thu, 14 Jul 2005 16:57:36 +0000 Subject: [PATCH] initial release of JP client module (containing jpps importer) --- org.glite.jp.client/.cvsignore | 1 + org.glite.jp.client/Makefile | 135 ++ org.glite.jp.client/build.xml | 86 + org.glite.jp.client/project/build.number | 1 + org.glite.jp.client/project/build.properties | 0 .../project/configure.properties.xml | 43 + org.glite.jp.client/project/properties.xml | 44 + org.glite.jp.client/project/tar_exclude | 10 + org.glite.jp.client/project/version.properties | 2 + org.glite.jp.client/src/authz.c | 76 + org.glite.jp.client/src/authz.h | 18 + org.glite.jp.client/src/backend.h | 116 ++ org.glite.jp.client/src/bones_server.c | 327 ++++ org.glite.jp.client/src/builtin_plugins.h | 7 + org.glite.jp.client/src/db.h | 83 + org.glite.jp.client/src/feed.c | 327 ++++ org.glite.jp.client/src/feed.h | 21 + org.glite.jp.client/src/file_plugin.c | 115 ++ org.glite.jp.client/src/ftp_backend.c | 1744 +++++++++++++++++++ org.glite.jp.client/src/is_client.c | 38 + org.glite.jp.client/src/jpimporter.c | 243 +++ org.glite.jp.client/src/jptype_map.h | 18 + org.glite.jp.client/src/mysql.c | 265 +++ org.glite.jp.client/src/new_ftp_backend.c | 1790 ++++++++++++++++++++ org.glite.jp.client/src/simple_server.c | 59 + org.glite.jp.client/src/soap_ops.c | 465 +++++ org.glite.jp.client/src/tags.c | 233 +++ org.glite.jp.client/src/tags.h | 1 + org.glite.jp.client/src/tags_plugin.c | 148 ++ org.glite.jp.client/src/typemap.dat | 3 + 30 files changed, 6419 insertions(+) create mode 100644 org.glite.jp.client/.cvsignore create mode 100644 org.glite.jp.client/Makefile create mode 100755 org.glite.jp.client/build.xml create mode 100644 org.glite.jp.client/project/build.number create mode 100644 org.glite.jp.client/project/build.properties create mode 100644 org.glite.jp.client/project/configure.properties.xml create mode 100755 org.glite.jp.client/project/properties.xml create mode 100644 org.glite.jp.client/project/tar_exclude create mode 100644 org.glite.jp.client/project/version.properties create mode 100644 org.glite.jp.client/src/authz.c create mode 100644 org.glite.jp.client/src/authz.h create mode 100644 org.glite.jp.client/src/backend.h create mode 100644 org.glite.jp.client/src/bones_server.c create mode 100644 org.glite.jp.client/src/builtin_plugins.h create mode 100644 org.glite.jp.client/src/db.h create mode 100644 org.glite.jp.client/src/feed.c create mode 100644 org.glite.jp.client/src/feed.h create mode 100644 org.glite.jp.client/src/file_plugin.c create mode 100644 org.glite.jp.client/src/ftp_backend.c create mode 100644 org.glite.jp.client/src/is_client.c create mode 100644 org.glite.jp.client/src/jpimporter.c create mode 100644 org.glite.jp.client/src/jptype_map.h create mode 100644 org.glite.jp.client/src/mysql.c create mode 100644 org.glite.jp.client/src/new_ftp_backend.c create mode 100644 org.glite.jp.client/src/simple_server.c create mode 100644 org.glite.jp.client/src/soap_ops.c create mode 100644 org.glite.jp.client/src/tags.c create mode 100644 org.glite.jp.client/src/tags.h create mode 100644 org.glite.jp.client/src/tags_plugin.c create mode 100644 org.glite.jp.client/src/typemap.dat diff --git a/org.glite.jp.client/.cvsignore b/org.glite.jp.client/.cvsignore new file mode 100644 index 0000000..3a4edf6 --- /dev/null +++ b/org.glite.jp.client/.cvsignore @@ -0,0 +1 @@ +.project diff --git a/org.glite.jp.client/Makefile b/org.glite.jp.client/Makefile new file mode 100644 index 0000000..345a7c1 --- /dev/null +++ b/org.glite.jp.client/Makefile @@ -0,0 +1,135 @@ +# defaults +top_srcdir=. +builddir=build +top_builddir=${top_srcdir}/${builddir} +stagedir=. +distdir=. +globalprefix=glite +jpprefix=jp +package=glite-jp-client +version=0.0.0 +PREFIX=/opt/glite + +glite_location=/opt/glite +globus_prefix=/opt/globus +nothrflavour=gcc32 +thrflavour=gcc32pthr +expat_prefix=/opt/expat +ares_prefix=/opt/ares +gsoap_prefix=/software/gsoap-2.6 + +CC=gcc + +-include Makefile.inc + + +VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project:${stagedir}/interface + +GLOBUS_LIBS:=-L${globus_prefix}/lib \ + -lglobus_common_${nothrflavour} \ + -lglobus_gssapi_gsi_${nothrflavour} + +GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} + +DEBUG:=-g -O0 -DDEBUG + +CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${stagedir}/include ${GLOBUS_CFLAGS} +LDFLAGS:=-L${stagedir}/lib + +LINK:=libtool --mode=link ${CC} ${LDFLAGS} +LTCOMPILE:=libtool --mode=compile ${CC} ${CFLAGS} +LINKXX:=libtool --mode=link ${CXX} ${LDFLAGS} +INSTALL:=libtool --mode=install install + +daemon:=glite-jp-importer + +wsprefix:=jpps_ + +SRCS:= jpimporter.c \ + ${wsprefix}ClientLib.c ${wsprefix}C.c +# env_C.c + +EXA_SRCS:= + +OBJS:=${SRCS:.c=.o} +EXA_OBJS:=${EXA_SRCS:.c=.o} + +COMMONLIB:=-lglite_jp_common +GSOAPLIB:=-lglite_security_gsoap_plugin_${nothrflavour} -lglite_security_gss_${nothrflavour} \ + -L${gsoap_prefix}/lib -lgsoap${GSOAP_DEBUG} -L${ares_prefix}/lib -lares +LBMAILDIRLIB:=-lglite_lb_maildir + +default all: compile + +compile: ${daemon} + +${daemon}: ${OBJS} + ${LINK} -o $@ ${OBJS} ${LBMAILDIRLIB} ${COMMONLIB} ${GSOAPLIB} ${GLOBUS_LIBS} + + +JobProvenancePS.xh: %.xh: %.wsdl JobProvenanceTypes.wsdl typemap.dat + cp ${stagedir}/interface/JobProvenanceTypes.wsdl . + ${gsoap_prefix}/bin/wsdl2h -t ${top_srcdir}/src/typemap.dat -c -o $@ $< + rm -f JobProvenanceTypes.wsdl + +${wsprefix}Client.c ${wsprefix}ClientLib.c \ +${wsprefix}C.c ${wsprefix}H.h: JobProvenancePS.xh + ${gsoap_prefix}/bin/soapcpp2 -n -w -c -p ${wsprefix} JobProvenancePS.xh + +env_C.c env_Server.c: + touch env.xh + cp ${jpproject}/JobProvenanceTypes.wsdl . + ${gsoap_prefix}/bin/wsdl2h -t ${top_srcdir}/src/typemap.dat -c -o env.xh JobProvenanceTypes.wsdl + rm -f JobProvenanceTypes.wsdl + ${gsoap_prefix}/bin/soapcpp2 -w -c -p env_ env.xh + +${OBJS}: ${wsprefix}H.h soap_version.h + +soap_version.h: + ${gsoap_prefix}/bin/soapcpp2 /dev/null + perl -ne '$$. == 2 && /.*([0-9])\.([0-9])\.([0-9]).*/ && printf "#define GSOAP_VERSION %d%02d%02d\n",$$1,$$2,$$3' soapH.h >$@ + -rm soapC.cpp soapH.h soapStub.h soapClient.cpp soapServer.cpp soapClientLib.cpp soapServerLib.cpp + + + + +check: + -echo nothing yet + +doc: + +stage: compile + ${MAKE} PREFIX=${stagedir} DOSTAGE=yes install + +dist: distsrc distbin + +distsrc: + mkdir -p ${top_srcdir}/${package}-${version} + cd ${top_srcdir} && GLOBIGNORE="${package}-${version}" && cp -Rf * ${package}-${version} + cd ${top_srcdir} && tar -czf ${distdir}/${package}-${version}_src.tar.gz --exclude-from=project/tar_exclude ${package}-${version} + rm -rf ${top_srcdir}/${package}-${version} + +distbin: + $(MAKE) install PREFIX=`pwd`/tmpbuilddir${stagedir} + save_dir=`pwd`; cd tmpbuilddir${stagedir} && tar -czf $$save_dir/${top_srcdir}/${distdir}/${package}-${version}_bin.tar.gz *; cd $$save_dir + rm -rf tmpbuilddir + +install: + -mkdir -p ${PREFIX}/bin ${PREFIX}/etc ${PREFIX}/examples ${PREFIX}/etc/init.d + ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin + +clean: + +# we have no real config.h but have to force gSoap not to use +# linux ftime with broken (aka obsolete) DST information +stdsoap2.o: ${gsoap_prefix}/devel/stdsoap2.c + test -f config.h || touch config.h + @echo 'The following warning "time_t (de)serialization is not MT safe on this platform" is harmless' + ${CC} -o $@ -c -DWITH_NONAMESPACES -DHAVE_CONFIG_H ${CFLAGS} ${gsoap_prefix}/devel/stdsoap2.c + + +%.lo: %.c + ${LTCOMPILE} -o $@ -c $< + +%.o: %.c + ${LTCOMPILE} -o $@ -c $< diff --git a/org.glite.jp.client/build.xml b/org.glite.jp.client/build.xml new file mode 100755 index 0000000..8a40155 --- /dev/null +++ b/org.glite.jp.client/build.xml @@ -0,0 +1,86 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.glite.jp.client/project/build.number b/org.glite.jp.client/project/build.number new file mode 100644 index 0000000..d794048 --- /dev/null +++ b/org.glite.jp.client/project/build.number @@ -0,0 +1 @@ +module.build=0 diff --git a/org.glite.jp.client/project/build.properties b/org.glite.jp.client/project/build.properties new file mode 100644 index 0000000..e69de29 diff --git a/org.glite.jp.client/project/configure.properties.xml b/org.glite.jp.client/project/configure.properties.xml new file mode 100644 index 0000000..3744be5 --- /dev/null +++ b/org.glite.jp.client/project/configure.properties.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + +top_srcdir=.. +builddir=build +stagedir=${stage.abs.dir} +distdir=${dist.dir} +globalprefix=${global.prefix} +lbprefix=${subsystem.prefix} +package=${module.package.name} +PREFIX=${install.dir} +version=${module.version} +glite_location=${with.glite.location} +globus_prefix=${with.globus.prefix} +expat_prefix=${with.expat.prefix} +ares_prefix=${with.ares.prefix} +gsoap_prefix=${with.gsoap.prefix} +mysql_prefix=${with.mysql.prefix} +mysql_version=${ext.mysql.version} +thrflavour=${with.globus.thr.flavor} +nothrflavour=${with.globus.nothr.flavor} +cppunit=${with.cppunit.prefix} +jpproject=${subsystem.project.dir} +project=${component.project.dir} + + + diff --git a/org.glite.jp.client/project/properties.xml b/org.glite.jp.client/project/properties.xml new file mode 100755 index 0000000..e2a32d0 --- /dev/null +++ b/org.glite.jp.client/project/properties.xml @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.glite.jp.client/project/tar_exclude b/org.glite.jp.client/project/tar_exclude new file mode 100644 index 0000000..e1fcd1a --- /dev/null +++ b/org.glite.jp.client/project/tar_exclude @@ -0,0 +1,10 @@ +tar_exclude +CVS +build.xml +build +build.properties +properties.xml +configure.properties.xml +.cvsignore +.project +.cdtproject diff --git a/org.glite.jp.client/project/version.properties b/org.glite.jp.client/project/version.properties new file mode 100644 index 0000000..cd1e9e7 --- /dev/null +++ b/org.glite.jp.client/project/version.properties @@ -0,0 +1,2 @@ +module.version=1.0.0 +module.age=1 diff --git a/org.glite.jp.client/src/authz.c b/org.glite.jp.client/src/authz.c new file mode 100644 index 0000000..3e6d6e4 --- /dev/null +++ b/org.glite.jp.client/src/authz.c @@ -0,0 +1,76 @@ +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" + +#include "jpps_H.h" + +int glite_jpps_authz(glite_jp_context_t ctx,int op,const char *job,const char *owner) +{ + glite_jp_error_t err; + char buf[200]; + int i; + + memset(&err,0,sizeof err); + glite_jp_clear_error(ctx); + err.source = __FUNCTION__; + err.code = EPERM; + + switch (op) { + case SOAP_TYPE___jpsrv__RegisterJob: + case SOAP_TYPE___jpsrv__StartUpload: + case SOAP_TYPE___jpsrv__CommitUpload: + for (i=0; ctx->trusted_peers && ctx->trusted_peers[i]; i++) + if (!strcmp(ctx->trusted_peers[i],ctx->peer)) return 0; + err.desc = "you are not a trusted peer"; + return glite_jp_stack_error(ctx,&err); + + case SOAP_TYPE___jpsrv__GetJob: + assert(owner); + return strcmp(owner,ctx->peer) ? glite_jp_stack_error(ctx,&err) : 0; + break; + + default: + snprintf(buf,sizeof buf,"%d: unknown operation",op); + err.desc = buf; + err.code = EINVAL; + return glite_jp_stack_error(ctx,&err); + } +} + +int glite_jpps_readauth(glite_jp_context_t ctx,const char *file) +{ + FILE *f = fopen(file,"r"); + glite_jp_error_t err; + int cnt = 0; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + if (!f) { + err.code = errno; + err.desc = file; + return glite_jp_stack_error(ctx,&err); + } + + ctx->trusted_peers = NULL; + while (!feof(f)) { + char buf[BUFSIZ]; + + if (fscanf(f,"%[^\n]\n",buf) != 1) { + err.code = EINVAL; + err.desc = file; + fclose(f); + return glite_jp_stack_error(ctx,&err); + } + + ctx->trusted_peers = realloc(ctx->trusted_peers, (cnt+1) * sizeof *ctx->trusted_peers); + ctx->trusted_peers[cnt++] = strdup(buf); + ctx->trusted_peers[cnt] = NULL; + } + fclose(f); + return 0; +} diff --git a/org.glite.jp.client/src/authz.h b/org.glite.jp.client/src/authz.h new file mode 100644 index 0000000..9451aef --- /dev/null +++ b/org.glite.jp.client/src/authz.h @@ -0,0 +1,18 @@ +/** + * Check authorisation of JPPS operation on job. + * + * \param[in] ctx JP context including peer name & other credentials (VOMS etc.) + * \param[in] op operation, one of SOAP_TYPE___jpsrv__* + * \param[in] job jobid of the job to decide upon + * \param[in] owner current known owner of the job (may be NULL), shortcut to avoid + * unnecessary database query. + * + * \retval 0 OK, operation permitted + * \retval EPERM denied + * \retval other error + */ + +int glite_jpps_authz(glite_jp_context_t ctx,int op,const char *job,const char *owner); + +int glite_jpps_readauth(glite_jp_context_t ctx,const char *file); + diff --git a/org.glite.jp.client/src/backend.h b/org.glite.jp.client/src/backend.h new file mode 100644 index 0000000..cf901fb --- /dev/null +++ b/org.glite.jp.client/src/backend.h @@ -0,0 +1,116 @@ +#ifndef __GLITE_JP_BACKEND +#define __GLITE_JP_BACKEND + +#include +#include + +int glite_jppsbe_init( + glite_jp_context_t ctx, + int argc, + char *argv[] +); + +int glite_jppsbe_init_slave( + glite_jp_context_t ctx +); + +int glite_jppsbe_register_job( + glite_jp_context_t ctx, + const char *job, + const char *owner +); + +int glite_jppsbe_start_upload( + glite_jp_context_t ctx, + const char *job, + const char *class, /* must be filesystem-friendly */ + const char *name, /* optional name within the class */ + const char *content_type, + char **destination_out, + time_t *commit_before_inout +); + +int glite_jppsbe_commit_upload( + glite_jp_context_t ctx, + const char *destination +); + +int glite_jppsbe_get_names( + glite_jp_context_t ctx, + const char *job, + const char *class, + char ***names_out +); + +int glite_jppsbe_destination_info( + glite_jp_context_t ctx, + const char *destination, + char **job_out, + char **class_out, + char **name_out +); + +int glite_jppsbe_get_job_url( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* optional within class */ + char **url_out +); + +int glite_jppsbe_open_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* optional within class */ + int mode, + void **handle_out +); + +int glite_jppsbe_close_file( + glite_jp_context_t ctx, + void *handle +); + +int glite_jppsbe_pread( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset, + ssize_t *nbytes_ret +); + +int glite_jppsbe_pwrite( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset +); + +int glite_jppsbe_append( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes +); + +int glite_jppsbe_get_job_metadata( + glite_jp_context_t ctx, + const char *job, + glite_jp_attrval_t attrs_inout[] +); + +int glite_jppsbe_query( + glite_jp_context_t ctx, + const glite_jp_query_rec_t query[], + const glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +); + +#endif diff --git a/org.glite.jp.client/src/bones_server.c b/org.glite.jp.client/src/bones_server.c new file mode 100644 index 0000000..8a47169 --- /dev/null +++ b/org.glite.jp.client/src/bones_server.c @@ -0,0 +1,327 @@ +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" + +#include "glite/lb/srvbones.h" +#include "glite/security/glite_gss.h" + +#include +#include "glite/security/glite_gsplugin.h" + +#include "backend.h" +#include "file_plugin.h" + +#include "soap_version.h" +#include "jpps_H.h" + +#define CONN_QUEUE 20 + +extern SOAP_NMAC struct Namespace jpis__namespaces[],jpps__namespaces[]; + +static int newconn(int,struct timeval *,void *); +static int request(int,struct timeval *,void *); +static int reject(int); +static int disconn(int,struct timeval *,void *); +static int data_init(void **data); + +static struct glite_srvbones_service stab = { + "JP Primary Storage", -1, newconn, request, reject, disconn +}; + +static time_t cert_mtime; +static char *server_cert, *server_key, *cadir; +static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; +static char *mysubj; + +static char *port = "8901"; +static int debug = 1; + +static glite_jp_context_t ctx; + +static int call_opts(glite_jp_context_t,char *,char *,int (*)(glite_jp_context_t,int,char **)); + +char *glite_jp_default_namespace; + +int main(int argc, char *argv[]) +{ + int one = 1,opt,i; + edg_wll_GssStatus gss_code; + struct sockaddr_in a; + char *b_argv[20] = { "backend" },*p_argv[20] = { "plugins" },*com; + int b_argc,p_argc; + + glite_jp_init_context(&ctx); + + b_argc = p_argc = 1; + + while ((opt = getopt(argc,argv,"B:P:a:")) != EOF) switch (opt) { + case 'B': + assert(b_argc < 20); + if (com = strchr(optarg,',')) *com = 0; + + /* XXX: memleak -- who cares for once */ + asprintf(&b_argv[b_argc++],"-%s",optarg); + if (com) b_argv[b_argc++] = com+1; + + break; + case 'P': + assert(p_argc < 20); + p_argv[p_argc++] = optarg; + + break; + case 'a': + if (glite_jpps_readauth(ctx,optarg)) { + fprintf(stderr,"%s: %s\n",argv[0],glite_jp_error_chain(ctx)); + exit (1); + } + break; + case '?': fprintf(stderr,"usage: %s: -Bb,val ... -Pplugin.so ...\n" + "b is backend option\n",argv[0]); + exit (1); + } + + if (b_argc == 1) { + fputs("-B required\n",stderr); + exit (1); + } + + optind = 0; /* XXX: getopt used internally */ + if (glite_jppsbe_init(ctx,b_argc,b_argv)) { + fputs(glite_jp_error_chain(ctx), stderr); + exit(1); + } + + optind = 0; /* XXX: getopt used internally */ + if (b_argc > 1 && glite_jpps_fplug_load(ctx,p_argc,p_argv)) { + fputs(glite_jp_error_chain(ctx), stderr); + exit(1); + } + + srand48(time(NULL)); /* feed id generation */ + +#if GSOAP_VERSION <= 20602 + for (i=0; jpps__namespaces[i].id && strcmp(jpps__namespaces[i].id,"ns1"); i++); +#else + for (i=0; jpps__namespaces[i].id && strcmp(jpps__namespaces[i].id,"jpsrv"); i++); +#endif + assert(jpps__namespaces[i].id); + glite_jp_default_namespace = jpps__namespaces[i].ns; + + stab.conn = socket(PF_INET, SOCK_STREAM, 0); + if (stab.conn < 0) { + perror("socket"); + return 1; + } + + setsockopt(stab.conn,SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + + a.sin_family = AF_INET; + a.sin_addr.s_addr = INADDR_ANY; + a.sin_port = htons(atoi(port)); + if (bind(stab.conn,(struct sockaddr *) &a, sizeof(a)) ) { + char buf[200]; + + snprintf(buf,sizeof(buf),"bind(%d)",atoi(port)); + perror(buf); + return 1; + } + + if (listen(stab.conn,CONN_QUEUE)) { + perror("listen()"); + return 1; + } + + if (!server_cert || !server_key) + fprintf(stderr, "%s: WARNING: key or certificate file not specified, " + "can't watch them for changes\n", + argv[0]); + + if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); + edg_wll_gss_watch_creds(server_cert, &cert_mtime); + + if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code)) + fprintf(stderr,"Server idenity: %s\n",mysubj); + else fputs("WARNING: Running unauthenticated\n",stderr); + + /* XXX: daemonise */ + + glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT,1); + glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug); + + return 0; +} + +static int data_init(void **data) +{ + *data = (void *) soap_new(); + + printf("[%d] slave started\n",getpid()); + glite_jppsbe_init_slave(ctx); /* XXX: global but slave's */ + + return 0; +} + +static int newconn(int conn,struct timeval *to,void *data) +{ + struct soap *soap = (struct soap *) data; + glite_gsplugin_Context plugin_ctx; + + gss_cred_id_t newcred = GSS_C_NO_CREDENTIAL; + edg_wll_GssStatus gss_code; + gss_name_t client_name = GSS_C_NO_NAME; + gss_buffer_desc token = GSS_C_EMPTY_BUFFER; + OM_uint32 maj_stat,min_stat; + + + int ret = 0; + + soap_init2(soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE); + soap_set_namespaces(soap,jpps__namespaces); + soap->user = (void *) ctx; /* XXX: one instance per slave */ + +/* not yet: client to JP index + ctx->other_soap = soap_new(); + soap_init(ctx->other_soap); + soap_set_namespaces(ctx->other_soap,jpis__namespaces); +*/ + + + glite_gsplugin_init_context(&plugin_ctx); + plugin_ctx->connection = calloc(1,sizeof *plugin_ctx->connection); + soap_register_plugin_arg(soap,glite_gsplugin,plugin_ctx); + + switch (edg_wll_gss_watch_creds(server_cert,&cert_mtime)) { + case 0: break; + case 1: if (!edg_wll_gss_acquire_cred_gsi(server_cert,server_key, + &newcred,NULL,&gss_code)) + { + + printf("[%d] reloading credentials\n",getpid()); /* XXX: log */ + gss_release_cred(&min_stat,&mycred); + mycred = newcred; + } + break; + case -1: + printf("[%d] edg_wll_gss_watch_creds failed\n", getpid()); /* XXX: log */ + break; + } + + /* TODO: DNS paranoia etc. */ + + if (edg_wll_gss_accept(mycred,conn,to,plugin_ctx->connection,&gss_code)) { + printf("[%d] GSS connection accept failed, closing.\n", getpid()); + ret = 1; + goto cleanup; + } + + maj_stat = gss_inquire_context(&min_stat,plugin_ctx->connection->context, + &client_name, NULL, NULL, NULL, NULL, NULL, NULL); + + if (!GSS_ERROR(maj_stat)) + maj_stat = gss_display_name(&min_stat,client_name,&token,NULL); + + if (ctx->peer) free(ctx->peer); + if (!GSS_ERROR(maj_stat)) { + printf("[%d] client DN: %s\n",getpid(),(char *) token.value); /* XXX: log */ + + ctx->peer = strdup(token.value); + memset(&token, 0, sizeof(token)); + } + else { + printf("[%d] annonymous client\n",getpid()); + ctx->peer = NULL; + } + + if (client_name != GSS_C_NO_NAME) gss_release_name(&min_stat, &client_name); + if (token.value) gss_release_buffer(&min_stat, &token); + + return 0; + +cleanup: + glite_gsplugin_free_context(plugin_ctx); + soap_end(soap); + + return ret; +} + +static int request(int conn,struct timeval *to,void *data) +{ + struct soap *soap = data; + glite_jp_context_t ctx = soap->user; + + glite_gsplugin_set_timeout(glite_gsplugin_get_context(soap),to); + +/* FIXME: does not work, ask nykolas */ + soap->max_keep_alive = 1; /* XXX: prevent gsoap to close connection */ + soap_begin(soap); + if (soap_begin_recv(soap)) { + if (soap->error < SOAP_STOP) { + soap_send_fault(soap); + return EIO; + } + return ENOTCONN; + } + + if (soap_envelope_begin_in(soap) + || soap_recv_header(soap) + || soap_body_begin_in(soap) + || jpps__serve_request(soap) +#if GSOAP_VERSION >= 20700 + || (soap->fserveloop && soap->fserveloop(soap)) +#endif + ) + { + soap_send_fault(soap); + return ctx->error->code; /* XXX: shall we die on some errors? */ + } + + glite_jp_run_deferred(ctx); + return 0; +} + +static int reject(int conn) +{ + int flags = fcntl(conn, F_GETFL, 0); + + fcntl(conn,F_SETFL,flags | O_NONBLOCK); + edg_wll_gss_reject(conn); + + return 0; +} + +static int disconn(int conn,struct timeval *to,void *data) +{ + struct soap *soap = (struct soap *) data; + soap_end(soap); // clean up everything and close socket + + return 0; +} + +#define WSPACE "\t\n " + +static int call_opts(glite_jp_context_t ctx,char *opt,char *name,int (*f)(glite_jp_context_t,int,char **)) +{ + int ac = 1,ret,my_optind; + char **av = malloc(sizeof *av),*ap; + + *av = name; + for (ap = strtok(opt,WSPACE); ap; ap = strtok(NULL,WSPACE)) { + av = realloc(av,(ac+1) * sizeof *av); + av[ac++] = ap; + } + + my_optind = optind; + optind = 0; + ret = f(ctx,ac,av); + optind = my_optind; + free(av); + return ret; +} + + +/* XXX: we don't use it */ +SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; diff --git a/org.glite.jp.client/src/builtin_plugins.h b/org.glite.jp.client/src/builtin_plugins.h new file mode 100644 index 0000000..3b2c201 --- /dev/null +++ b/org.glite.jp.client/src/builtin_plugins.h @@ -0,0 +1,7 @@ + +#define GLITE_JP_FILETYPE_TAGS "urn:org.glite.jp.primary:tags" +#define GLITE_JP_FILETYPE_LB "urn:org.glite.jp.primary:lb" +#define GLITE_JP_FILETYPE_ISB "urn:org.glite.jp.primary:isb" +#define GLITE_JP_FILETYPE_OSB "urn:org.glite.jp.primary:osb" + +#define GLITE_JP_FPLUG_TAGS_APPEND 0 diff --git a/org.glite.jp.client/src/db.h b/org.glite.jp.client/src/db.h new file mode 100644 index 0000000..0b9f730 --- /dev/null +++ b/org.glite.jp.client/src/db.h @@ -0,0 +1,83 @@ +#ifndef _DB_H +#define _DB_H + +#ident "$Header$" + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef struct _glite_jp_db_stmt_t *glite_jp_db_stmt_t; + +int glite_jp_db_connect( + glite_jp_context_t, /* INOUT: */ + char * /* IN: connect string user/password@host:database */ +); + +void glite_jp_db_close(glite_jp_context_t); + + +/* Parse and execute SQL statement. Returns number of rows selected, created + * or affected by update, or -1 on error */ + +int glite_jp_db_execstmt( + glite_jp_context_t, /* INOUT: */ + char *, /* IN: SQL statement */ + glite_jp_db_stmt_t * /* OUT: statement handle. Usable for + select only */ +); + + +/* Fetch next row of select statement. + * All columns are returned as fresh allocated strings + * + * return values: + * >0 - number of fields of the retrieved row + * 0 - no more rows + * -1 - error + * + * Errors are stored in context passed to previous glite_jp_db_execstmt() */ + +int glite_jp_db_fetchrow( + glite_jp_db_stmt_t, /* IN: statement */ + char ** /* OUT: array of fetched values. + * As number of columns is fixed and known, + * expects allocated array of pointers here */ +); + +/* Retrieve column names of a query statement */ + +int glite_jp_db_querycolumns( + glite_jp_db_stmt_t, /* IN: statement */ + char ** /* OUT: result set column names. Expects allocated array. */ +); + +/* Free the statement structure */ + +void glite_jp_db_freestmt( + glite_jp_db_stmt_t * /* INOUT: statement */ +); + + +/* convert time_t into database-specific time string + * returns pointer to static area that is changed by subsequent calls */ + +char *glite_jp_db_timetodb(time_t); +time_t glite_jp_db_dbtotime(char *); + + +/** + * Check database version. + */ +int glite_jp_db_dbcheckversion(glite_jp_context_t); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/org.glite.jp.client/src/feed.c b/org.glite.jp.client/src/feed.c new file mode 100644 index 0000000..5d39565 --- /dev/null +++ b/org.glite.jp.client/src/feed.c @@ -0,0 +1,327 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/strmd5.h" +#include "feed.h" +#include "file_plugin.h" +#include "builtin_plugins.h" + + +/* + * seconds before feed expires: should be + * XXX: should be configurable, default for real deployment sort of 1 hour + */ +#define FEED_TTL 120 + +static int check_qry_item( + glite_jp_context_t ctx, + const glite_jp_query_rec_t *qry, + const glite_jp_attrval_t *attr +) +{ + int cmp,cmp2; + long scmp,ucmp; + + switch (qry->attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TAG: + cmp = strcmp(attr->value.s,qry->value.s); + break; + case GLITE_JP_ATTR_TIME: + scmp = (ucmp = attr->value.time.tv_usec - qry->value.time.tv_usec) > 0 ? 0 : -1; + ucmp -= 1000000 * scmp; + scmp += attr->value.time.tv_sec - qry->value.time.tv_sec; + cmp = scmp ? scmp : ucmp; + break; + } + switch (qry->op) { + case GLITE_JP_QUERYOP_EQUAL: return !cmp; + case GLITE_JP_QUERYOP_UNEQUAL: return cmp; + case GLITE_JP_QUERYOP_LESS: return cmp < 0; + case GLITE_JP_QUERYOP_GREATER: return cmp > 0; + + case GLITE_JP_QUERYOP_WITHIN: + switch (qry->attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TAG: + cmp2 = strcmp(attr->value.s,qry->value2.s); + break; + case GLITE_JP_ATTR_TIME: + scmp = (ucmp = attr->value.time.tv_usec - qry->value2.time.tv_usec) > 0 ? 0 : -1; + ucmp -= 1000000 * scmp; + scmp += attr->value.time.tv_sec - qry->value2.time.tv_sec; + cmp2 = scmp ? scmp : ucmp; + break; + } + return cmp >= 0 && cmp2 <= 0; + } +} + +/* XXX: limit on query size -- I'm lazy to malloc() */ +#define QUERY_MAX 100 + +static int match_feed( + glite_jp_context_t ctx, + const struct jpfeed *feed, + const char *job, + const glite_jp_attrval_t attrs[] /* XXX: not checked for correctness */ +) +{ + int i; + int attri[GLITE_JP_ATTR__LAST]; + int qi[QUERY_MAX]; + + glite_jp_attrval_t *newattr = NULL; + + glite_jp_clear_error(ctx); + + for (i=0; iqry) { + int j,complete = 1; + + memset(qi,0,sizeof qi); + for (i=0; feed->qry[i].attr.type; i++) { + assert(iqry[i].attr.type]) >=0) { + if (check_qry_item(ctx,feed->qry+i,attrs+j)) + qi[i] = 1; /* matched */ + else return 0; /* can't be satisfied */ + } + else complete = 0; + } + + /* not all attributes in query are known from input + * we have to retrieve job metadata from the backend + */ + if (!complete) { + glite_jp_attrval_t meta[GLITE_JP_ATTR__LAST+1]; + int qai[GLITE_JP_ATTR__LAST]; + + memset(meta,0,sizeof meta); + j=0; + for (i=0; feed->qry[i].attr.type; i++) if (!qi[i]) { + meta[j].attr.type = feed->qry[i].attr.type; + meta[j].attr.name = feed->qry[i].attr.name; + qai[feed->qry[i].attr.type] = i; + j++; + } + + if (glite_jppsbe_get_job_metadata(ctx,job,meta)) { + glite_jp_error_t err; + memset(&err,0,sizeof err); + err.code = EIO; + err.source = __FUNCTION__; + err.desc = "complete query"; + return glite_jp_stack_error(ctx,&err); + } + + for (i=0; j=meta[i].attr.type; i++) + if (!check_qry_item(ctx,feed->qry+qai[j],meta+i)) + return 0; + } + } + + /* matched completely */ + return glite_jpps_single_feed(ctx,feed->destination,job,attrs); + return 0; +} + +int glite_jpps_match_attr( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t attrs[] +) +{ + struct jpfeed *f = (struct jpfeed *) ctx->feeds; + int i,j; + int attri[GLITE_JP_ATTR__LAST]; + + glite_jp_clear_error(ctx); + + for (i=0; i= GLITE_JP_ATTR__LAST || + attrs[i].attr.type <= 0) + { + glite_jp_error_t err; + memset(&err,0,sizeof err); + err.code = EINVAL; + err.source = __FUNCTION__; + err.desc = "unknown attribute"; + return glite_jp_stack_error(ctx,&err); + } + if (attri[attrs[i].attr.type] >= 0) { + glite_jp_error_t err; + memset(&err,0,sizeof err); + err.code = EINVAL; + err.source = __FUNCTION__; + err.desc = "double attribute change"; + return glite_jp_stack_error(ctx,&err); + } + + attri[attrs[i].attr.type] = i; + } + + for (;f; f = f->next) { + for (i=0; f->attrs[i].type && attri[f->attrs[i].type] == -1; i++); + /* XXX: ignore any errors */ + if (f->attrs[i].type) match_feed(ctx,f,job,attrs); + } + + return glite_jp_clear_error(ctx); +} + +int glite_jpps_match_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name +) +{ + glite_jpps_fplug_data_t **pd = NULL; + int pi; + void *bh = NULL; + int ret; + + fprintf(stderr,"%s: %s %s %s\n",__FUNCTION__,job,class,name); + + switch (glite_jpps_fplug_lookup(ctx,class,&pd)) { + case ENOENT: return 0; /* XXX: shall we complain? */ + case 0: break; + default: return -1; + } + + for (pi=0; pd[pi]; pi++) { + int ci; + for (ci=0; pd[pi]->uris[ci]; ci++) if (!strcmp(pd[pi]->uris[ci],class)) { + void *ph; + + if (!bh && (ret = glite_jppsbe_open_file(ctx,job,pd[pi]->classes[ci],name,O_RDONLY,&bh))) { + free(pd); + return ret; + } + + if (pd[pi]->ops.open(pd[pi]->fpctx,bh,class,&ph)) { + /* XXX: complain more visibly */ + fputs("plugin open failed\n",stderr); + continue; + } + + /* XXX: does not belong here but I'd like to avoid opening the file twice */ + if (!strcmp(class,GLITE_JP_FILETYPE_LB)) { + glite_jp_attr_t owner = { GLITE_JP_ATTR_OWNER, NULL }; + glite_jp_attrval_t *val; + + switch (pd[pi]->ops.attr(pd[pi]->fpctx,ph,owner,&val)) { + case ENOENT: + case ENOSYS: abort(); + case 0: printf("LB plugin: owner = %s\n",val[0].value.s); + /* TODO: store it in backend */ + + glite_jp_attrval_free(val,1); + break; + + default: /* TODO: complain */; break; + } + } + + /* TODO: extract attributes for the feeds */ + + + pd[pi]->ops.close(pd[pi]->fpctx,ph); + } + } + + if (bh) glite_jppsbe_close_file(ctx,bh); + free(pd); + + return 0; +} + +int glite_jpps_match_tag( + glite_jp_context_t ctx, + const char *job, + const glite_jp_tagval_t *tag +) +{ + fprintf(stderr,"%s: \n",__FUNCTION__); + return 0; +} + +static char *generate_feedid(void) +{ + char hname[200],buf[1000]; + + gethostname(hname,sizeof hname); + snprintf(buf,sizeof buf,"%s%d%ld",hname,getpid(),lrand48()); + buf[sizeof buf-1] = 0; + return str2md5base64(buf); +} + + +int glite_jpps_run_feed( + glite_jp_context_t ctx, + const char *destination, + const glite_jp_attr_t *attrs, + const glite_jp_query_rec_t *qry, + char **feed_id) +{ + fprintf(stderr,"%s: \n",__FUNCTION__); + return 0; +} + +static int register_feed_deferred(glite_jp_context_t ctx,void *feed) +{ + struct jpfeed *f = feed; + + f->next = ctx->feeds; + ctx->feeds = f; + return 0; +} + +/* FIXME: + * - volatile implementation: should store the registrations in a file + * and recover after restart + * - should communicate the data among all server slaves + */ +int glite_jpps_register_feed( + glite_jp_context_t ctx, + const char *destination, + const glite_jp_attr_t *attrs, + const glite_jp_query_rec_t *qry, + char **feed_id, + time_t *expires) +{ + int i; + struct jpfeed *f = calloc(1,sizeof *f); + + if (!*feed_id) *feed_id = generate_feedid(); + time(expires); *expires += FEED_TTL; + + f->id = strdup(*feed_id); + f->destination = strdup(destination); + f->expires = *expires; + for (i=0; attrs[i].type; i++) { + f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs); + glite_jp_attr_copy(f->attrs+i,attrs+i); + memset(f->attrs+i+1,0,sizeof *f->attrs); + } + for (i=0; qry[i].attr.type; i++) { + f->qry = realloc(f->qry,(i+2) * sizeof *f->qry); + glite_jp_queryrec_copy(f->qry+i,qry+i); + memset(f->qry+i+1,0,sizeof *f->qry); + } + + glite_jp_add_deferred(ctx,register_feed_deferred,f); + + return 0; +} + diff --git a/org.glite.jp.client/src/feed.h b/org.glite.jp.client/src/feed.h new file mode 100644 index 0000000..c3c2461 --- /dev/null +++ b/org.glite.jp.client/src/feed.h @@ -0,0 +1,21 @@ +#ifndef __GLITE_JP_FEED +#define __GLITE_JP_FEED + + +struct jpfeed { + char *id,*destination; + time_t expires; + glite_jp_attr_t *attrs; + glite_jp_query_rec_t *qry; + struct jpfeed *next; +}; + + +int glite_jpps_match_attr(glite_jp_context_t,const char *,const glite_jp_attrval_t[]); +int glite_jpps_match_file(glite_jp_context_t,const char *,const char *,const char *); +int glite_jpps_match_tag(glite_jp_context_t,const char *,const glite_jp_tagval_t *); +int glite_jpps_run_feed(glite_jp_context_t,const char *,const glite_jp_attr_t *,const glite_jp_query_rec_t *,char **); +int glite_jpps_register_feed(glite_jp_context_t,const char *,const glite_jp_attr_t *,const glite_jp_query_rec_t *,char **,time_t *); + +#endif + diff --git a/org.glite.jp.client/src/file_plugin.c b/org.glite.jp.client/src/file_plugin.c new file mode 100644 index 0000000..144a231 --- /dev/null +++ b/org.glite.jp.client/src/file_plugin.c @@ -0,0 +1,115 @@ +#include +#include +#include +#include +#include + +#include +#include "file_plugin.h" + +static struct option opts[] = { + { "plugin", 1, NULL, 'p' }, + { NULL } +}; + +static int loadit(glite_jp_context_t ctx,const char *so) +{ +/* XXX: not stored but we never dlclose() yet */ + void *dl_handle = dlopen(so,RTLD_NOW); + + glite_jp_error_t err; + const char *e; + glite_jpps_fplug_data_t *data,*dp; + int i; + + glite_jpps_fplug_init_t init; + memset(&err,0,sizeof err); + + if (!dl_handle) { + err.source = "dlopen()"; + err.code = EINVAL; + err.desc = dlerror(); + return glite_jp_stack_error(ctx,&err); + } + + dlerror(); + init = dlsym(dl_handle,"init"); + e = dlerror(); + if (e) { + char buf[300]; + snprintf(buf,sizeof buf,"dlsym(\"%s\",\"init\")",so); + buf[299] = 0; + err.source = buf; + err.code = ENOENT; + err.desc = e; + return glite_jp_stack_error(ctx,&err); + } + + data = calloc(1,sizeof *data); + + if (init(ctx,data)) return -1; + + i = 0; + if (ctx->plugins) for (i=0; ctx->plugins[i]; i++); + ctx->plugins = realloc(ctx->plugins, (i+2) * sizeof *ctx->plugins); + ctx->plugins[i] = data; + ctx->plugins[i+1] = NULL; + + /* TODO: check consistency of uri+class pairs wrt. previous plugins */ + + return 0; +} + +int glite_jpps_fplug_load(glite_jp_context_t ctx,int argc,char **argv) +{ + int i; + + for (i=1; iplugins) { + return glite_jp_stack_error(ctx,&err); + } + + for (i = 0; ctx->plugins[i]; i++) { + int j; + glite_jpps_fplug_data_t *p = ctx->plugins[i]; + + for (j=0; p->uris && p->uris[j]; j++) + if (!strcmp(p->uris[j],uri)) { + out = realloc(out, (matches+2) * sizeof *out); + out[matches++] = p; + out[matches] = NULL; + } + } + + if (matches) { + *plugin_data = out; + return 0; + } + else return glite_jp_stack_error(ctx,&err); +} + diff --git a/org.glite.jp.client/src/ftp_backend.c b/org.glite.jp.client/src/ftp_backend.c new file mode 100644 index 0000000..8bf523b --- /dev/null +++ b/org.glite.jp.client/src/ftp_backend.c @@ -0,0 +1,1744 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" +#include "glite/jp/strmd5.h" + +#include "tags.h" +#include "backend.h" + +#define UPLOAD_SUFFIX ".upload" +#define LOCK_SUFFIX ".lock" + +struct ftpbe_config { + char *internal_path; + char *external_path; + char *gridmap; + char *logname; +}; + +static struct ftpbe_config *config = NULL; + +struct fhandle_rec { + int fd; + int fd_append; +}; +typedef struct fhandle_rec *fhandle; + +static struct option ftpbe_opts[] = { + { "ftp-internal-path", 1, NULL, 'I' }, + { "ftp-external-path", 1, NULL, 'E' }, + { "ftp-gridmap", 1, NULL, 'G' }, + { NULL, 0, NULL, 0 } +}; + +/* obsolete */ +#if 0 +static struct { + glite_jp_fileclass_t type; + char * fname; + } class_to_fname_tab[] = { + { GLITE_JP_FILECLASS_INPUT, "input" }, + { GLITE_JP_FILECLASS_OUTPUT, "output" }, + { GLITE_JP_FILECLASS_LBLOG, "lblog" }, + { GLITE_JP_FILECLASS_TAGS, "tags" }, + { GLITE_JP_FILECLASS_UNDEF, NULL } + }; + +static char *class_to_fname(glite_jp_fileclass_t type) +{ + int i; + + for (i = 0; class_to_fname_tab[i].type != GLITE_JP_FILECLASS_UNDEF; i++) + if (type == class_to_fname_tab[i].type) + return class_to_fname_tab[i].fname; + + return NULL; +} + +static glite_jp_fileclass_t fname_to_class(char* fname) +{ + int i; + + for (i = 0; class_to_fname_tab[i].type != GLITE_JP_FILECLASS_UNDEF; i++) + if (!strcmp(fname, class_to_fname_tab[i].fname)) + return class_to_fname_tab[i].type; + + return GLITE_JP_FILECLASS_UNDEF; +} +#endif + +static int config_check( + glite_jp_context_t ctx, + struct ftpbe_config *config) +{ + return config == NULL || + config->internal_path == NULL || + config->external_path == NULL || + config->gridmap == NULL || + config->logname == NULL; + + /* XXX check reality */ +} + +static int jobid_unique_pathname(glite_jp_context_t ctx, const char *job, + char **unique, char **ju_path, int get_path) +{ + char *p; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + p = strrchr(job, '/'); + if (!p) { + err.code = EINVAL; + err.desc = "Malformed jobid"; + return glite_jp_stack_error(ctx,&err); + } + /* XXX thorough checks */ + if (!(*unique = strdup(p+1))) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + if (get_path) { + if (!(*ju_path = strdup(p+1))) { + free(*unique); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + *(*ju_path + 10) = '\0'; + } + return 0; +} + +static int mkdirpath(const char* path, int prefixlen) +{ + char *wpath, *p; + int goout, ret; + + wpath = strdup(path); + if (!wpath) { + errno = ENOMEM; + return -1; + } + + p = wpath + prefixlen; + goout = 0; + while (!goout) { + while (*p == '/') p++; + while (*p != '/' && *p != '\0') p++; + goout = (*p == '\0'); + *p = '\0'; + ret = mkdir(wpath, S_IRUSR | S_IWUSR | S_IXUSR); + if (ret < 0 && errno != EEXIST) break; + *p = '/'; + } + free(wpath); + return goout ? 0 : ret; +} + +static long regtime_trunc(long tv_sec) +{ + return tv_sec / (86400*7); +} + +static long regtime_ceil(long tv_sec) +{ + return (tv_sec % (86400*7)) ? tv_sec/(86400*7)+1 : tv_sec/(86400*7) ; +} + +/********************************************************************************/ +int glite_jppsbe_init( + glite_jp_context_t ctx, + int argc, + char *argv[] +) +{ + glite_jp_error_t err; + int opt; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + config = (struct ftpbe_config *) calloc(1, sizeof *config); + if (!config) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + config->logname = getlogin(); + + while ((opt = getopt_long(argc, argv, "I:E:G:", ftpbe_opts, NULL)) != EOF) { + switch (opt) { + case 'I': config->internal_path = optarg; break; + case 'E': config->external_path = optarg; break; + case 'G': config->gridmap = optarg; break; + default: break; + } + } + + if (config_check(ctx, config)) { + err.code = EINVAL; + err.desc = "Invalid FTP backend configuration"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +int glite_jppsbe_init_slave( + glite_jp_context_t ctx +) +{ + /* Nothing to do */ +} + +int glite_jppsbe_register_job( + glite_jp_context_t ctx, + const char *job, + const char *owner +) +{ + glite_jp_error_t err; + char *int_dir = NULL; + char *int_fname = NULL; + char *data_dir = NULL; + char *data_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *ownerhash = NULL; + FILE *regfile = NULL; + struct timeval reg_tv; + long reg_tv_trunc; + struct stat statbuf; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job != NULL); + assert(owner != NULL); + + gettimeofday(®_tv, NULL); + reg_tv_trunc = regtime_trunc(reg_tv.tv_sec); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_dir, "%s/regs/%s", + config->internal_path, ju_path) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + if (mkdirpath(int_dir, strlen(config->internal_path)) < 0 && + errno != EEXIST) { + free(int_dir); + err.code = errno; + err.desc = "Cannot mkdir jobs's reg directory"; + return glite_jp_stack_error(ctx,&err); + } + free(int_dir); + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + if (stat(int_fname, &statbuf) < 0) { + if (errno != ENOENT) { + err.code = errno; + err.desc = "Cannot stat jobs's reg info file"; + goto error_out; + } + } else { + err.code = EEXIST; + err.desc = "Job already registered"; + goto error_out; + } + + regfile = fopen(int_fname, "w"); + if (regfile == NULL) { + err.code = errno; + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + + ownerhash = str2md5(owner); /* static buffer */ + + if (fprintf(regfile, "%d %ld.%06ld %s %s %d %s\n", 1, + (long)reg_tv.tv_sec, (long)reg_tv.tv_usec, job, + ownerhash, strlen(owner), owner) < 1 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot write jobs's reg info file"; + goto error_out; + } + if (fclose(regfile) != 0 ) { + err.code = errno; + err.desc = "Cannot close(write) jobs's reg info file"; + goto error_out; + } + + if (asprintf(&data_dir, "%s/data/%s/%d/%s", + config->internal_path, ownerhash, regtime_trunc(reg_tv.tv_sec), ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (asprintf(&data_fname, "%s/_info", data_dir) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (mkdirpath(data_dir, strlen(config->internal_path)) < 0 && + errno != EEXIST) { + err.code = errno; + err.desc = "Cannot mkdir jobs's data directory"; + goto error_out; + } + + if (link(int_fname, data_fname) < 0) { + err.code = errno; + err.desc = "Cannot link job's reg and data info files"; + goto error_out; + } + +error_out: + free(int_fname); + free(data_fname); + if (err.code && data_dir) rmdir(data_dir); + free(data_dir); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int add_to_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + gridmap = fopen(config->gridmap, "a"); + if (!gridmap) { + err.code = errno; + err.desc = "Cannot open gridmap file"; + return glite_jp_stack_error(ctx,&err); + } + if (fprintf(gridmap, "\"%s\" %s\n", dn, config->logname) < 6 || + ferror(gridmap)) { + err.code = EIO; + err.desc = "Cannot write to gridmap file"; + fclose(gridmap); + return glite_jp_stack_error(ctx,&err); + } + fclose(gridmap); + return 0; +} + +static int remove_from_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + char *temp_name = NULL; + FILE *temp_file = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + /* XXX */ + return 0; +} + +int glite_jppsbe_start_upload( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* TODO */ + const char *content_type, + char **destination_out, + time_t *commit_before_inout +) +{ + char *int_fname = NULL; + char *lock_fname = NULL; + FILE *lockfile = NULL; + FILE *regfile = NULL; + char *data_dir = NULL; + char *data_lock = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *peername = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(destination_out!=NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + peername = glite_jp_peer_name(ctx); + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s ", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + /* XXX authorization */ + + if (asprintf(&data_dir, "%s/data/%s/%d/%s", + config->internal_path, ownerhash, regtime_trunc(reg_time), ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (asprintf(&lock_fname, "%s/%s" LOCK_SUFFIX, + data_dir, class) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (commit_before_inout != NULL) + *commit_before_inout = (time_t) LONG_MAX; /* XXX no timeout enforced */ + + lockfile = fopen(lock_fname, "w"); + if (lockfile == NULL) { + err.code = errno; + err.desc = "Cannot open uploads's lock file"; + goto error_out; + } + + if (fprintf(lockfile, "%ld %d %s\n", (long)*commit_before_inout, + peername ? peername : 0, + peername ? peername : "") < 1 || ferror(regfile)) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot write upload's lock file"; + goto error_out; + } + if (fclose(lockfile) != 0 ) { + err.code = errno; + err.desc = "Cannot close(write) upload's lock file"; + goto error_out; + } + + if (asprintf(destination_out, "%s/data/%s/%d/%s/%s" UPLOAD_SUFFIX, + config->external_path, ownerhash, regtime_trunc(reg_time), ju, class) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (add_to_gridmap(ctx, peername)) { + err.code = EIO; + err.desc = "Cannot add peer DN to ftp server authorization file"; + goto error_out; + } + +error_out: + free(int_fname); + free(data_dir); + if (err.code && data_lock) unlink(data_lock); + free(data_lock); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_commit_upload( + glite_jp_context_t ctx, + const char *destination +) +{ + size_t dest_len; + size_t suff_len; + size_t extp_len; + long commit_before; + int lockpeerlen; + char *lockpeername = NULL; + char *peername = NULL; + char *dest_rw = NULL; + char *dest_rw_suff = NULL; + char *dest_rw_lock = NULL; + FILE *lockfile = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(destination != NULL); + + suff_len = strlen(UPLOAD_SUFFIX); + dest_len = strlen(destination); + extp_len = strlen(config->external_path); + + if (dest_len < suff_len || + strcmp(UPLOAD_SUFFIX, destination + (dest_len - suff_len)) || + strncmp(destination, config->external_path, extp_len)) { + err.code = EINVAL; + err.desc = "Forged destination path"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&dest_rw_suff, "%s%s", config->internal_path, + destination + extp_len) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + dest_rw = strdup(dest_rw_suff); + if (!dest_rw) { + err.code = ENOMEM; + goto error_out; + } + *(dest_rw + (strlen(dest_rw_suff) - suff_len)) = '\0'; + + if (asprintf(&dest_rw_lock, "%s" LOCK_SUFFIX, dest_rw) == -1) { + err.code = ENOMEM; + goto error_out; + } + + lockfile = fopen(dest_rw_lock, "r"); + if (lockfile == NULL) { + err.code = errno; + err.desc = "Cannot open upload's lock file"; + goto error_out; + } + if (fscanf(lockfile, "%ld %d ", &commit_before, &lockpeerlen) < 2 || ferror(lockfile)) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot read upload's lock file"; + goto error_out; + } + if (lockpeerlen) { + lockpeername = (char*) calloc(1, lockpeerlen+1); + if (!lockpeername) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(lockpeername, lockpeerlen+1, lockfile) == NULL) { + fclose(lockfile); + err.code = errno; + err.desc = "Cannot read upload's lock file"; + goto error_out; + } + } + fclose(lockfile); + + peername = glite_jp_peer_name(ctx); + if (lockpeername && (!peername || strcmp(lockpeername, peername))) { + err.code = EPERM; + err.desc = "Upload started by client of different identity"; + goto error_out; + } + + if (rename(dest_rw_suff, dest_rw) < 0) { + err.code = errno; + err.desc = "Cannot move upload file to the final place"; + goto error_out; + } + + if (unlink(dest_rw_lock) < 0) { + err.code = errno; + err.desc = "Cannot unlink upload's lock file"; + goto error_out; + } + +error_out: + free(dest_rw); + free(dest_rw_suff); + free(dest_rw_lock); + free(peername); + free(lockpeername); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_destination_info( + glite_jp_context_t ctx, + const char *destination, + char **job, + char **class, + char **name +) +{ + size_t dest_len; + size_t suff_len; + size_t extp_len; + char *dest_rw = NULL; + char *dest_rw_suff = NULL; + char *dest_rw_info = NULL; + FILE *infofile = NULL; + char *classname = NULL; + char jobstr[256+1]; + glite_jp_error_t err; + + assert(destination != NULL); + assert(job != NULL); + assert(class != NULL); + assert(name != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + suff_len = strlen(UPLOAD_SUFFIX); + dest_len = strlen(destination); + extp_len = strlen(config->external_path); + + if (dest_len < suff_len || + strcmp(UPLOAD_SUFFIX, destination + (dest_len - suff_len)) || + strncmp(destination, config->external_path, extp_len)) { + err.code = EINVAL; + err.desc = "Forged destination path"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&dest_rw_suff, "%s%s", config->internal_path, + destination + extp_len) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + dest_rw = strdup(dest_rw_suff); + if (!dest_rw) { + err.code = ENOMEM; + goto error_out; + } + *(dest_rw + (strlen(dest_rw_suff) - suff_len)) = '\0'; + + classname = strrchr(dest_rw,'/'); + if (classname == NULL) { + err.code = EINVAL; + err.desc = "Forged destination path"; + goto error_out; + } + *classname++ ='\0'; + *class = strdup(classname); + +/* XXX: do we need similar check? + if (!class == GLITE_JP_FILECLASS_UNDEF) { + err.code = EINVAL; + err.desc = "Forged destination path"; + goto error_out; + } +*/ + + /* TODO: */ + *name = NULL; + + if (asprintf(&dest_rw_info, "%s/_info", dest_rw) == -1) { + err.code = ENOMEM; + goto error_out; + } + + infofile = fopen(dest_rw_info, "r"); + if (infofile == NULL) { + err.code = errno; + err.desc = "Cannot open _info file"; + goto error_out; + } + if (fscanf(infofile, "%*d %*ld.%*ld %256s ", jobstr) < 1 || ferror(infofile)) { + fclose(infofile); + err.code = errno; + err.desc = "Cannot read _info file"; + goto error_out; + } + *job = strdup(jobstr); + fclose(infofile); + +error_out: + free(dest_rw); + free(dest_rw_suff); + free(dest_rw_info); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + + +int glite_jppsbe_get_job_url( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* TODO */ + char **url_out +) +{ + FILE *regfile = NULL; + char *int_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(url_out != NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + if (asprintf(url_out, "%s/data/%s/%d/%s/%s", + config->external_path, ownerhash, regtime_trunc(reg_time), ju, class) == -1) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + free(int_fname); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_fname( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* TODO */ + char **fname_out +) +{ + FILE *regfile = NULL; + char *int_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + int info_version; + long reg_time; + char ownerhash[33]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(fname_out != NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%*ld %*s %s", &info_version, + ®_time, ownerhash) < 3 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + fclose(regfile); + + if (asprintf(fname_out, "%s/data/%s/%d/%s/%s", + config->internal_path, ownerhash, regtime_trunc(reg_time), ju, class) == -1) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + free(int_fname); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_open_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, /* TODO */ + int mode, + void **handle_out +) +{ + fhandle handle = NULL; + char* fname = NULL; + glite_jp_error_t err; + + assert(handle_out != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (get_job_fname(ctx, job, class, name, &fname)) { + err.code = ctx->error->code; + err.desc = "Cannot construct internal filename"; + return glite_jp_stack_error(ctx,&err); + } + + handle = (fhandle) calloc(1,sizeof(*handle)); + if (handle == NULL) { + err.code = ENOMEM; + goto error_out; + } + + handle->fd = open(fname, mode, S_IRUSR | S_IWUSR); + if (handle->fd < 0) { + err.code = errno; + err.desc = "Cannot open requested file"; + free(handle); + goto error_out; + } + handle->fd_append = open(fname, mode | O_APPEND, S_IRUSR | S_IWUSR); + if (handle->fd_append < 0) { + err.code = errno; + err.desc = "Cannot open requested file for append"; + close(handle->fd); + free(handle); + goto error_out; + } + *handle_out = (void*) handle; + +error_out: + free(fname); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_close_file( + glite_jp_context_t ctx, + void *handle +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (close(((fhandle)handle)->fd_append) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor (fd_append)"; + goto error_out; + } + if (close(((fhandle)handle)->fd) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor"; + goto error_out; + } + +error_out: + free(handle); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_pread( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset, + ssize_t *nbytes_ret +) +{ + ssize_t ret; + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if ((ret = pread(((fhandle)handle)->fd, buf, nbytes, offset)) < 0) { + err.code = errno; + err.desc = "Error in pread()"; + return glite_jp_stack_error(ctx,&err); + } + *nbytes_ret = ret; + + return 0; +} + +int glite_jppsbe_pwrite( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (pwrite(((fhandle)handle)->fd, buf, nbytes, offset) < 0) { + err.code = errno; + err.desc = "Error in pwrite()"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +int glite_jppsbe_append( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (write(((fhandle)handle)->fd_append, buf, nbytes) < 0) { + err.code = errno; + err.desc = "Error in write()"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +static int get_job_info( + glite_jp_context_t ctx, + const char *job, + char **owner, + struct timeval *tv_reg +) +{ + char *ju = NULL; + char *ju_path = NULL; + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char *int_fname = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %*s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, &ownerlen) < 4 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + free(int_fname); + free(ju); + free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_info_int( + glite_jp_context_t ctx, + const char *int_fname, + char **jobid, + char **owner, + struct timeval *tv_reg +) +{ + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char jobid_buf[256]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, jobid_buf, &ownerlen) < 5 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + *jobid = strdup(jobid_buf); + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_get_job_metadata( + glite_jp_context_t ctx, + const char *job, + glite_jp_attrval_t attrs_inout[] +) +{ + int got_info = 0; + struct timeval tv_reg; + char *owner = NULL; + int got_tags = 0; + void *tags_handle = NULL; + glite_jp_tagval_t* tags = NULL; + int i,j; + glite_jp_error_t err; + + assert(job != NULL); + assert(attrs_inout != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; attrs_inout[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TIME: +*/ + if (!got_info) { + if (get_job_info(ctx, job, &owner, &tv_reg)) { + err.code = ctx->error->code; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + got_info = 1; + } + break; + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TAG: + if (!got_tags) { + if (glite_jppsbe_open_file(ctx, job, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + got_tags = 1; + } + break; +*/ + default: + err.code = EINVAL; + err.desc = "Invalid attribute type"; + goto error_out; + break; + } + + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + attrs_inout[i].value.s = strdup(owner); + if (!attrs_inout[i].value.s) { + err.code = ENOMEM; + err.desc = "Cannot copy owner string"; + goto error_out; + } + break; + case GLITE_JP_ATTR_TIME: + attrs_inout[i].value.time = tv_reg; + break; + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, attrs_inout[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &attrs_inout[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) attrs_inout[i].value.tag.name = NULL; + break; +*/ + default: + break; + } + } + +error_out: + free(owner); + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); + + if (err.code) { + while (i > 0) { + i--; + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + free(attrs_inout[i].value.s); + break; + case GLITE_JP_ATTR_TAG: + free(attrs_inout[i].value.tag.name); + free(attrs_inout[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} +static int compare_timeval(struct timeval a, struct timeval b) +{ + if (a.tv_sec < b.tv_sec) return -1; + if (a.tv_sec > b.tv_sec) return 1; + if (a.tv_usec < b.tv_usec) return -1; + if (a.tv_usec > b.tv_usec) return 1; + return 0; +} + + +/* FIXME: disabled -- clarification wrt. filetype plugin needed */ + +#if 0 + +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +); + +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + char *time_dirname = NULL; + DIR *time_dirp = NULL; + struct dirent *jobent; + char *info_fname = NULL; + char *jobid = NULL; + char *owner = NULL; + struct timeval tv_reg; + void *tags_handle = NULL; + int matching; + int i, j; + glite_jp_tagval_t* tags = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (asprintf(&time_dirname, "%s/data/%s/%d", config->internal_path, + ownerhash, regtime_tr) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + time_dirp = opendir(time_dirname); + if (!time_dirp) { + free(time_dirname); + return 0; /* found nothing */ + } + while ((jobent = readdir(time_dirp)) != NULL) { + if (!strcmp(jobent->d_name, ".")) continue; + if (!strcmp(jobent->d_name, "..")) continue; + if (asprintf(&info_fname, "%s/%s/_info", time_dirname, + jobent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (get_job_info_int(ctx, info_fname, &jobid, &owner, &tv_reg)) { + err.code = EIO; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + if (q_tags || md_tags) { + if (glite_jppsbe_open_file(ctx, jobid, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + tags_handle = NULL; + } + + matching = 1; + for (i = 0; matching && query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (query[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + if (query[i].value.s == NULL || + strcmp(query[i].value.s, owner)) matching = 0; + break; + case GLITE_JP_ATTR_TIME: + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_LESS: + matching = compare_timeval(tv_reg, query[i].value.time) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = compare_timeval(tv_reg, query[i].value.time) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = compare_timeval(tv_reg, query[i].value.time) >= 0 + && compare_timeval(tv_reg, query[i].value2.time) <= 0; + break; + } + break; + case GLITE_JP_ATTR_TAG: + if (!tags) { + matching = 0; + break; + } + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, query[i].attr.name)) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_LESS: + matching = strcmp(tags[j].value, query[i].value.s) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = strcmp(tags[j].value, query[i].value.s) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = strcmp(tags[j].value, query[i].value.s) >= 0 \ + && strcmp(tags[j].value, query[i].value2.s) <= 0 ; + break; + default: + break; + } + } + } + break; + default: + break; + } + } + if (!matching) { + free(info_fname); info_fname = NULL; + free(jobid); jobid = NULL; + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); tags = NULL; + continue; + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + metadata[i].value.s = owner; + break; + case GLITE_JP_ATTR_TIME: + metadata[i].value.time = tv_reg; + break; + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, metadata[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &metadata[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) { + metadata[i].value.tag.name = NULL; + metadata[i].value.tag.value = NULL; + } + break; + default: + break; + } + } + (*callback)(ctx, jobid, metadata); + free(jobid); jobid = NULL; + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + } + +error_out: + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + if (tags_handle) glite_jppsbe_close_file(ctx, tags_handle); + free(info_fname); + free(owner); + free(jobid); + closedir(time_dirp); + free(time_dirname); + if (err.code) { + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else + return 0; +} + +int glite_jppsbe_query( + glite_jp_context_t ctx, + const glite_jp_query_rec_t query[], + const glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + /* XXX clone metadata */ + int i; + char *q_exact_owner = NULL; + char *ownerhash = NULL; + long q_min_time = 0; + long q_max_time = LONG_MAX; + long q_min_time_tr; + long q_max_time_tr; + int q_with_tags = 0; + int md_info = 0; + int md_tags = 0; + char *owner_dirname = NULL; + DIR *owner_dirp = NULL; + struct dirent *ttimeent; + char *data_dirname = NULL; + DIR *data_dirp = NULL; + struct dirent *ownerent; + long ttime = 0; + glite_jp_attrval_t *metadata_templ = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + if (query[i].attr.type == GLITE_JP_ATTR_OWNER && query[i].op == GLITE_JP_QUERYOP_EQUAL) { + q_exact_owner = query[i].value.s; + } + if (query[i].attr.type == GLITE_JP_ATTR_TIME) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + q_min_time = query[i].value.time.tv_sec; + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_LESS: + if (q_max_time > query[i].value.time.tv_sec + 1) + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_WITHIN: + if (q_max_time > query[i].value2.time.tv_sec + 1) + q_max_time = query[i].value2.time.tv_sec + 1; + /* fallthrough */ + case GLITE_JP_QUERYOP_GREATER: + if (q_min_time < query[i].value.time.tv_sec) + q_min_time = query[i].value.time.tv_sec; + break; + default: + err.code = EINVAL; + err.desc = "Invalid query op"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + if (query[i].attr.type == GLITE_JP_ATTR_TAG) + q_with_tags = 1; + + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TIME: + md_info = 1; + break; + case GLITE_JP_ATTR_TAG: + md_tags = 1; + break; + default: + err.code = EINVAL; + err.desc = "Invalid attribute type in metadata parameter"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + metadata_templ = (glite_jp_attrval_t *) calloc(i + 1, sizeof(glite_jp_attrval_t)); + if (!metadata_templ) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + memcpy(metadata_templ, metadata, (i + 1) * sizeof(glite_jp_attrval_t)); + + q_min_time_tr = regtime_trunc(q_min_time); + q_max_time_tr = regtime_ceil(q_max_time); + + if (q_exact_owner) { + ownerhash = str2md5(q_exact_owner); /* static buffer */ + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, ownerhash) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + free(metadata_templ); + return 0; /* found nothing */ + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerhash, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + } else { /* !q_exact_owner */ + if (asprintf(&data_dirname, "%s/data", config->internal_path) == -1) { + err.code = ENOMEM; + goto error_out; + } + data_dirp = opendir(data_dirname); + if (!data_dirp) { + err.code = EIO; + err.desc = "Cannot open data directory"; + goto error_out; + } + while ((ownerent = readdir(data_dirp)) != NULL) { + if (!strcmp(ownerent->d_name, ".")) continue; + if (!strcmp(ownerent->d_name, "..")) continue; + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, + ownerent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + err.code = EIO; + err.desc = "Cannot open owner data directory"; + goto error_out; + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerent->d_name, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + closedir(owner_dirp); owner_dirp = NULL; + } + closedir(data_dirp); data_dirp = NULL; + } + return 0; + +error_out: + if (owner_dirp) closedir(owner_dirp); + if (data_dirp) closedir(data_dirp); + free(data_dirname); + free(metadata_templ); + return glite_jp_stack_error(ctx,&err); +} + +#else + +/* placeholder instead */ +int glite_jppsbe_query( + glite_jp_context_t ctx, + const glite_jp_query_rec_t query[], + const glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + glite_jp_error_t err; + err.code = ENOSYS; + err.desc = "not implemented"; + return glite_jp_stack_error(ctx,&err); +} + +#endif + +/* XXX: +- no primary authorization yet +- no concurrency control yet +- partial success in pwrite,append +- "unique" part of jobid is assumed to be unique across bookkeeping servers +- repository versioning not fully implemented yet +*/ diff --git a/org.glite.jp.client/src/is_client.c b/org.glite.jp.client/src/is_client.c new file mode 100644 index 0000000..8a747ef --- /dev/null +++ b/org.glite.jp.client/src/is_client.c @@ -0,0 +1,38 @@ +#include +#include +#include +#include +#include +#include + +#include "glite/jp/types.h" + +#include "feed.h" +/* FIXME +#include "jpis_H.h" +#include "jpis_.nsmap" +*/ + +int glite_jpps_single_feed( + glite_jp_context_t ctx, + const char *destination, + const char *job, + const glite_jp_attrval_t attrs[] +) +{ + /* TODO: really call JP Index server (via interlogger) */ + printf("feed to %s, job %s\n",destination,job); + +/* FIXME */ +#if 0 + if (soap_call_jpsrv__UpdateJobs(ctx->other_soap,destination,"", + /* FIXME: feedId */ "", + /* FIXME: UpdateJobsData */ NULL, + 0, + NULL + )) fprintf(stderr,"UpdateJobs: %s %s\n",ctx->other_soap->fault->faultcode, + ctx->other_soap->fault->faultstring); + +#endif + return 0; +} diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c new file mode 100644 index 0000000..b54aac8 --- /dev/null +++ b/org.glite.jp.client/src/jpimporter.c @@ -0,0 +1,243 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/lb/lb_maildir.h" +#include "glite/security/glite_gsplugin.h" + +#include "jpps_H.h" +#include "jpps_.nsmap" + +#include "jptype_map.h" + +#include "soap_version.h" +#if GSOAP_VERSION <= 20602 +#define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob +#endif + + +#ifndef dprintf +#define dprintf(x) { if (debug) printf x; } +#endif + +#ifndef GLITE_JPIMPORTER_PIDFILE +#define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid" +#endif + +#ifndef GLITE_JPIMPORTER_MDIR +#define GLITE_JPIMPORTER_MDIR "/tmp/jpreg" +#endif + +static int debug = 0; +static int die = 0; + +static struct option opts[] = { + { "help", 0, NULL, 'h'}, + { "debug", 0, NULL, 'd'}, + { "jpps", 1, NULL, 'p'}, + { "mdir", 1, NULL, 'm'}, + { "pidfile", 1, NULL, 'i'}, + { NULL, 0, NULL, 0} +}; + +static const char *get_opt_string = "hdp:m:i:"; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + "\t-h, --help\t displays this screen\n" + "\t-d, --debug\t don't run as daemon, additional diagnostics\n" + "\t-p, --jpps\t JP primary service server\n" + "\t-m, --mdir\t path to the 'LB maildir' subtree\n" + "\t-i, --pidfile\t file to store master pid\n", + me); +} + +static void catchsig(int sig) +{ + die = sig; +} + +int main(int argc, char *argv[]) +{ + struct sigaction sa; + struct soap *soap; + sigset_t sset; + FILE *fpid; + int opt; + char *name, + *jpps = "http://localhost:8901", + pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE, + mdir[PATH_MAX] = GLITE_JPIMPORTER_MDIR; + + + name = strrchr(argv[0],'/'); + if (name) name++; else name = argv[0]; + + if ( geteuid() ) + snprintf(pidfile, sizeof pidfile, "%s/glite_jpimporter.pid", getenv("HOME")); + + while ( (opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF ) + switch ( opt ) { + case 'd': debug = 1; break; + case 'h': usage(name); return 0; + case 'p': jpps = optarg; break; + case 'm': strcpy(mdir, optarg); break; + case 'i': strcpy(pidfile, optarg); break; + case '?': usage(name); return 1; + } + if ( optind < argc ) { usage(name); return 1; } + + soap = soap_new(); + soap_init(soap); + soap_set_namespaces(soap, jpps__namespaces); + soap_register_plugin(soap, glite_gsplugin); + + setlinebuf(stdout); + setlinebuf(stderr); + + fpid = fopen(pidfile,"r"); + if ( fpid ) { + int opid = -1; + + if ( fscanf(fpid,"%d",&opid) == 1 ) { + if ( !kill(opid,0) ) { + fprintf(stderr,"%s: another instance running, pid = %d\n",argv[0],opid); + return 1; + } + else if (errno != ESRCH) { perror("kill()"); return 1; } + } + fclose(fpid); + } else if (errno != ENOENT) { perror(pidfile); return 1; } + fpid = fopen(pidfile, "w"); + if ( !fpid ) { perror(pidfile); return 1; } + fprintf(fpid, "%d", getpid()); + fclose(fpid); + + if ( !debug ) { + if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); } + + fpid = fopen(pidfile,"w"); + if ( !fpid ) { perror(pidfile); return 1; } + fprintf(fpid, "%d", getpid()); + fclose(fpid); + openlog(name, LOG_PID, LOG_DAEMON); + } else { setpgid(0, getpid()); } + + dprintf(("Master pid %d\n", getpid())); + + memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); + sa.sa_handler = catchsig; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + + sa.sa_handler = SIG_IGN; + sigaction(SIGUSR1, &sa, NULL); + + sigemptyset(&sset); + sigaddset(&sset, SIGTERM); + sigaddset(&sset, SIGINT); + sigprocmask(SIG_BLOCK, &sset, NULL); + + while ( !die ) { + int ret; + char *msg = NULL; + char *fname = NULL; + + ret = edg_wll_MaildirTransStart(mdir, &msg, &fname); + /* XXX: where should unblocking signal besides? */ + sigprocmask(SIG_UNBLOCK, &sset, NULL); + sigprocmask(SIG_BLOCK, &sset, NULL); + if ( ret < 0 ) { + dprintf(("edg_wll_MaildirTransStart: %s (%s)\n", strerror(errno), lbm_errdesc)); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); + exit(1); + } else if ( ret == 0 ) { + sleep(2); + } else { + struct _jpelem__RegisterJob in; + struct _jpelem__RegisterJobResponse empty; + struct SOAP_ENV__Detail *detail; + struct jptype__genericFault *f; + char *aux, *reason, indent[200] = " "; + + + dprintf(("JP registration request received\n")); + if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); + + if ( !(aux = strchr(msg, '\n')) ) { + dprintf(("Wrong format of message!\n")); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); + free(msg); + continue; + } + *aux++ = '\0'; + in.job = msg; + in.owner = aux; + ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); + free(msg); + + switch ( ret ) { + case SOAP_OK: + /* XXX: checks return error code */ + edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_OK); + dprintf(("Job '%s' succesfully registered to JP\n", msg)); + if ( !debug ) syslog(LOG_INFO, "Job '%s' succesfully registered to JP\n", msg); + break; + + case SOAP_FAULT: + case SOAP_SVR_FAULT: + edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED); + if (soap->version == 2) { + detail = soap->fault->SOAP_ENV__Detail; + reason = soap->fault->SOAP_ENV__Reason; + } else { + detail = soap->fault->detail; + reason = soap->fault->faultstring; + } + dprintf(("%s\n", reason)); + assert(detail->__type == SOAP_TYPE__genericFault); +#if GSOAP_VERSION >=20700 + f = ((struct _genericFault *) detail->fault) +#else + f = ((struct _genericFault *) detail->value) +#endif + -> jpelem__genericFault; + + while ( f ) { + dprintf(("%s%s: %s (%s)\n", indent, f->source, f->text, f->description)); + f = f->reason; + strcat(indent, " "); + } + break; + + default: + soap_print_fault(soap, stderr); + edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED); + break; + } + free(fname); + } + } + + /* XXX: some sort of soap_destroy(soap) */ + dprintf(("Terminating on signal %d\n", die)); + if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d\n", die); + + unlink(pidfile); + + return 0; +} + +/* XXX: we don't use it */ +SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; + diff --git a/org.glite.jp.client/src/jptype_map.h b/org.glite.jp.client/src/jptype_map.h new file mode 100644 index 0000000..56d611f --- /dev/null +++ b/org.glite.jp.client/src/jptype_map.h @@ -0,0 +1,18 @@ +#include "soap_version.h" + +#if GSOAP_VERSION >= 20700 +#define INPUT_SANDBOX jptype__UploadClass__INPUT_SANDBOX +#define OUTPUT_SANDBOX jptype__UploadClass__OUTPUT_SANDBOX +#define JOB_LOG jptype__UploadClass__JOB_LOG + +#define OWNER jptype__AttributeType__OWNER +#define TIME jptype__AttributeType__TIME +#define TAG jptype__AttributeType__TAG + +#define EQUAL jptype__queryOp__EQUAL +#define UNEQUAL jptype__queryOp__UNEQUAL +#define LESS jptype__queryOp__LESS +#define GREATER jptype__queryOp__GREATER +#define WITHIN jptype__queryOp__WITHIN +#endif + diff --git a/org.glite.jp.client/src/mysql.c b/org.glite.jp.client/src/mysql.c new file mode 100644 index 0000000..0f080ce --- /dev/null +++ b/org.glite.jp.client/src/mysql.c @@ -0,0 +1,265 @@ +#ident "$Header$" + +#include "mysql.h" // MySql header file +#include "mysqld_error.h" +#include "errmsg.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" + +#include "db.h" + +#define DEFAULTCS "jpps/@localhost:jpps1" +#define GLITE_JP_LB_MYSQL_VERSION 40018 + +static int my_err(glite_jp_context_t ctx, char *function) +{ + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = function; + err.code = EIO; /* XXX */ + err.desc = mysql_error((MYSQL *) ctx->dbhandle); + return glite_jp_stack_error(ctx,&err); +} + +struct _glite_jp_db_stmt_t { + MYSQL_RES *result; + glite_jp_context_t ctx; +}; + +int glite_jp_db_connect(glite_jp_context_t ctx,char *cs) +{ + char *buf = NULL; + char *host,*user,*pw,*db; + char *slash,*at,*colon; + + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (!cs) cs = DEFAULTCS; + + if (!(ctx->dbhandle = (void *) mysql_init(NULL))) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + mysql_options(ctx->dbhandle, MYSQL_READ_DEFAULT_FILE, "my"); + + host = user = pw = db = NULL; + + buf = strdup(cs); + slash = strchr(buf,'/'); + at = strrchr(buf,'@'); + colon = strrchr(buf,':'); + + if (!slash || !at || !colon) { + free(buf); + err.code = EINVAL; + err.desc = "Invalid DB connect string"; + return glite_jp_stack_error(ctx,&err); + } + + *slash = *at = *colon = 0; + host = at+1; + user = buf; + pw = slash+1; + db = colon+1; + + if (!mysql_real_connect((MYSQL *) ctx->dbhandle,host,user,pw,db,0,NULL,CLIENT_FOUND_ROWS)) { + free(buf); + return my_err(ctx, __FUNCTION__); + } + + free(buf); + return 0; +} + +void glite_jp_db_close(glite_jp_context_t ctx) +{ + mysql_close((MYSQL *) ctx->dbhandle); + ctx->dbhandle = NULL; +} + +int glite_jp_db_execstmt(glite_jp_context_t ctx,char *txt,glite_jp_db_stmt_t *stmt) +{ + int merr; + int retry_nr = 0; + int do_reconnect = 0; + + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (stmt) { + *stmt = NULL; + } + + while (retry_nr == 0 || do_reconnect) { + do_reconnect = 0; + if (mysql_query((MYSQL *) ctx->dbhandle,txt)) { + /* error occured */ + switch (merr = mysql_errno((MYSQL *) ctx->dbhandle)) { + case 0: + break; + case ER_DUP_ENTRY: + err.code = EEXIST; + err.desc = mysql_error((MYSQL *) ctx->dbhandle); + glite_jp_stack_error(ctx,&err); + return -1; + break; + case CR_SERVER_LOST: + if (retry_nr <= 0) + do_reconnect = 1; + break; + default: + my_err(ctx, __FUNCTION__); + return -1; + break; + } + } + retry_nr++; + } + + if (stmt) { + *stmt = malloc(sizeof(**stmt)); + if (!*stmt) { + err.code = ENOMEM; + glite_jp_stack_error(ctx,&err); + return -1; + } + memset(*stmt,0,sizeof(**stmt)); + (**stmt).ctx = ctx; + (**stmt).result = mysql_store_result((MYSQL *) ctx->dbhandle); + if (!(**stmt).result) { + if (mysql_errno((MYSQL *) ctx->dbhandle)) { + my_err(ctx, __FUNCTION__); + return -1; + } + } + } else { + MYSQL_RES *r = mysql_store_result((MYSQL *) ctx->dbhandle); + mysql_free_result(r); + } + + return mysql_affected_rows((MYSQL *) ctx->dbhandle); +} + +int glite_jp_db_fetchrow(glite_jp_db_stmt_t stmt,char **res) +{ + MYSQL_ROW row; + glite_jp_context_t ctx = stmt->ctx; + int nr,i; + unsigned long *len; + + glite_jp_clear_error(ctx); + + if (!stmt->result) return 0; + + if (!(row = mysql_fetch_row(stmt->result))) { + if (mysql_errno((MYSQL *) ctx->dbhandle)) { + my_err(ctx, __FUNCTION__); + return -1; + } else return 0; + } + + nr = mysql_num_fields(stmt->result); + len = mysql_fetch_lengths(stmt->result); + for (i=0; iresult))) cols[i++] = f->name; + return i == 0; +} + +void glite_jp_db_freestmt(glite_jp_db_stmt_t *stmt) +{ + if (*stmt) { + if ((**stmt).result) mysql_free_result((**stmt).result); + free(*stmt); + *stmt = NULL; + } +} + + +char *glite_jp_db_timetodb(time_t t) +{ + struct tm *tm = gmtime(&t); + char tbuf[256]; + + /* XXX: the very end of our days */ + if (!tm && t == (time_t) LONG_MAX) return strdup("9999-12-31 23:59:59"); + + sprintf(tbuf,"'%4d-%02d-%02d %02d:%02d:%02d'",tm->tm_year+1900,tm->tm_mon+1, + tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); + + return strdup(tbuf); +} + +time_t glite_jp_db_dbtotime(char *t) +{ + struct tm tm; + + memset(&tm,0,sizeof(tm)); + setenv("TZ","UTC",1); tzset(); + sscanf(t,"%4d-%02d-%02d %02d:%02d:%02d", + &tm.tm_year,&tm.tm_mon,&tm.tm_mday, + &tm.tm_hour,&tm.tm_min,&tm.tm_sec); + tm.tm_year -= 1900; + tm.tm_mon--; + + return mktime(&tm); +} + +int glite_jp_db_dbcheckversion(glite_jp_context_t ctx) +{ + MYSQL *m = (MYSQL *) ctx->dbhandle; + const char *ver_s = mysql_get_server_info(m); + int major,minor,sub,version; + + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (!ver_s || 3 != sscanf(ver_s,"%d.%d.%d",&major,&minor,&sub)) { + err.code = EINVAL; + err.desc = "problem checking MySQL version"; + return glite_jp_stack_error(ctx,&err); + } + + version = 10000*major + 100*minor + sub; + + if (version < GLITE_JP_LB_MYSQL_VERSION) { + char msg[300]; + + snprintf(msg,sizeof msg,"Your MySQL version is %d. At least %d required.",version, GLITE_JP_LB_MYSQL_VERSION); + err.code = EINVAL; + err.desc = msg; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} diff --git a/org.glite.jp.client/src/new_ftp_backend.c b/org.glite.jp.client/src/new_ftp_backend.c new file mode 100644 index 0000000..930030e --- /dev/null +++ b/org.glite.jp.client/src/new_ftp_backend.c @@ -0,0 +1,1790 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" +#include "glite/jp/strmd5.h" + +#include "tags.h" +#include "backend.h" +#include "db.h" + +#include "jpps_H.h" /* XXX: SOAP_TYPE___jpsrv__GetJob */ + +#define FTPBE_DEFAULT_DB_CS "jpps/@localhost:jpps" + +struct ftpbe_config { + char *internal_path; + char *external_path; + char *db_cs; + char *gridmap; + char *logname; +}; + +static struct ftpbe_config *config = NULL; + +struct fhandle_rec { + int fd; + int fd_append; +}; +typedef struct fhandle_rec *fhandle; + +static struct option ftpbe_opts[] = { + { "ftp-internal-path", 1, NULL, 'I' }, + { "ftp-external-path", 1, NULL, 'E' }, + { "ftp-db-cs", 1, NULL, 'D' }, + { "ftp-gridmap", 1, NULL, 'G' }, + { NULL, 0, NULL, 0 } +}; + +/******************************************************************************* + Internal helpers +*******************************************************************************/ + + +static int config_check( + glite_jp_context_t ctx, + struct ftpbe_config *config) +{ + return config == NULL || + config->internal_path == NULL || + config->external_path == NULL || + config->db_cs == NULL || + config->gridmap == NULL || + config->logname == NULL; + + /* XXX check reality */ +} + +static int jobid_unique_pathname(glite_jp_context_t ctx, const char *job, + char **unique, char **ju_path, int get_path) +{ + char *p; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + p = strrchr(job, '/'); + if (!p) { + err.code = EINVAL; + err.desc = "Malformed jobid"; + return glite_jp_stack_error(ctx,&err); + } + /* XXX thorough checks */ + if (!(*unique = strdup(p+1))) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + if (get_path) { + if (!(*ju_path = strdup(p+1))) { + free(*unique); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + *(*ju_path + 10) = '\0'; + } + return 0; +} + +static int mkdirpath(const char* path, int prefixlen) +{ + char *wpath, *p; + int goout, ret; + + wpath = strdup(path); + if (!wpath) { + errno = ENOMEM; + return -1; + } + + p = wpath + prefixlen; + goout = 0; + while (!goout) { + while (*p == '/') p++; + while (*p != '/' && *p != '\0') p++; + goout = (*p == '\0'); + *p = '\0'; + ret = mkdir(wpath, S_IRUSR | S_IWUSR | S_IXUSR); + if (ret < 0 && errno != EEXIST) break; + *p = '/'; + } + free(wpath); + return goout ? 0 : ret; +} + +static int store_user(glite_jp_context_t ctx, const char *userid, const char *subj) +{ + glite_jp_error_t err; + char *stmt; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(userid != NULL); + assert(subj != NULL); + + trio_asprintf(&stmt,"insert into users(userid,cert_subj) " + "values ('%|Ss','%|Ss')",userid,subj); + if (!stmt) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + if (glite_jp_db_execstmt(ctx, stmt, NULL) < 0) { + if (ctx->error->code == EEXIST) + glite_jp_clear_error(ctx); + else { + free(stmt); + err.code = EIO; + err.desc = "DB access failed"; + return glite_jp_stack_error(ctx,&err); + } + } + free(stmt); + + return 0; +} + +static long regtime_trunc(long tv_sec) +{ + return tv_sec / (86400*7); +} + +static long regtime_ceil(long tv_sec) +{ + return (tv_sec % (86400*7)) ? tv_sec/(86400*7)+1 : tv_sec/(86400*7) ; +} + +/******************************************************************************** + Backend calls +********************************************************************************/ +int glite_jppsbe_init( + glite_jp_context_t ctx, + int argc, + char *argv[] +) +{ + glite_jp_error_t err; + int opt; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + config = (struct ftpbe_config *) calloc(1, sizeof *config); + if (!config) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + + config->logname = getlogin(); + + while ((opt = getopt_long(argc, argv, "I:E:G:", ftpbe_opts, NULL)) != EOF) { + switch (opt) { + case 'I': config->internal_path = optarg; break; + case 'E': config->external_path = optarg; break; + case 'D': config->db_cs = optarg; break; + case 'G': config->gridmap = optarg; break; + default: break; + } + } + + /* Defaults */ + if (!config->db_cs) config->db_cs = strdup(FTPBE_DEFAULT_DB_CS); + + if (config_check(ctx, config)) { + err.code = EINVAL; + err.desc = "Invalid FTP backend configuration"; + return glite_jp_stack_error(ctx,&err); + } + + if (glite_jp_db_connect(ctx, config->db_cs)) { + err.code = EIO; + err.desc = "Cannot access backend's database (during init)"; + return glite_jp_stack_error(ctx,&err); + } else { + glite_jp_db_close(ctx); /* slaves open their own connections */ + } + + return 0; +} + +int glite_jppsbe_init_slave( + glite_jp_context_t ctx +) +{ + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (glite_jp_db_connect(ctx, config->db_cs)) { + err.code = EIO; + err.desc = "Cannot access backend's database"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +int glite_jppsbe_register_job( + glite_jp_context_t ctx, + const char *job, + const char *owner +) +{ + glite_jp_error_t err; + char *data_dir = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *ownerhash = NULL; + struct timeval reg_tv; + char *stmt = NULL; + char *dbtime = NULL; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job != NULL); + assert(owner != NULL); + + gettimeofday(®_tv, NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + ownerhash = str2md5(owner); /* static buffer */ + if (store_user(ctx, ownerhash, owner)) { + err.code = EIO; + err.desc = "Cannot store user entry"; + goto error_out; + } + + dbtime = glite_jp_db_timetodb(reg_tv.tv_sec); + if (!dbtime) { + err.code = ENOMEM; + goto error_out; + } + + trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,owner,reg_time) " + "values ('%|Ss','%|Ss','%|Ss', %s)", + ju, job, ownerhash, dbtime); + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if (glite_jp_db_execstmt(ctx, stmt, NULL) < 0) { + if (ctx->error->code == EEXIST) { + err.code = EEXIST; + err.desc = "Job already registered"; + } + else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + if (asprintf(&data_dir, "%s/data/%s/%d/%s", + config->internal_path, ownerhash, regtime_trunc(reg_tv.tv_sec), ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (mkdirpath(data_dir, strlen(config->internal_path)) < 0 && + errno != EEXIST) { + err.code = errno; + err.desc = "Cannot mkdir jobs's data directory"; + goto error_out; + } + +error_out: + free(data_dir); + free(stmt); free(dbtime); + free(ju); free(ju_path); + + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int add_to_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + gridmap = fopen(config->gridmap, "a"); + if (!gridmap) { + err.code = errno; + err.desc = "Cannot open gridmap file"; + return glite_jp_stack_error(ctx,&err); + } + if (fprintf(gridmap, "\"%s\" %s\n", dn, config->logname) < 6 || + ferror(gridmap)) { + err.code = EIO; + err.desc = "Cannot write to gridmap file"; + fclose(gridmap); + return glite_jp_stack_error(ctx,&err); + } + fclose(gridmap); + return 0; +} + +static int remove_from_gridmap(glite_jp_context_t ctx, const char *dn) +{ + FILE *gridmap = NULL; + char *temp_name = NULL; + FILE *temp_file = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + /* XXX */ + return 0; +} + +int glite_jppsbe_start_upload( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, + const char *content_type, + char **destination_out, + time_t *commit_before_inout +) +{ + char *data_basename = NULL; + char *data_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + char *peername = NULL; + char *peerhash = NULL; + + char *stmt = NULL; + glite_jp_db_stmt_t db_res; + int db_retn; + char *db_row[2] = { NULL, NULL }; + + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(destination_out!=NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + peername = glite_jp_peer_name(ctx); + if (peername == NULL) { + err.code = EINVAL; + err.desc = "Cannot obtain client certificate info"; + goto error_out; + } + + trio_asprintf(&stmt, "select owner, reg_time from jobs" + " where jobid='%|Ss'", ju); + + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if ((db_retn = glite_jp_db_execstmt(ctx, stmt, &db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "No such job registered"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + db_retn = glite_jp_db_fetchrow(db_res, db_row); + if (db_retn != 2) { + glite_jp_db_freestmt(&db_res); + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + + glite_jp_db_freestmt(&db_res); + + /* XXX authorization done in soap_ops.c */ + + /* XXX name length */ + if (asprintf(&data_basename, "%s%s%s", class, + (name != NULL) ? "." : "", + (name != NULL) ? name : "") == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (asprintf(&data_fname, "%s/data/%s/%d/%s/%s", + config->internal_path, db_row[0], + regtime_trunc(glite_jp_db_dbtotime(db_row[1])), + ju, data_basename) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (asprintf(destination_out, "%s/data/%s/%d/%s/%s", + config->external_path, db_row[0], + regtime_trunc(glite_jp_db_dbtotime(db_row[1])), + ju, data_basename) == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (commit_before_inout != NULL) + /* XXX no timeout enforced */ + /* XXX: gsoap does not like so much, one year should be enough + *commit_before_inout = (time_t) LONG_MAX; + */ + *commit_before_inout = time(NULL) + 365*24*60*60; + + /* + if (add_to_gridmap(ctx, peername)) { + err.code = EIO; + err.desc = "Cannot add peer DN to ftp server authorization file"; + goto error_out; + } + */ + + peerhash = str2md5(peername); /* static buffer */ + if (store_user(ctx, peerhash, peername)) { + err.code = EIO; + err.desc = "Cannot store upload user entry"; + goto error_out; + } + + free(stmt); stmt = NULL; + trio_asprintf(&stmt,"insert into files" + "(jobid,filename,int_path,ext_url,state,deadline,ul_userid) " + "values ('%|Ss','%|Ss','%|Ss','%|Ss','%|Ss', '%|Ss', '%|Ss')", + ju, data_basename, data_fname, *destination_out, "uploading", + glite_jp_db_timetodb(*commit_before_inout), peerhash); + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if (glite_jp_db_execstmt(ctx, stmt, NULL) < 0) { + if (ctx->error->code == EEXIST) { + err.code = EEXIST; + err.desc = "File already stored or upload in progress"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + +error_out: + free(db_row[0]); free(db_row[1]); + free(stmt); + free(data_basename); + free(data_fname); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_commit_upload( + glite_jp_context_t ctx, + const char *destination +) +{ + char *peername = NULL; + char *peerhash = NULL; + + char *stmt = NULL; + glite_jp_db_stmt_t db_res; + int db_retn; + char *db_row[7] = { NULL, NULL, NULL, NULL, NULL, NULL, NULL }; + int i; + + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(destination != NULL); + + trio_asprintf(&stmt, "select * from files where " + "ext_url='%|Ss' and state='uploading'", destination); + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if ((db_retn = glite_jp_db_execstmt(ctx, stmt, &db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "No such upload in progress"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + db_retn = glite_jp_db_fetchrow(db_res, db_row); + if (db_retn != 7) { + glite_jp_db_freestmt(&db_res); + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + glite_jp_db_freestmt(&db_res); + + peername = glite_jp_peer_name(ctx); + if (peername == NULL) { + err.code = EINVAL; + err.desc = "Cannot obtain client certificate info"; + goto error_out; + } + + peerhash = str2md5(peername); /* static buffer */ + if (strcmp(peerhash, db_row[6])) { + err.code = EPERM; + err.desc = "Upload started by client with different identity"; + goto error_out; + } + + free(stmt); + trio_asprintf(&stmt,"update files set state='committed', deadline=NULL " + "where jobid='%|Ss' and filename='%|Ss'", db_row[0], db_row[1]); + + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if (glite_jp_db_execstmt(ctx, stmt, NULL) < 0) { + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } +error_out: + for (i=0; i<7; i++) free(db_row[i]); + free(peername); + free(stmt); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_destination_info( + glite_jp_context_t ctx, + const char *destination, + char **job, + char **class, + char **name +) +{ + char *stmt = NULL; + glite_jp_db_stmt_t db_res; + int db_retn; + char *db_row[2] = { NULL, NULL}; + int i; + char *cp = NULL; + + char *classname = NULL; + glite_jp_error_t err; + + assert(destination != NULL); + assert(job != NULL); + assert(class != NULL); + assert(name != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + + trio_asprintf(&stmt, "select j.dg_jobid,f.filename from jobs j,files f where " + "f.ext_url='%|Ss' and j.jobid=f.jobid", destination); + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if ((db_retn = glite_jp_db_execstmt(ctx, stmt, &db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "Invalid destination string"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + db_retn = glite_jp_db_fetchrow(db_res, db_row); + if (db_retn != 2) { + glite_jp_db_freestmt(&db_res); + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + glite_jp_db_freestmt(&db_res); + + *job = strdup(db_row[0]); + + cp = strchr(db_row[1],'.'); + if (!cp) { + *name = NULL; + } else { + *cp++ = '\0'; + *name = strdup(cp); + } + *class = strdup(db_row[1]); + + if (!*job || !*class) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + for (i=0; i<2; i++) free(db_row[i]); + free(stmt); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + + +int glite_jppsbe_get_job_url( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, + char **url_out +) +{ + char *data_basename = NULL; + char *data_fname = NULL; + char *ju = NULL; + char *ju_path = NULL; + + char *stmt = NULL; + glite_jp_db_stmt_t db_res; + int db_retn; + char *db_row[3] = { NULL, NULL, NULL }; + + long reg_time; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(url_out != NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/ : ""name"; + return glite_jp_stack_error(ctx,&err); + } + + trio_asprintf(&stmt, "select j.owner,reg_time,u.cert_subj from jobs j, users u " + "where j.jobid='%|Ss' and j.owner = u.userid", ju); + + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if ((db_retn = glite_jp_db_execstmt(ctx, stmt, &db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "No such job registered"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + free(stmt); stmt = NULL; + + db_retn = glite_jp_db_fetchrow(db_res, db_row); + if (db_retn != 3) { + glite_jp_db_freestmt(&db_res); + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + + glite_jp_db_freestmt(&db_res); + + if (glite_jpps_authz(ctx,SOAP_TYPE___jpsrv__GetJob,job,db_row[2])) { + err.code = EPERM; + goto error_out; + } + + /* XXX name length */ + if (asprintf(&data_basename, "%s%s%s", class, + (name != NULL) ? "." : "", + (name != NULL) ? name : "") == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (asprintf(url_out, "%s/data/%s/%d/%s/%s", + config->external_path, db_row[0], + regtime_trunc(glite_jp_db_dbtotime(db_row[1])), + ju, data_basename) == -1) { + err.code = ENOMEM; + goto error_out; + } + + trio_asprintf(&stmt,"select 'x' from files where jobid='%|Ss' " + "and ext_url = '%|Ss' " + "and state='committed' ",ju,*url_out); + + if ((db_retn = glite_jp_db_execstmt(ctx,stmt,&db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "not uploaded yet"; + } + else { + err.code = EIO; + err.desc = "DB access failed"; + } + /* goto error_out; */ + } + +error_out: + free(db_row[0]); free(db_row[1]); + free(stmt); + free(data_basename); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_fname( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, + char **fname_out +) +{ + char *data_basename = NULL; + char *ju = NULL; + char *ju_path = NULL; + + char *stmt = NULL; + glite_jp_db_stmt_t db_res; + int db_retn; + char *db_row[2] = { NULL, NULL }; + + long reg_time; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + assert(job!=NULL); + assert(fname_out != NULL); + + assert(class!=NULL); + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + trio_asprintf(&stmt, "select owner, reg_time from jobs " + "where jobid='%|Ss'", ju); + + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + + if ((db_retn = glite_jp_db_execstmt(ctx, stmt, &db_res)) <= 0) { + if (db_retn == 0) { + err.code = ENOENT; + err.desc = "No such job registered"; + } else { + err.code = EIO; + err.desc = "DB access failed"; + } + goto error_out; + } + + db_retn = glite_jp_db_fetchrow(db_res, db_row); + if (db_retn != 2) { + glite_jp_db_freestmt(&db_res); + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + + glite_jp_db_freestmt(&db_res); + + /* XXX name length */ + if (asprintf(&data_basename, "%s%s%s", class, + (name != NULL) ? "." : "", (name != NULL) ? name : "") == -1) { + err.code = ENOMEM; + goto error_out; + } + + if (asprintf(fname_out, "%s/data/%s/%d/%s/%s", + config->internal_path, db_row[0], + regtime_trunc(glite_jp_db_dbtotime(db_row[1])), + ju, data_basename) == -1) { + err.code = ENOMEM; + goto error_out; + } + +error_out: + free(db_row[0]); free(db_row[1]); + free(stmt); + free(data_basename); + free(ju); free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + + +int glite_jppsbe_open_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name, + int mode, + void **handle_out +) +{ + fhandle handle = NULL; + char* fname = NULL; + glite_jp_error_t err; + + assert(handle_out != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (get_job_fname(ctx, job, class, name, &fname)) { + err.code = ctx->error->code; + err.desc = "Cannot construct internal filename"; + return glite_jp_stack_error(ctx,&err); + } + + handle = (fhandle) calloc(1,sizeof(*handle)); + if (handle == NULL) { + err.code = ENOMEM; + goto error_out; + } + + handle->fd = open(fname, mode, S_IRUSR | S_IWUSR); + if (handle->fd < 0) { + err.code = errno; + err.desc = "Cannot open requested file"; + free(handle); + goto error_out; + } + handle->fd_append = open(fname, mode | O_APPEND, S_IRUSR | S_IWUSR); + if (handle->fd_append < 0) { + err.code = errno; + err.desc = "Cannot open requested file for append"; + close(handle->fd); + free(handle); + goto error_out; + } + *handle_out = (void*) handle; + +error_out: + free(fname); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_close_file( + glite_jp_context_t ctx, + void *handle +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (close(((fhandle)handle)->fd_append) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor (fd_append)"; + goto error_out; + } + if (close(((fhandle)handle)->fd) < 0) { + err.code = errno; + err.desc = "Error closing file descriptor"; + goto error_out; + } + +error_out: + free(handle); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_pread( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset, + ssize_t *nbytes_ret +) +{ + ssize_t ret; + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if ((ret = pread(((fhandle)handle)->fd, buf, nbytes, offset)) < 0) { + err.code = errno; + err.desc = "Error in pread()"; + return glite_jp_stack_error(ctx,&err); + } + *nbytes_ret = ret; + + return 0; +} + +int glite_jppsbe_pwrite( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes, + off_t offset +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (pwrite(((fhandle)handle)->fd, buf, nbytes, offset) < 0) { + err.code = errno; + err.desc = "Error in pwrite()"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +int glite_jppsbe_append( + glite_jp_context_t ctx, + void *handle, + void *buf, + size_t nbytes +) +{ + glite_jp_error_t err; + + assert(handle != NULL); + assert(buf != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (write(((fhandle)handle)->fd_append, buf, nbytes) < 0) { + err.code = errno; + err.desc = "Error in write()"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +static int get_job_info( + glite_jp_context_t ctx, + const char *job, + char **owner, + struct timeval *tv_reg +) +{ + char *ju = NULL; + char *ju_path = NULL; + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char *int_fname = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (jobid_unique_pathname(ctx, job, &ju, &ju_path, 1) != 0) { + err.code = ctx->error->code; + err.desc = "Cannot obtain jobid unique path/name"; + return glite_jp_stack_error(ctx,&err); + } + + if (asprintf(&int_fname, "%s/regs/%s/%s.info", + config->internal_path, ju_path, ju) == -1) { + err.code = ENOMEM; + goto error_out; + } + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + if (errno == ENOENT) + err.desc = "Job not registered"; + else + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %*s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, &ownerlen) < 4 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + free(int_fname); + free(ju); + free(ju_path); + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +static int get_job_info_int( + glite_jp_context_t ctx, + const char *int_fname, + char **jobid, + char **owner, + struct timeval *tv_reg +) +{ + FILE *regfile = NULL; + long reg_time_sec; + long reg_time_usec; + int ownerlen = 0; + int info_version; + char jobid_buf[256]; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + regfile = fopen(int_fname, "r"); + if (regfile == NULL) { + err.code = errno; + err.desc = "Cannot open jobs's reg info file"; + goto error_out; + } + if (fscanf(regfile, "%d %ld.%ld %s %*s %d ", &info_version, + ®_time_sec, ®_time_usec, jobid_buf, &ownerlen) < 5 || ferror(regfile)) { + fclose(regfile); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + *jobid = strdup(jobid_buf); + if (ownerlen) { + *owner = (char *) calloc(1, ownerlen+1); + if (!*owner) { + err.code = ENOMEM; + goto error_out; + } + if (fgets(*owner, ownerlen+1, regfile) == NULL) { + fclose(regfile); + free(*owner); + err.code = errno; + err.desc = "Cannot read jobs's reg info file"; + goto error_out; + } + } + fclose(regfile); + + tv_reg->tv_sec = reg_time_sec; + tv_reg->tv_usec = reg_time_usec; + +error_out: + if (err.code) { + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} + +int glite_jppsbe_get_job_metadata( + glite_jp_context_t ctx, + const char *job, + glite_jp_attrval_t attrs_inout[] +) +{ + int got_info = 0; + struct timeval tv_reg; + char *owner = NULL; + int got_tags = 0; + void *tags_handle = NULL; + glite_jp_tagval_t* tags = NULL; + int i,j; + glite_jp_error_t err; + + assert(job != NULL); + assert(attrs_inout != NULL); + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; attrs_inout[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TIME: +*/ + if (!got_info) { + if (get_job_info(ctx, job, &owner, &tv_reg)) { + err.code = ctx->error->code; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + got_info = 1; + } + break; + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TAG: + if (!got_tags) { + if (glite_jppsbe_open_file(ctx, job, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + got_tags = 1; + } + break; +*/ + default: + err.code = EINVAL; + err.desc = "Invalid attribute type"; + goto error_out; + break; + } + + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + attrs_inout[i].value.s = strdup(owner); + if (!attrs_inout[i].value.s) { + err.code = ENOMEM; + err.desc = "Cannot copy owner string"; + goto error_out; + } + break; + case GLITE_JP_ATTR_TIME: + attrs_inout[i].value.time = tv_reg; + break; + +/* must be implemented via filetype plugin + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, attrs_inout[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &attrs_inout[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) attrs_inout[i].value.tag.name = NULL; + break; +*/ + default: + break; + } + } + +error_out: + free(owner); + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); + + if (err.code) { + while (i > 0) { + i--; + switch (attrs_inout[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + free(attrs_inout[i].value.s); + break; + case GLITE_JP_ATTR_TAG: + free(attrs_inout[i].value.tag.name); + free(attrs_inout[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else { + return 0; + } +} +static int compare_timeval(struct timeval a, struct timeval b) +{ + if (a.tv_sec < b.tv_sec) return -1; + if (a.tv_sec > b.tv_sec) return 1; + if (a.tv_usec < b.tv_usec) return -1; + if (a.tv_usec > b.tv_usec) return 1; + return 0; +} + + +/* FIXME: disabled -- clarification wrt. filetype plugin needed */ + +#if 0 + +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +); + +static int query_phase2( + glite_jp_context_t ctx, + const char *ownerhash, + long regtime_tr, + int q_tags, + int md_tags, + const glite_jp_query_rec_t query[], + glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + char *time_dirname = NULL; + DIR *time_dirp = NULL; + struct dirent *jobent; + char *info_fname = NULL; + char *jobid = NULL; + char *owner = NULL; + struct timeval tv_reg; + void *tags_handle = NULL; + int matching; + int i, j; + glite_jp_tagval_t* tags = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (asprintf(&time_dirname, "%s/data/%s/%d", config->internal_path, + ownerhash, regtime_tr) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + time_dirp = opendir(time_dirname); + if (!time_dirp) { + free(time_dirname); + return 0; /* found nothing */ + } + while ((jobent = readdir(time_dirp)) != NULL) { + if (!strcmp(jobent->d_name, ".")) continue; + if (!strcmp(jobent->d_name, "..")) continue; + if (asprintf(&info_fname, "%s/%s/_info", time_dirname, + jobent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + if (get_job_info_int(ctx, info_fname, &jobid, &owner, &tv_reg)) { + err.code = EIO; + err.desc = "Cannot retrieve job info"; + goto error_out; + } + if (q_tags || md_tags) { + if (glite_jppsbe_open_file(ctx, jobid, GLITE_JP_FILECLASS_TAGS, + O_RDONLY, &tags_handle)) { + err.code = ctx->error->code; + err.desc = "Cannot open tag file"; + goto error_out; + } + if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { + err.code = ctx->error->code; + err.desc = "Cannot read tags"; + glite_jppsbe_close_file(ctx, tags_handle); + goto error_out; + } + glite_jppsbe_close_file(ctx, tags_handle); + tags_handle = NULL; + } + + matching = 1; + for (i = 0; matching && query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (query[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + if (query[i].value.s == NULL || + strcmp(query[i].value.s, owner)) matching = 0; + break; + case GLITE_JP_ATTR_TIME: + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = compare_timeval(tv_reg, query[i].value.time); + break; + case GLITE_JP_QUERYOP_LESS: + matching = compare_timeval(tv_reg, query[i].value.time) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = compare_timeval(tv_reg, query[i].value.time) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = compare_timeval(tv_reg, query[i].value.time) >= 0 + && compare_timeval(tv_reg, query[i].value2.time) <= 0; + break; + } + break; + case GLITE_JP_ATTR_TAG: + if (!tags) { + matching = 0; + break; + } + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, query[i].attr.name)) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + matching = !strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_UNEQUAL: + matching = strcmp(tags[j].value, query[i].value.s); + break; + case GLITE_JP_QUERYOP_LESS: + matching = strcmp(tags[j].value, query[i].value.s) < 0; + break; + case GLITE_JP_QUERYOP_GREATER: + matching = strcmp(tags[j].value, query[i].value.s) > 0; + break; + case GLITE_JP_QUERYOP_WITHIN: + matching = strcmp(tags[j].value, query[i].value.s) >= 0 \ + && strcmp(tags[j].value, query[i].value2.s) <= 0 ; + break; + default: + break; + } + } + } + break; + default: + break; + } + } + if (!matching) { + free(info_fname); info_fname = NULL; + free(jobid); jobid = NULL; + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + free(tags); tags = NULL; + continue; + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + metadata[i].value.s = owner; + break; + case GLITE_JP_ATTR_TIME: + metadata[i].value.time = tv_reg; + break; + case GLITE_JP_ATTR_TAG: + for (j = 0; tags[j].name != NULL; j++) { + if (!strcmp(tags[j].name, metadata[i].attr.name)) { + if (glite_jpps_tagval_copy(ctx, &tags[j], + &metadata[i].value.tag)) { + err.code = ENOMEM; + err.desc = "Cannot copy tag value"; + goto error_out; + } + break; + } + } + if (!tags[j].name) { + metadata[i].value.tag.name = NULL; + metadata[i].value.tag.value = NULL; + } + break; + default: + break; + } + } + (*callback)(ctx, jobid, metadata); + free(jobid); jobid = NULL; + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + } + +error_out: + if (tags) for (j = 0; tags[j].name != NULL; j++) { + free(tags[j].name); + free(tags[j].value); + } + if (tags_handle) glite_jppsbe_close_file(ctx, tags_handle); + free(info_fname); + free(owner); + free(jobid); + closedir(time_dirp); + free(time_dirname); + if (err.code) { + while (i > 0) { + i--; + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_TAG: + free(metadata[i].value.tag.name); + free(metadata[i].value.tag.value); + default: + break; + } + } + return glite_jp_stack_error(ctx,&err); + } else + return 0; +} + +int glite_jppsbe_query( + glite_jp_context_t ctx, + const glite_jp_query_rec_t query[], + const glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + /* XXX clone metadata */ + int i; + char *q_exact_owner = NULL; + char *ownerhash = NULL; + long q_min_time = 0; + long q_max_time = LONG_MAX; + long q_min_time_tr; + long q_max_time_tr; + int q_with_tags = 0; + int md_info = 0; + int md_tags = 0; + char *owner_dirname = NULL; + DIR *owner_dirp = NULL; + struct dirent *ttimeent; + char *data_dirname = NULL; + DIR *data_dirp = NULL; + struct dirent *ownerent; + long ttime = 0; + glite_jp_attrval_t *metadata_templ = NULL; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + for (i = 0; query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + if (query[i].attr.type == GLITE_JP_ATTR_OWNER && query[i].op == GLITE_JP_QUERYOP_EQUAL) { + q_exact_owner = query[i].value.s; + } + if (query[i].attr.type == GLITE_JP_ATTR_TIME) { + switch (query[i].op) { + case GLITE_JP_QUERYOP_EQUAL: + q_min_time = query[i].value.time.tv_sec; + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_LESS: + if (q_max_time > query[i].value.time.tv_sec + 1) + q_max_time = query[i].value.time.tv_sec + 1; + break; + case GLITE_JP_QUERYOP_WITHIN: + if (q_max_time > query[i].value2.time.tv_sec + 1) + q_max_time = query[i].value2.time.tv_sec + 1; + /* fallthrough */ + case GLITE_JP_QUERYOP_GREATER: + if (q_min_time < query[i].value.time.tv_sec) + q_min_time = query[i].value.time.tv_sec; + break; + default: + err.code = EINVAL; + err.desc = "Invalid query op"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + if (query[i].attr.type == GLITE_JP_ATTR_TAG) + q_with_tags = 1; + + } + + for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { + switch (metadata[i].attr.type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TIME: + md_info = 1; + break; + case GLITE_JP_ATTR_TAG: + md_tags = 1; + break; + default: + err.code = EINVAL; + err.desc = "Invalid attribute type in metadata parameter"; + return glite_jp_stack_error(ctx,&err); + break; + } + } + metadata_templ = (glite_jp_attrval_t *) calloc(i + 1, sizeof(glite_jp_attrval_t)); + if (!metadata_templ) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + memcpy(metadata_templ, metadata, (i + 1) * sizeof(glite_jp_attrval_t)); + + q_min_time_tr = regtime_trunc(q_min_time); + q_max_time_tr = regtime_ceil(q_max_time); + + if (q_exact_owner) { + ownerhash = str2md5(q_exact_owner); /* static buffer */ + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, ownerhash) == -1) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + free(metadata_templ); + return 0; /* found nothing */ + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerhash, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + } else { /* !q_exact_owner */ + if (asprintf(&data_dirname, "%s/data", config->internal_path) == -1) { + err.code = ENOMEM; + goto error_out; + } + data_dirp = opendir(data_dirname); + if (!data_dirp) { + err.code = EIO; + err.desc = "Cannot open data directory"; + goto error_out; + } + while ((ownerent = readdir(data_dirp)) != NULL) { + if (!strcmp(ownerent->d_name, ".")) continue; + if (!strcmp(ownerent->d_name, "..")) continue; + if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, + ownerent->d_name) == -1) { + err.code = ENOMEM; + goto error_out; + } + owner_dirp = opendir(owner_dirname); + free(owner_dirname); + if (!owner_dirp) { + err.code = EIO; + err.desc = "Cannot open owner data directory"; + goto error_out; + } + while ((ttimeent = readdir(owner_dirp)) != NULL) { + if (!strcmp(ttimeent->d_name, ".")) continue; + if (!strcmp(ttimeent->d_name, "..")) continue; + ttime = atol(ttimeent->d_name); + if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { + if (query_phase2(ctx, ownerent->d_name, ttime, q_with_tags, md_tags, + query, metadata_templ, callback)) { + err.code = EIO; + err.desc = "query_phase2() error"; + goto error_out; + } + } + } + closedir(owner_dirp); owner_dirp = NULL; + } + closedir(data_dirp); data_dirp = NULL; + } + return 0; + +error_out: + if (owner_dirp) closedir(owner_dirp); + if (data_dirp) closedir(data_dirp); + free(data_dirname); + free(metadata_templ); + return glite_jp_stack_error(ctx,&err); +} + +#else + +/* placeholder instead */ +int glite_jppsbe_query( + glite_jp_context_t ctx, + const glite_jp_query_rec_t query[], + const glite_jp_attrval_t metadata[], + int (*callback)( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t metadata[] + ) +) +{ + glite_jp_error_t err; + err.code = ENOSYS; + err.desc = "not implemented"; + return glite_jp_stack_error(ctx,&err); +} + +#endif + +/* XXX: +- no primary authorization yet +- no concurrency control yet +- partial success in pwrite,append +- "unique" part of jobid is assumed to be unique across bookkeeping servers +- repository versioning not fully implemented yet +*/ diff --git a/org.glite.jp.client/src/simple_server.c b/org.glite.jp.client/src/simple_server.c new file mode 100644 index 0000000..3bbb743 --- /dev/null +++ b/org.glite.jp.client/src/simple_server.c @@ -0,0 +1,59 @@ +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" + +#include "jpps_H.h" + +extern SOAP_NMAC struct Namespace jpis__namespaces[],jpps__namespaces[]; + +int main(int argc, char *argv[]) { + struct soap soap; + int i, m, s; // master and slave sockets + + glite_jp_context_t ctx; + + soap_init(&soap); + soap_set_namespaces(&soap, jpps__namespaces); + + glite_jp_init_context(&ctx); + + if (glite_jppsbe_init(ctx, &argc, argv)) { + /* XXX log */ + fputs(glite_jp_error_chain(ctx), stderr); + exit(1); + } + + soap.user = (void *) ctx; + + ctx->other_soap = soap_new(); + soap_init(ctx->other_soap); + soap_set_namespaces(ctx->other_soap,jpis__namespaces); + + srand48(time(NULL)); /* feed id generation */ + + m = soap_bind(&soap, NULL, 8901, 100); + if (m < 0) + soap_print_fault(&soap, stderr); + else + { + fprintf(stderr, "Socket connection successful: master socket = %d\n", m); + for (i = 1; ; i++) { + s = soap_accept(&soap); + if (s < 0) { + soap_print_fault(&soap, stderr); + break; + } + jpps__serve(&soap); // process RPC request + soap_destroy(&soap); // clean up class instances + soap_end(&soap); // clean up everything and close socket + glite_jp_run_deferred(ctx); + } + } + soap_done(&soap); // close master socket + + return 0; +} + +/* XXX: we don't use it */ +SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; diff --git a/org.glite.jp.client/src/soap_ops.c b/org.glite.jp.client/src/soap_ops.c new file mode 100644 index 0000000..9411403 --- /dev/null +++ b/org.glite.jp.client/src/soap_ops.c @@ -0,0 +1,465 @@ +#include +#include +#include + +#include "glite/jp/types.h" +#include "glite/jp/context.h" + +#include "feed.h" + +#include "jpps_H.h" +/* #include "JobProvenancePS.nsmap" */ +#include "jpps_.nsmap" + +#include "jptype_map.h" + +#include "file_plugin.h" +#include "builtin_plugins.h" + +#include "soap_version.h" +#if GSOAP_VERSION <= 20602 +#define __jpsrv__RegisterJob __ns1__RegisterJob +#define __jpsrv__StartUpload __ns1__StartUpload +#define __jpsrv__CommitUpload __ns1__CommitUpload +#define __jpsrv__RecordTag __ns1__RecordTag +#define __jpsrv__FeedIndex __ns1__FeedIndex +#define __jpsrv__FeedIndexRefresh __ns1__FeedIndexRefresh +#define __jpsrv__GetJob __ns1__GetJob +#endif + +static struct jptype__genericFault *jp2s_error(struct soap *soap, + const glite_jp_error_t *err) +{ + struct jptype__genericFault *ret = NULL; + if (err) { + ret = soap_malloc(soap,sizeof *ret); + memset(ret,0,sizeof *ret); + ret->code = err->code; + ret->source = soap_strdup(soap,err->source); + ret->text = soap_strdup(soap,strerror(err->code)); + ret->description = soap_strdup(soap,err->desc); + ret->reason = jp2s_error(soap,err->reason); + } + return ret; +} + +static void err2fault(const glite_jp_context_t ctx,struct soap *soap) +{ + char *et; + struct SOAP_ENV__Detail *detail = soap_malloc(soap,sizeof *detail); + struct _genericFault *f = soap_malloc(soap,sizeof *f); + + + f->jpelem__genericFault = jp2s_error(soap,ctx->error); + + detail->__type = SOAP_TYPE__genericFault; +#if GSOAP_VERSION >= 20700 + detail->fault = f; +#else + detail->value = f; +#endif + detail->__any = NULL; + + soap_receiver_fault(soap,"Oh, shit!",NULL); + if (soap->version == 2) soap->fault->SOAP_ENV__Detail = detail; + else soap->fault->detail = detail; +} + +/* deprecated +static glite_jp_fileclass_t s2jp_fileclass(enum jptype__UploadClass class) +{ + switch (class) { + case INPUT_SANDBOX: return GLITE_JP_FILECLASS_INPUT; + case OUTPUT_SANDBOX: return GLITE_JP_FILECLASS_OUTPUT; + case JOB_LOG: return GLITE_JP_FILECLASS_LBLOG; + default: return GLITE_JP_FILECLASS_UNDEF; + } +} +*/ + +static void s2jp_tag(const struct jptype__tagValue *stag,glite_jp_tagval_t *jptag) +{ + memset(jptag,0,sizeof *jptag); + jptag->name = strdup(stag->name); + jptag->sequence = stag->sequence ? *stag->sequence : 0; + jptag->timestamp = stag->timestamp ? *stag->timestamp : 0; + if (stag->stringValue) jptag->value = strdup(stag->stringValue); + else if (stag->blobValue) { + jptag->binary = 1; + jptag->size = stag->blobValue->__size; + jptag->value = (char *) stag->blobValue->__ptr; + } +} + +#define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user) + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__RegisterJob( + struct soap *soap, + struct _jpelem__RegisterJob *in, + struct _jpelem__RegisterJobResponse *empty) +{ + CONTEXT_FROM_SOAP(soap,ctx); + glite_jp_attrval_t owner_val[2]; + + printf("%s %s %s\n",__FUNCTION__,in->job,in->owner); + if (glite_jpps_authz(ctx,SOAP_TYPE___jpsrv__RegisterJob,in->job,in->owner) || + glite_jppsbe_register_job(ctx,in->job,in->owner)) + { + err2fault(ctx,soap); + return SOAP_FAULT; + } + + owner_val[0].attr.type = GLITE_JP_ATTR_OWNER; + owner_val[0].value.s = in->owner; + owner_val[1].attr.type = GLITE_JP_ATTR_UNDEF; + +/* XXX: errrors should be ingored but not silently */ + glite_jpps_match_attr(ctx,in->job,owner_val); + + return SOAP_OK; +} + + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__StartUpload( + struct soap *soap, + struct _jpelem__StartUpload *in, + struct _jpelem__StartUploadResponse *out) +{ + CONTEXT_FROM_SOAP(soap,ctx); + char *destination; + time_t commit_before = in->commitBefore; + glite_jp_error_t err; + glite_jpps_fplug_data_t **pd = NULL; + int i; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + + if (glite_jpps_authz(ctx,SOAP_TYPE___jpsrv__StartUpload,NULL,NULL)) { + err2fault(ctx,soap); + return SOAP_FAULT; + } + + switch (glite_jpps_fplug_lookup(ctx,in->class_,&pd)) { + case ENOENT: + err.code = ENOENT; + err.source = __FUNCTION__; + err.desc = "unknown file class"; + glite_jp_stack_error(ctx,&err); + err2fault(ctx,soap); + return SOAP_FAULT; + case 0: break; + default: + err2fault(ctx,soap); + return SOAP_FAULT; + } + + for (i=0; pd[0]->uris[i] && strcmp(pd[0]->uris[i],in->class_); i++); + assert(pd[0]->uris[i]); + + if (glite_jppsbe_start_upload(ctx,in->job,pd[0]->classes[i],in->name,in->contentType, + &destination,&commit_before)) + { + err2fault(ctx,soap); + free(pd); + return SOAP_FAULT; + } + + out->destination = soap_strdup(soap,destination); + free(destination); + out->commitBefore = commit_before; + + free(pd); + return SOAP_OK; +} + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__CommitUpload( + struct soap *soap, + struct _jpelem__CommitUpload *in, + struct _jpelem__CommitUploadResponse *out) +{ + CONTEXT_FROM_SOAP(soap,ctx); + char *job,*class,*name; + + job = class = name = NULL; + + if (glite_jpps_authz(ctx,SOAP_TYPE___jpsrv__CommitUpload,NULL,NULL) || + glite_jppsbe_commit_upload(ctx,in->destination)) + { + err2fault(ctx,soap); + return SOAP_FAULT; + } + + /* XXX: should not fail when commit_upload was OK */ + assert(glite_jppsbe_destination_info(ctx,in->destination,&job,&class,&name) == 0); + + /* XXX: ignore errors but don't fail silenty */ + glite_jpps_match_file(ctx,job,class,name); + + free(job); free(class); free(name); + + return SOAP_OK; +} + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__RecordTag( + struct soap *soap, + struct _jpelem__RecordTag *in, + struct _jpelem__RecordTagResponse *out) +{ + CONTEXT_FROM_SOAP(soap,ctx); + void *file_be,*file_p; + glite_jpps_fplug_data_t **pd = NULL; + + glite_jp_tagval_t mytag; + + file_be = file_p = NULL; + + /* XXX: we assume just one plugin and also that TAGS plugin handles + * just one uri/class */ + + if (glite_jpps_fplug_lookup(ctx,GLITE_JP_FILETYPE_TAGS,&pd) + || glite_jppsbe_open_file(ctx,in->jobid,pd[0]->classes[0],NULL, + O_RDWR|O_CREAT,&file_be) + /* XXX: tags need reading to check magic number */ + ) { + free(pd); + err2fault(ctx,soap); + return SOAP_FAULT; + } + + s2jp_tag(in->tag,&mytag); + + /* XXX: assuming tag plugin handles just one type */ + if (pd[0]->ops.open(pd[0]->fpctx,file_be,GLITE_JP_FILETYPE_TAGS,&file_p) + || pd[0]->ops.generic(pd[0]->fpctx,file_p,GLITE_JP_FPLUG_TAGS_APPEND,&mytag)) + { + err2fault(ctx,soap); + if (file_p) pd[0]->ops.close(pd[0]->fpctx,file_p); + glite_jppsbe_close_file(ctx,file_be); + free(pd); + return SOAP_FAULT; + } + + if (pd[0]->ops.close(pd[0]->fpctx,file_p) + || glite_jppsbe_close_file(ctx,file_be)) + { + err2fault(ctx,soap); + free(pd); + return SOAP_FAULT; + } + + /* XXX: ignore errors but don't fail silenty */ + glite_jpps_match_tag(ctx,in->jobid,&mytag); + + free(pd); + return SOAP_OK; +} + +extern char *glite_jp_default_namespace; + +/* XXX: should be public */ +#define GLITE_JP_TAGS_NAMESPACE "http://glite.org/services/jp/tags" + +static void s2jp_attr(const char *in,glite_jp_attr_t *out) +{ + char *buf = strdup(in),*name = strchr(buf,':'),*ns = NULL; + + if (name) { + ns = buf; + *name++ = 0; + } + else { + name = buf; + ns = glite_jp_default_namespace; + } + + memset(out,0,sizeof *out); + + if (strcmp(ns,glite_jp_default_namespace)) + out->type = strcmp(ns,GLITE_JP_TAGS_NAMESPACE) ? + GLITE_JP_ATTR_GENERIC : GLITE_JP_ATTR_TAG; + else { + if (!strcmp(name,"owner")) out->type = GLITE_JP_ATTR_OWNER; + else if (!strcmp(name,"time")) out->type = GLITE_JP_ATTR_OWNER; + + } + + if (out->type) { + out->name = strdup(name); + out->namespace = strdup(ns); + } +} + +static void s2jp_queryval( + const char *in, + glite_jp_attrtype_t type, + union _glite_jp_query_rec_val *out) +{ + switch (type) { + case GLITE_JP_ATTR_OWNER: + case GLITE_JP_ATTR_TAG: + case GLITE_JP_ATTR_GENERIC: + out->s = strdup(in); + break; + case GLITE_JP_ATTR_TIME: + out->time.tv_sec = atoi(in); + break; + } +} + +static void s2jp_query(const struct jptype__primaryQuery *in, glite_jp_query_rec_t *out) +{ + s2jp_attr(in->attr,&out->attr); + + switch (in->op) { + case EQUAL: out->op = GLITE_JP_QUERYOP_EQUAL; break; + case UNEQUAL: out->op = GLITE_JP_QUERYOP_UNEQUAL; break; + case LESS: out->op = GLITE_JP_QUERYOP_LESS; break; + case GREATER: out->op = GLITE_JP_QUERYOP_GREATER; break; + case WITHIN: + out->op = GLITE_JP_QUERYOP_WITHIN; + s2jp_queryval(in->value2,out->attr.type,&out->value2); + break; + } + + s2jp_queryval(in->value,out->attr.type,&out->value); +} + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__FeedIndex( + struct soap *soap, + struct _jpelem__FeedIndex *in, + struct _jpelem__FeedIndexResponse *out) +{ + +/* deferred processing: return feed_id to the index server first, + * start feeding it afterwards -- not before the index server actually + * knows feed_id and is ready to accept the feed. + * + * Has to be done within the same server slave, + * passed through the context */ + + CONTEXT_FROM_SOAP(soap,ctx); + char *feed_id = NULL; + time_t expires = 0; + int ret = SOAP_OK; + + glite_jp_attr_t *attrs = calloc(in->__sizeattributes+1,sizeof *attrs); + glite_jp_query_rec_t *qry = calloc(in->__sizeconditions+1,sizeof *qry); + int i; + + glite_jp_clear_error(ctx); + + for (i = 0; i__sizeattributes; i++) s2jp_attr(in->attributes[i],attrs+i); + for (i = 0; i__sizeconditions; i++) s2jp_query(in->conditions[i],qry+i); + + if (in->history) { + if (glite_jpps_run_feed(ctx,in->destination,attrs,qry,&feed_id)) { + err2fault(ctx,soap); + ret = SOAP_FAULT; + goto cleanup; + } + } + + if (in->continuous) { + if (glite_jpps_register_feed(ctx,in->destination,attrs,qry,&feed_id,&expires)) { + err2fault(ctx,soap); + ret = SOAP_FAULT; + goto cleanup; + } + } + + if (!in->history && !in->continuous) { + glite_jp_error_t err; + memset(&err,0,sizeof err); + err.code = EINVAL; + err.source = __FUNCTION__; + err.desc = "at least one of and must be true"; + glite_jp_stack_error(ctx,&err); + err2fault(ctx,soap); + ret = SOAP_FAULT; + goto cleanup; + } + + out->feedExpires = expires; + out->feedId = soap_strdup(soap,feed_id); + +cleanup: + free(feed_id); + for (i=0; attrs[i].type; i++) free(attrs[i].name); + free(attrs); + for (i=0; qry[i].attr.type; i++) glite_jp_free_query_rec(qry+i); + free(qry); + + return ret; +} + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__FeedIndexRefresh( + struct soap *soap, + struct _jpelem__FeedIndexRefresh *in, + struct _jpelem__FeedIndexRefreshResponse *out) +{ + fprintf(stderr,"%s: not implemented\n",__FUNCTION__); + abort(); +} + +SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__GetJob( + struct soap *soap, + struct _jpelem__GetJob *in, + struct _jpelem__GetJobResponse *out) +{ + CONTEXT_FROM_SOAP(soap,ctx); + char *url; + + int i,n; + glite_jp_error_t err; + void **pd; + struct jptype__jppsFile **f = NULL; + + memset(&err,0,sizeof err); + out->__sizefiles = 0; + + for (pd = ctx->plugins; *pd; pd++) { + glite_jpps_fplug_data_t *plugin = *pd; + + for (i=0; plugin->uris[i]; i++) { + glite_jp_clear_error(ctx); + switch (glite_jppsbe_get_job_url(ctx,in->jobid,plugin->classes[i],NULL,&url)) { + case 0: n = out->__sizefiles++; + f = realloc(f,out->__sizefiles * sizeof *f); + f[n] = soap_malloc(soap, sizeof **f); + f[n]->class_ = soap_strdup(soap,plugin->uris[i]); + f[n]->name = NULL; + f[n]->url = soap_strdup(soap,url); + free(url); + break; + case ENOENT: + break; + default: + err.code = ctx->error->code; + err.source = "jpsrv__GetJob()"; + err.desc = plugin->uris[i]; + glite_jp_stack_error(ctx,&err); + err2fault(ctx,soap); + glite_jp_clear_error(ctx); + return SOAP_FAULT; + } + } + } + + if (!out->__sizefiles) { + glite_jp_clear_error(ctx); + err.code = ENOENT; + err.source = __FUNCTION__; + err.desc = "No file found for this job"; + glite_jp_stack_error(ctx,&err); + err2fault(ctx,soap); + glite_jp_clear_error(ctx); + return SOAP_FAULT; + } + + out->files = soap_malloc(soap,out->__sizefiles * sizeof *f); + memcpy(out->files,f,out->__sizefiles * sizeof *f); + + return SOAP_OK; +} + diff --git a/org.glite.jp.client/src/tags.c b/org.glite.jp.client/src/tags.c new file mode 100644 index 0000000..1f11b4d --- /dev/null +++ b/org.glite.jp.client/src/tags.c @@ -0,0 +1,233 @@ +#include +#include +#include +#include +#include + +#include +#include "tags.h" +#include "backend.h" + +/* magic name_len value_len binary sequence timestamp */ +#define HEADER "JP#TAG# %05u %012lu %c %05u %012lu#" +#define HEADER_SIZE 48 + +int glite_jpps_tag_append( + glite_jp_context_t ctx, + void *handle, + const glite_jp_tagval_t *tag +) +{ + char hdr[HEADER_SIZE+1]; + glite_jp_error_t err; + + unsigned long vlen = tag->binary ? tag->size : + (tag->value ? strlen(tag->value) : 0); + int nlen; + + memset(&err,0,sizeof err); + err.source = "glite_jpps_tag_append()"; + + if (!tag->name) { + err.code = EINVAL; + err.desc = "tag name"; + return glite_jp_stack_error(ctx,&err); + } + + nlen = strlen(tag->name); + + assert(sprintf(hdr,HEADER,nlen,vlen, + tag->binary ? "B" : "S", + tag->sequence, tag->timestamp) == HEADER_SIZE); + + if (glite_jppsbe_append(ctx,handle,hdr,HEADER_SIZE)) { + err.code = EIO; + err.desc = "write tag header"; + return glite_jp_stack_error(ctx,&err); + } + + if (glite_jppsbe_append(ctx,handle,tag->name,nlen)) { + err.code = EIO; + err.desc = "write tag name"; + return glite_jp_stack_error(ctx,&err); + } + + if (glite_jppsbe_append(ctx,handle,tag->value,vlen)) { + err.code = EIO; + err.desc = "write tag value"; + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} + +int glite_jpps_tagval_copy( + glite_jp_context_t ctx, + glite_jp_tagval_t *from, + glite_jp_tagval_t *to +) +{ + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + to->name = strdup(from->name); + if (!to->name) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + to->sequence = from->sequence; + to->timestamp = from->timestamp; + to->binary = from->binary; + to->size = from->size; + to->value = (char *) malloc(to->size); + if (!to->value) { + free(to->name); + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + memcpy(from->value, to->value, to->size); + + return 0; +} + +int glite_jpps_tag_read( + glite_jp_context_t ctx, + void *handle, + off_t offset, + glite_jp_tagval_t *tagvalue, + size_t *shift +) +{ + char hdr[HEADER_SIZE+1]; + unsigned int nlen; + unsigned long vlen; + char binary; + unsigned sequence; + unsigned timestamp; + char * name = NULL; + char * value = NULL; + ssize_t ret; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + hdr[HEADER_SIZE] = '\0'; + if (glite_jppsbe_pread(ctx, handle, hdr, HEADER_SIZE, offset, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag header"; + goto error_out; + } + if (ret == 0) { + err.code = ENOENT; + err.desc = "No more tags in the file"; + goto error_out; + } + /* #define HEADER "JP#TAG# %05u %012lu %c %05u %012lu#" */ + if (sscanf(hdr, HEADER, &nlen, &vlen, &binary, &sequence, ×tamp) < 5) { + err.code = EILSEQ; + err.desc = "Incorrect tag header format"; + goto error_out; + } + name = (char*) malloc(nlen + 1); + if (!name) { + err.code = ENOMEM; + goto error_out; + } + name[nlen] = '\0'; + value = (char*) malloc(vlen + 1); + if (!value) { + err.code = ENOMEM; + goto error_out; + } + value[vlen] = '\0'; + if (glite_jppsbe_pread(ctx, handle, name, nlen, offset + HEADER_SIZE, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag name"; + goto error_out; + } + if (glite_jppsbe_pread(ctx, handle, value, vlen, offset + HEADER_SIZE + nlen, &ret)) { + err.code = EIO; + err.desc = "Cannot read tag value"; + goto error_out; + } + + tagvalue->name = name; + tagvalue->sequence = sequence; + tagvalue->timestamp = timestamp; + tagvalue->binary = (binary == 'B') ? 1 : 0; + tagvalue->size = vlen; + tagvalue->value = value; + + *shift = HEADER_SIZE + nlen + vlen; + + return 0; +error_out: + free(name); + free(value); + return glite_jp_stack_error(ctx,&err); +} + +/* +int glite_jpps_tag_read(glite_jp_context_t, void *, off_t, glite_jp_tagval_t *, size_t); +int glite_jpps_tag_readall(glite_jp_context_t, void *, glite_jp_tagval_t **); +*/ + +int glite_jpps_tag_readall( + glite_jp_context_t ctx, + void *handle, + glite_jp_tagval_t **tags_out +) +{ + glite_jp_tagval_t * tags = NULL; + void * newspace; + int ntags = 0; + int ntagspace = 0; + off_t offset = 0; + int ret; + size_t shift; + glite_jp_error_t err; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + ntagspace = 1; + tags = (glite_jp_tagval_t *) calloc(ntagspace + 1, sizeof(*tags)); + if (!tags) { + err.code = ENOMEM; + return glite_jp_stack_error(ctx,&err); + } + while (!(ret = glite_jpps_tag_read(ctx, handle, offset, &tags[ntags], &shift))) { + offset += shift; + ntags++; + if (ntagspace <= ntags) { + ntagspace += 1; + newspace = realloc(tags, (ntagspace + 1) * sizeof(*tags)); + if (!newspace) { + err.code = ENOMEM; + goto error_out; + } + tags = (glite_jp_tagval_t *) newspace; + } + } + if (ret == ENOENT) { + *tags_out = tags; + return 0; + } else { + err.code = EIO; + err.desc = "Error reading tag value"; + } + +error_out: + for (; ntags-- ;) { + free(tags[ntags].name); + free(tags[ntags].value); + } + free(tags); + return glite_jp_stack_error(ctx,&err); +} diff --git a/org.glite.jp.client/src/tags.h b/org.glite.jp.client/src/tags.h new file mode 100644 index 0000000..0d8afa8 --- /dev/null +++ b/org.glite.jp.client/src/tags.h @@ -0,0 +1 @@ +int glite_jpps_tag_append(glite_jp_context_t,void *,const glite_jp_tagval_t *); diff --git a/org.glite.jp.client/src/tags_plugin.c b/org.glite.jp.client/src/tags_plugin.c new file mode 100644 index 0000000..95dabd8 --- /dev/null +++ b/org.glite.jp.client/src/tags_plugin.c @@ -0,0 +1,148 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include "file_plugin.h" +#include "builtin_plugins.h" + +static int tagappend(void *,void *,int,...); +static int tagopen(void *,void *,const char *uri,void **); +static int tagclose(void *,void *); + +#define TAGS_MAGIC 0x74c016f2 /* two middle digits encode version, i.e. 01 */ + +static int tagdummy() +{ + puts("tagdummy()"); + return -1; +} + +struct tags_handle { + void *bhandle; + int n; + glite_jp_tagval_t *tags; +}; + +int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) +{ + data->fpctx = ctx; + + data->uris = calloc(2,sizeof *data->uris); + data->uris[0] = strdup(GLITE_JP_FILETYPE_TAGS); + + data->classes = calloc(2,sizeof *data->classes); + data->classes[0] = strdup("tags"); + + data->ops.open = tagopen; + data->ops.close = tagclose; + data->ops.attr = tagdummy; + data->ops.generic = tagappend; + + printf("tags_plugin: URI: \"%s\"; magic number: 0x%08lx\n",GLITE_JP_FILETYPE_TAGS,TAGS_MAGIC); + return 0; +} + +static int tagopen(void *fpctx,void *bhandle,const char *uri,void **handle) +{ + struct tags_handle *h = calloc(1,sizeof *h); + h->n = -1; + h->bhandle = bhandle; + + *handle = h; + + return 0; +} + +static int tagclose(void *fpctx,void *handle) +{ + int i; + struct tags_handle *h = handle; + + for (i=0; in; i++) { + free(h->tags[i].name); + free(h->tags[i].value); + } + free(h->tags); + free(h); + + return 0; +} + +static int tagappend(void *fpctx,void *handle,int oper,...) +{ + glite_jp_tagval_t *tag; + va_list ap; + char *hdr,*rec; + glite_jp_context_t ctx = fpctx; + struct tags_handle *h = handle; + uint32_t magic,hlen,rlen,rlen_n; + size_t r; + glite_jp_error_t err; + + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + glite_jp_clear_error(ctx); + + va_start(ap,oper); + tag = va_arg(ap,glite_jp_tagval_t *); + va_end(ap); + + printf("tagappend: %s,%d,%s\n",tag->name,tag->sequence,tag->value); + + assert(oper == GLITE_JP_FPLUG_TAGS_APPEND); + + if (glite_jppsbe_pread(ctx,h->bhandle,&magic,sizeof magic,0,&r)) { + err.code = EIO; + err.desc = "reading magic number"; + return glite_jp_stack_error(ctx,&err); + } + + if (r == 0) { + magic = htonl(TAGS_MAGIC); + if (glite_jppsbe_pwrite(ctx,h->bhandle,&magic,sizeof magic,0)) { + err.code = EIO; + err.desc = "writing magic number"; + return glite_jp_stack_error(ctx,&err); + } + } + else if (r != sizeof magic) { + err.code = EIO; + err.desc = "can't read magic number"; + return glite_jp_stack_error(ctx,&err); + } + else if (magic != htonl(TAGS_MAGIC)) { + err.code = EINVAL; + err.desc = "invalid magic number"; + return glite_jp_stack_error(ctx,&err); + } + + trio_asprintf(&hdr,"%d %ld %c",tag->sequence, + tag->timestamp,tag->binary ? 'B' : 'S'); + + rlen = strlen(tag->name) + strlen(hdr) + 2 /* \0 after name and after hdr */ + + (r = tag->binary ? tag->size : (tag->value ? strlen(tag->value) : 0)); + + rlen_n = htonl(rlen); + + rec = malloc(rlen + sizeof rlen_n); + *((uint32_t *) rec) = rlen_n; + strcpy(rec + sizeof rlen_n,tag->name); + strcpy(rec + (hlen = sizeof rlen_n + strlen(tag->name) + 1),hdr); + + if (r) memcpy(rec + hlen + strlen(hdr) + 1,tag->value,r); + free(hdr); + + if (glite_jppsbe_append(ctx,h->bhandle,rec,rlen + sizeof rlen_n)) { + err.code = EIO; + err.desc = "writing tag record"; + free(rec); + return glite_jp_stack_error(ctx,&err); + } + + return 0; +} diff --git a/org.glite.jp.client/src/typemap.dat b/org.glite.jp.client/src/typemap.dat new file mode 100644 index 0000000..72f515f --- /dev/null +++ b/org.glite.jp.client/src/typemap.dat @@ -0,0 +1,3 @@ +jpsrv = http://glite.org/wsdl/services/jp +jptype = http://glite.org/wsdl/types/jp +jpelem = http://glite.org/wsdl/elements/jp -- 1.8.2.3