From 671b9db8bc3b0a14944223107e04accc6558faf2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Tue, 23 Aug 2005 09:42:15 +0000 Subject: [PATCH] - ftp client functionality added - added initial cert/key handling (refresh not done yet!) --- org.glite.jp.client/Makefile | 4 +- org.glite.jp.client/src/jpimporter.c | 372 +++++++++++++++++++++++++++++------ 2 files changed, 316 insertions(+), 60 deletions(-) diff --git a/org.glite.jp.client/Makefile b/org.glite.jp.client/Makefile index 345a7c1..b065ce7 100644 --- a/org.glite.jp.client/Makefile +++ b/org.glite.jp.client/Makefile @@ -27,7 +27,9 @@ VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project:${stagedir} GLOBUS_LIBS:=-L${globus_prefix}/lib \ -lglobus_common_${nothrflavour} \ - -lglobus_gssapi_gsi_${nothrflavour} + -lglobus_gssapi_gsi_${nothrflavour} \ + -lglobus_ftp_client_gcc32dbg \ + -lglobus_ftp_control_gcc32dbg GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index fbc2d91..06050a0 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -5,11 +5,12 @@ #include #include #include +#include #include #include +#include #include #include -#include #include #include "glite/lb/lb_maildir.h" @@ -20,12 +21,20 @@ #include "jptype_map.h" +#include "globus_ftp_client.h" + #include "soap_version.h" #if GSOAP_VERSION <= 20602 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob #endif +typedef struct { + char *key; + char *val; +} msg_pattern_t; + + #ifndef dprintf #define dprintf(x) { if (debug) printf x; } #endif @@ -46,19 +55,32 @@ #define GLITE_JPPS "http://localhost:8901" #endif + #define MAX_REG_CONNS 500 -static int debug = 0; -static int die = 0; -static int child_died = 0; -static int poll = 2; -static char *jpps = GLITE_JPPS; -static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; -static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; -static struct soap *soap; +static int debug = 0; +static int die = 0; +static int child_died = 0; +static int poll = 2; +static char *name; +static char *jpps = GLITE_JPPS; +static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; +static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; +static struct soap *soap; + +static time_t cert_mtime; +static char *server_cert = NULL, + *server_key = NULL, + *cadir; +static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; +static char *mysubj; + static struct option opts[] = { { "help", 0, NULL, 'h'}, + { "cert", 1, NULL, 'c'}, + { "key", 1, NULL, 'k'}, + { "CAdir", 1, NULL, 'C'}, { "debug", 0, NULL, 'g'}, { "jpps", 1, NULL, 'p'}, { "reg-mdir", 1, NULL, 'r'}, @@ -68,12 +90,15 @@ static struct option opts[] = { { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d::i:t:"; +static const char *get_opt_string = "hgp:r:d::i:t:c:k:C:"; static void usage(char *me) { fprintf(stderr,"usage: %s [option]\n" "\t-h, --help displays this screen\n" + "\t-k, --key private key file\n" + "\t-c, --cert certificate file\n" + "\t-C, --CAdir trusted certificates directory\n" "\t-g, --debug don't run as daemon, additional diagnostics\n" "\t-p, --jpps JP primary service server\n" "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n" @@ -94,15 +119,18 @@ static void catch_chld(int sig) } -static int slave(int (*)(const char *), const char *); -static int check_soap_fault(struct soap *, int, const char *); -static int reg_importer(const char *); -static int dump_importer(const char *); +static int slave(int (*)(void), const char *); +static int check_soap_fault(struct soap *, int); +static int reg_importer(void); +static int dump_importer(void); +static int parse_msg(char *, msg_pattern_t []); +static int gftp_put_file(const char *, int); int main(int argc, char *argv[]) { + edg_wll_GssStatus gss_code; struct sigaction sa; sigset_t sset; FILE *fpid; @@ -122,6 +150,9 @@ int main(int argc, char *argv[]) switch ( opt ) { case 'g': debug = 1; break; case 'h': usage(name); return 0; + case 'c': server_cert = optarg; break; + case 'k': server_key = optarg; break; + case 'C': cadir = optarg; break; case 'p': jpps = optarg; break; case 't': poll = atoi(optarg); break; case 'r': strcpy(reg_mdir, optarg); break; @@ -164,6 +195,27 @@ int main(int argc, char *argv[]) dprintf(("Master pid %d\n", getpid())); + if ( globus_module_activate(GLOBUS_FTP_CLIENT_MODULE) != GLOBUS_SUCCESS ) { + dprintf(("[master] Could not activate ftp client module\n")); + if (!debug) syslog(LOG_INFO, "Could not activate ftp client module\n"); + exit(1); + } else dprintf(("[master] Ftp client module activated\n")); + + if ( !server_cert || !server_key ) + fprintf(stderr, "%s: key or certificate file not specified" + " - unable to watch them for changes!\n", argv[0]); + if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); + edg_wll_gss_watch_creds(server_cert, &cert_mtime); + if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) { + dprintf(("[master] Server identity: %s\n", mysubj)); + } else { + char *errmsg; + edg_wll_gss_get_error(&gss_code, "edg_wll_gss_acquire_cred_gsi()", &errmsg); + dprintf(("[master] %s\n", errmsg)); + free(errmsg); + dprintf(("[master] Running unauthenticated\n")); + } + memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); sa.sa_handler = catchsig; sigaction(SIGINT, &sa, NULL); @@ -190,6 +242,10 @@ int main(int argc, char *argv[]) perror("starting reg importer slave"); exit(1); } + if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) { + perror("starting dump importer slave"); + exit(1); + } while ( !die ) { @@ -211,6 +267,15 @@ int main(int argc, char *argv[]) exit(1); } dprintf(("[master] reg importer slave restarted [%d]\n", reg_pid)); + } else if ( pid == dump_pid ) { + dprintf(("[master] dump importer slave died [%d]\n", pid)); + if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die); + if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) { + perror("starting dump importer slave"); + kill(0, SIGINT); + exit(1); + } + dprintf(("[master] dump importer slave restarted [%d]\n", dump_pid)); } } } @@ -223,12 +288,13 @@ int main(int argc, char *argv[]) if (!debug) syslog(LOG_INFO, "Terminating on signal %d\n", die); kill(0, die); + globus_module_deactivate_all(); unlink(pidfile); return 0; } -static int slave(int (*fn)(const char *), const char *nm) +static int slave(int (*fn)(void), const char *nm) { struct sigaction sa; sigset_t sset; @@ -238,6 +304,7 @@ static int slave(int (*fn)(const char *), const char *nm) if ( (pid = fork()) ) return pid; + name = (char *)nm; memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); sa.sa_handler = catchsig; sigaction(SIGUSR1, &sa, NULL); @@ -248,10 +315,10 @@ static int slave(int (*fn)(const char *), const char *nm) sigaddset(&sset, SIGUSR1); sigprocmask(SIG_BLOCK, &sset, NULL); - dprintf(("[%s] slave started - pid [%d]\n", nm, getpid())); + dprintf(("[%s] slave started - pid [%d]\n", name, getpid())); while ( !die && conn_cnt < MAX_REG_CONNS ) { - int ret = fn(nm); + int ret = fn(); if ( ret > 0 ) conn_cnt++; else if ( ret < 0 ) exit(1); @@ -263,17 +330,17 @@ static int slave(int (*fn)(const char *), const char *nm) } if ( die ) { - dprintf(("[%s] Terminating on signal %d\n", nm, getpid(), die)); + dprintf(("[%s] Terminating on signal %d\n", name, getpid(), die)); if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); } - dprintf(("[%s] Terminating after %d connections\n", nm, conn_cnt)); + dprintf(("[%s] Terminating after %d connections\n", name, conn_cnt)); if ( !debug ) syslog(LOG_INFO, "Terminating after %d connections", conn_cnt); exit(0); } -static int reg_importer(const char *nm) +static int reg_importer(void) { struct _jpelem__RegisterJob in; struct _jpelem__RegisterJobResponse empty; @@ -285,27 +352,27 @@ static int reg_importer(const char *nm) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); if ( ret < 0 ) { - dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc)); + dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc)); if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; } else if ( ret > 0 ) { - dprintf(("[%s] JP registration request received\n", nm)); + dprintf(("[%s] JP registration request received\n", name)); if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); ret = 0; if ( !(aux = strchr(msg, '\n')) ) { - dprintf(("[%s] Wrong format of message!\n", nm)); + dprintf(("[%s] Wrong format of message!\n", name)); if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); ret = 0; - } else { + } else do { *aux++ = '\0'; in.job = msg; in.owner = aux; - dprintf(("[%s] Registering '%s'\n", nm, msg)); + dprintf(("[%s] Registering '%s'\n", name, msg)); if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); - ret = check_soap_fault(soap, ret, nm); - } + if ( (ret = check_soap_fault(soap, ret)) ) break; + } while (0); edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK); free(fname); free(msg); @@ -315,42 +382,73 @@ static int reg_importer(const char *nm) return 0; } -static int dump_importer(const char *nm) +static int dump_importer(void) { - struct _jpelem__StartUpload in; - struct _jpelem__StartUploadResponse out; - int ret; - char *msg = NULL, - *fname = NULL, - *aux; + struct _jpelem__StartUpload su_in; + struct _jpelem__StartUploadResponse su_out; + struct _jpelem__CommitUpload cu_in; + struct _jpelem__CommitUploadResponse empty; + + char *msg = NULL, + *fname = NULL, + *aux; + int ret; + int fhnd; + msg_pattern_t tab[] = { + {"jobid", NULL}, + {"file", NULL}, + {"jpps", NULL}, + {NULL, NULL}}; +#define _job 0 +#define _file 1 +#define _jpps 2 ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); if ( ret < 0 ) { - dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc)); + dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc)); if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; } else if ( ret > 0 ) { - dprintf(("[%s] dump JP import request received\n", nm)); - if ( !debug ) syslog(LOG_INFO, "dump JP import request received\n"); + dprintf(("[%s] dump JP import request received\n", name)); + if ( !debug ) syslog(LOG_INFO, "dump JP import request received"); ret = 0; - if ( !(aux = strchr(msg, '\n')) ) { - dprintf(("[%s] Wrong format of message!\n", nm)); - if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); + if ( parse_msg(msg, tab) < 0 ) { + dprintf(("[%s] Wrong format of message!\n", name)); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message"); ret = 0; - } else { - *aux++ = '\0'; - in.job = argv[2]; - in.class_ = argv[3]; - in.name = NULL; - in.commitBefore = atoi(argv[4]) + time(NULL); - in.contentType = argv[5]; - dprintf(("[%s] Importing LB dump file '%s'\n", nm, msg)); + } else do { + su_in.job = tab[_job].val; + su_in.class_ = "urn:org.glite.jp.primary:lb"; + su_in.name = tab[_file].val; + su_in.commitBefore = 1000 + time(NULL); + su_in.contentType = "text/lb"; + dprintf(("[%s] Importing LB dump file '%s'\n", name, tab[_file].val)); if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); - ret = soap_call___jpsrv__StartUpload(soap, jpps, "", &in, &out); - ret = check_soap_fault(soap, ret, nm); - } + ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); + ret = check_soap_fault(soap, ret); + /* XXX: grrrrrrr! test it!!! + if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf(("[%s] Destination: %s\n\tCommit before: %s\n", su_out.destination, ctime(&su_out.commitBefore))); + */ + + if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) { + dprintf(("[%s] Can't open dump file: %s\n", name, tab[_file].val)); + if ( !debug ) syslog(LOG_ERR, "Can't open dump file: %s", tab[_file].val); + ret = 1; + break; + } + /* XXX: grrrrrrr! remove next line!!! */ + su_out.destination = "gsiftp://nain.ics.muni.cz:5678/tmp/gsiftp-dump-tst-file"; + if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break; + close(fhnd); + dprintf(("[%s] File sent, commiting the upload\n", name)); + cu_in.destination = su_out.destination; + ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); + if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf(("[%s] Dump upload succesfull\n", name)); + } while (0); edg_wll_MaildirTransEnd(dump_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK); free(fname); free(msg); @@ -361,7 +459,7 @@ static int dump_importer(const char *nm) } -static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) +static int check_soap_fault(struct soap *soap, int err) { struct SOAP_ENV__Detail *detail; struct jptype__genericFault *f; @@ -371,7 +469,7 @@ static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) switch ( err ) { case SOAP_OK: - dprintf(("[%s] ok\n", msg_pref)); + dprintf(("[%s] ok\n", name)); break; case SOAP_FAULT: @@ -383,8 +481,8 @@ static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) detail = soap->fault->detail; reason = soap->fault->faultstring; } - dprintf(("[%s] %s\n", msg_pref, reason)); - if ( !debug ) syslog(LOG_ERR, "%s %s", msg_pref, reason); + dprintf(("[%s] %s\n", name, reason)); + if ( !debug ) syslog(LOG_ERR, "%s", reason); assert(detail->__type == SOAP_TYPE__genericFault); #if GSOAP_VERSION >=20700 f = ((struct _genericFault *) detail->fault) -> jpelem__genericFault; @@ -393,11 +491,10 @@ static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) #endif while (f) { dprintf(("[%s] %s%s: %s (%s)\n", - msg_pref, indent, + name, indent, f->source, f->text, f->description)); - if ( !debug ) syslog(LOG_ERR, "%s %s%s: %s (%s)", - msg_pref, reason, - f->source, f->text, f->description); + if ( !debug ) syslog(LOG_ERR, "%s%s: %s (%s)", + reason, f->source, f->text, f->description); f = f->reason; strcat(indent, " "); } @@ -410,6 +507,163 @@ static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) return 0; } +/** Parses every line looking for pattern string and stores the value into + * the given variable + * + * line format is: key[space(s)]+val + */ +int parse_msg(char *msg, msg_pattern_t tab[]) +{ + char *eol = msg, + *key, *val; + + while ( eol && *eol != '\0' ) { + int i; + + key = eol; + if ( (eol = strchr(key, '\n')) ) *eol++ = '\0'; + while ( isblank(*key) ) key++; + if ( *key == '\0' ) continue; + val = key; + while ( !isblank(*val) ) val++; + if ( *val == '\0' ) return -1; + *val++ = '\0'; + while ( isblank(*val) ) val++; + if ( *val == '\0' ) return -1; + + for ( i = 0; tab[i].key; i++ ) { + if ( !strcmp(tab[i].key, key) ) { + tab[i].val = val; + break; + } + } + } + + return 0; +} + + +#define BUFSZ 1024 + +static globus_mutex_t gLock; +static globus_cond_t gCond; +static globus_bool_t gDone; +static globus_bool_t gError = GLOBUS_FALSE; +static globus_byte_t gBuffer[BUFSZ]; +static int gOffset; + + +static void gftp_done_cb( + void *user_arg, + globus_ftp_client_handle_t *handle, + globus_object_t *err) +{ + if ( err != GLOBUS_SUCCESS ) { + char *tmp = globus_object_printable_to_string(err); + dprintf(("[%s] Error in callback: %s\n", name, tmp)); + if ( !debug ) syslog(LOG_ERR, "Error in callback: %s", tmp); + gError = GLOBUS_TRUE; + globus_libc_free(tmp); + } + globus_mutex_lock(&gLock); + gDone = GLOBUS_TRUE; + globus_cond_signal(&gCond); + globus_mutex_unlock(&gLock); +} + +static void gftp_data_cb( + void *user_arg, + globus_ftp_client_handle_t *handle, + globus_object_t *error, + globus_byte_t *buffer, + globus_size_t length, + globus_off_t offset, + globus_bool_t eof) +{ + if ( !eof ) { + int rc; + globus_mutex_lock(&gLock); + if ( (rc = read(*((int *)user_arg), gBuffer, BUFSZ)) < 0 ) { + dprintf(("[%s] Error reading dump file\n", name)); + if ( !debug ) syslog(LOG_ERR, "Error reading dump file"); + gDone = GLOBUS_TRUE; + gError = GLOBUS_TRUE; + globus_cond_signal(&gCond); + } else { + globus_ftp_client_register_write( + handle, gBuffer, rc, gOffset, rc == 0, gftp_data_cb, user_arg); + gOffset += rc; + } + globus_mutex_unlock(&gLock); + } +} + +static int gftp_put_file(const char *url, int fhnd) +{ + globus_ftp_client_handle_t hnd; + globus_ftp_client_operationattr_t op_attr; + globus_ftp_client_handleattr_t hnd_attr; + +#define put_file_err(errs) { \ + dprintf(("[%s] %s\n", name, errs)); \ + if ( !debug ) syslog(LOG_ERR, errs); \ + return 1; \ +} + if ( globus_ftp_client_handleattr_init(&hnd_attr) != GLOBUS_SUCCESS ) + put_file_err("Could not initialise handle attributes"); + + if ( globus_ftp_client_operationattr_init(&op_attr) != GLOBUS_SUCCESS ) + put_file_err("Could not initialise operation attributes"); + + if ( globus_ftp_client_operationattr_set_authorization( + &op_attr, server_cert? mycred: GSS_C_NO_CREDENTIAL, + NULL, "", 0, NULL) != GLOBUS_SUCCESS ) + put_file_err("Could not set authorization procedure"); + + if ( globus_ftp_client_handle_init(&hnd, &hnd_attr) != GLOBUS_SUCCESS ) + put_file_err("Could not initialise ftp client handle"); +#undef put_file_err + + globus_mutex_init(&gLock, GLOBUS_NULL); + globus_cond_init(&gCond, GLOBUS_NULL); + + gDone = GLOBUS_FALSE; + + /* do the op */ + if ( globus_ftp_client_put( + &hnd, url, &op_attr, + GLOBUS_NULL, gftp_done_cb, (void *)&fhnd) != GLOBUS_SUCCESS) { + dprintf(("[%s] Could not start file put\n", name)); + if ( !debug ) syslog(LOG_ERR, "Could not start file put"); + gError = GLOBUS_TRUE; + gDone = GLOBUS_TRUE; + } else { + int rc; + globus_mutex_lock(&gLock); + if ( (rc = read(fhnd, gBuffer, BUFSZ)) < 0 ) { + dprintf(("[%s] Error reading dump file\n", name)); + if ( !debug ) syslog(LOG_ERR, "Error reading dump file"); + gDone = GLOBUS_TRUE; + gError = GLOBUS_TRUE; + globus_cond_signal(&gCond); + } else { + globus_ftp_client_register_write(&hnd, + gBuffer, rc, gOffset, rc == 0, gftp_data_cb, (void *)&fhnd); + gOffset += rc; + } + globus_mutex_unlock(&gLock); + } + + globus_mutex_lock(&gLock); + while ( !gDone ) globus_cond_wait(&gCond, &gLock); + globus_mutex_unlock(&gLock); + + globus_ftp_client_handle_destroy(&hnd); + + + return (gError == GLOBUS_TRUE)? 1: 0; +} + /* XXX: we don't use it */ SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; -- 1.8.2.3