Job check and registration attempt on failed dump upload.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 27 Mar 2008 18:43:58 +0000 (18:43 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 27 Mar 2008 18:43:58 +0000 (18:43 +0000)
org.glite.jp.client/src/jpimporter.c

index a1d4a88..d9709ee 100644 (file)
@@ -24,6 +24,7 @@
 #include "jptype_map.h"
 #include "glite/security/glite_gsplugin.h"
 #include "glite/security/glite_gscompat.h"
+#include "glite/jp/known_attr.h"
 
 #include "globus_ftp_client.h"
 #include "jp_client.h"
@@ -493,12 +494,16 @@ static int dump_importer(void)
        struct _jpelem__StartUploadResponse             su_out;
        struct _jpelem__CommitUpload                    cu_in;
        struct _jpelem__CommitUploadResponse    empty;
+       struct _jpelem__RegisterJob                     rj_in;
+       struct _jpelem__RegisterJobResponse             rj_empty;
+       struct _jpelem__GetJobAttributes                gja_in;
+       struct _jpelem__GetJobAttributesResponse        gja_out;
        static int              readnew = 1;
        char               *msg = NULL,
                                   *fname = NULL,
                                   *bname;
        char                        fspec[PATH_MAX];
-       int                             ret;
+       int                             ret, retry_upload;
        int                             fhnd;
        msg_pattern_t   tab[] = {
                                                {"jobid", NULL},
@@ -566,15 +571,48 @@ static int dump_importer(void)
                        }
                        if (perf.name && !perf.limit) stats_get_limit(&perf, name);
                }
+               soap_begin(soap)
                if (!(sink & 2)) {
 #endif
-               refresh_connection(soap);
-               ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
-               if ( (ret = check_soap_fault(soap, ret)) ) break;
+               retry_upload = 2;
+               do {
+                       refresh_connection(soap);
+                       ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
+                       if ( (ret = check_soap_fault(soap, ret)) ) {
+                       /* unsuccessful dump, register job */
+                               refresh_connection(soap);
+                               /* check job existence */
+                               memset(&gja_in, 0, sizeof gja_in);
+                               memset(&gja_out, 0, sizeof gja_out);
+                               gja_in.jobid = su_in.job;
+                               GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &gja_in, attributes, struct _jpelem__GetJobAttributes, 1);
+                               GLITE_SECURITY_GSOAP_LIST_GET(gja_in.attributes, 0) = GLITE_JP_ATTR_REGTIME;
+                               ret = soap_call___jpsrv__GetJobAttributes(soap, jpps, "", &gja_in, &gja_out);
+                               if (ret == 0) {
+                                       dprintf("[%s] Dump failed when job %s exists\n", name, su_in.job);
+                                       ret = -1;
+                                       break;
+                               }
+                               GLITE_SECURITY_GSOAP_LIST_GET(gja_in.attributes, 0) = NULL;
+                               /* register job */
+                               dprintf("[%s] Failsafe registration '%s'\n", name, rj_in.job);
+                               if ( !debug ) syslog(LOG_INFO, "Failsafe registration '%s'\n",rj_in.job);
+                               refresh_connection(soap);
+                               rj_in.job = su_in.job;
+                               rj_in.owner = mycred->name;
+                               ret = soap_call___jpsrv__RegisterJob(soap, tab[_jpps].val?:jpps, "", &rj_in, &rj_empty);
+                               if ( (ret = check_soap_fault(soap, ret)) ) break;
+                               dprintf("[%s] \tjobid: %s\n[%s] \towner: %s\n", name, rj_in.job, name, rj_in.owner);
+                               retry_upload--;
+                               ret = 1;
+                       }
+               } while (ret != 0 && retry_upload > 0);
+               if (ret) break;
                dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore));
                if (su_out.destination == NULL) {
                        dprintf("[%s] StartUpload returned NULL destination\n", name);
                        if ( !debug ) syslog(LOG_ERR, "StartUpload returned NULL destination");
+                       ret = 1;
                        break;
                }
 
@@ -618,6 +656,7 @@ static int dump_importer(void)
                                dprintf("[%s] %s removed\n", name, tab[_file].val);
                }
        } while (0);
+       soap_end(soap);
 
        glite_lbu_MaildirTransEnd(dump_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
        free(fname);