- Generate fake registration events for subjobs on dump
authorZdeněk Šustr <sustr4@cesnet.cz>
Wed, 23 May 2012 14:00:21 +0000 (14:00 +0000)
committerZdeněk Šustr <sustr4@cesnet.cz>
Wed, 23 May 2012 14:00:21 +0000 (14:00 +0000)
- Register jobs on load, recalculate states
  - crude but works

org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/dump.c
org.glite.lb.server/src/load.c

index bb5dea8..2c47495 100644 (file)
@@ -27,6 +27,7 @@ limitations under the License.
 #include "glite/lb/context-int.h"
 #include "glite/lb/events.h"
 #include "glite/lb/events_parse.h"
+#include "glite/lb/ulm_parse.h"
 #include "purge.h"
 #include "store.h"
 #include "il_lbproxy.h"
@@ -65,6 +66,12 @@ db_store(edg_wll_Context ctx, char *event)
 
   local_job = is_job_local(ctx, ev->any.jobId);
 
+  if(ctx->event_load) {
+    char buff[30];
+    if(sscanf(event, "DG.ARRIVED=%s %*s", buff) == 1)
+      edg_wll_ULMDateToTimeval(buff, &(ev->any.arrived));
+  }
+
   if (!ctx->isProxy && check_store_authz(ctx, ev) != 0)
     goto err;
 
index c25f552..ea85059 100644 (file)
@@ -31,6 +31,8 @@ limitations under the License.
 #include "glite/lb/context-int.h"
 #include "glite/lb/events_parse.h"
 #include "glite/lb/ulm_parse.h"
+#include "glite/lb/intjobstat.h"
+#include "glite/lb/intjobstat_supp.h"
 
 #include "query.h"
 #include "get_events.h"
@@ -46,18 +48,25 @@ static int handle_specials(edg_wll_Context,time_t *);
 
 int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,edg_wll_DumpResult *result)
 {
-       char    *from_s, *to_s, *stmt, *time_s, *ptr;
+       char    *from_s, *to_s, *stmt, *stmt2, *time_s, *ptr;
        char    *tmpfname;
        time_t  start,end;
        glite_lbu_Statement     q = NULL;
+       glite_lbu_Statement     q2 = NULL;
        char            *res[11];
+       char            *res2[6];
        int     event;
        edg_wll_Event   e;
+       edg_wll_Event   *f;
        int     ret,dump = 2;   /* TODO: manage dump file */
        time_t  from = req->from,to = req->to;
+       char *event_s, *dumpline;
+       int             len, written, total;
+       intJobStat *stat;
 
-       from_s = to_s = stmt = NULL;
+       from_s = to_s = stmt = stmt2 = NULL;
        memset(res,0,sizeof res);
+       memset(res2,0,sizeof res2);
        memset(&e,0,sizeof e);
 
        time(&start);
@@ -82,7 +91,7 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,
                        "and j.jobid = e.jobid "
                        "and j.dg_jobid like 'https://%|Ss:%d/%%' "
                        "and arrived > %s and arrived <= %s "
-                       "order by arrived",
+                       "order by arrived,event",
                        ctx->srvName,ctx->srvPort,
                        from_s,to_s);
        glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
@@ -109,43 +118,88 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,
                        edg_wll_ResetError(ctx);
                }
                else {
-                       char    *event_s = edg_wll_UnparseEvent(ctx,&e);
+                       event_s = edg_wll_UnparseEvent(ctx,&e);
                        char    arr_s[100];
-                       int             len, written, total;
 
-                       strcpy(arr_s, "DG.ARRIVED=");
-                       edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
-                                                       e.any.arrived.tv_usec,
-                                                       arr_s+strlen("DG.ARRIVED="));
-                       len = strlen(arr_s);
+                       edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec, e.any.arrived.tv_usec, arr_s);
+                       asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s);
+                       len = strlen(dumpline); 
                        total = 0;
                        while (total != len) {
-                               written = write(dump,arr_s+total,len-total);
+                               written = write(dump,dumpline+total,len-total);
                                if (written < 0 && errno != EAGAIN) {
                                        edg_wll_SetError(ctx,errno,"writing dump file");
-                                       free(event_s);
-                                       goto clean;
-                               }
-                               total += written;
-                       }
-                       write(dump, " ", 1);
-                       len = strlen(event_s);
-                       total = 0;
-                       while (total != len) {
-                               written = write(dump,event_s+total,len-total);
-                               if (written < 0 && errno != EAGAIN) {
-                                       edg_wll_SetError(ctx,errno,"writing dump file");
-                                       free(event_s);
-                                       goto clean;
+                                       break;
                                }
                                total += written;
                        }
-                       write(dump,"\n",1);
                        free(event_s);
                }
                edg_wll_FreeEvent(&e); memset(&e,0,sizeof e);
        }
 
+       // Take care of implicit subjob registration events
+        trio_asprintf(&stmt2,
+                        "select s.jobid,s.parent_job,ef.ulm,j.dg_jobid,s.int_status,e.arrived from states s, events_flesh ef, events e,jobs j "
+                        "where s.parent_job<>'*no parent job*' AND "
+                       "ef.jobid=s.parent_job AND (e.code=%d OR e.code=%d) AND "
+                       "e.jobid=ef.jobid AND e.event=ef.event AND "
+                       "e.arrived > %s AND e.arrived <= %s AND "
+                       "j.jobid=s.jobid",
+                       EDG_WLL_EVENT_REGJOB, EDG_WLL_EVENT_FILETRANSFERREGISTER,
+                       from_s,to_s);
+       glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt2);
+       if (edg_wll_ExecSQL(ctx,stmt2,&q2) < 0) goto clean;
+
+       while ((ret = edg_wll_FetchRow(ctx,q2,sizeof(res2)/sizeof(res2[0]),NULL,res2)) > 0) {
+               glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "Dumping subjob %s, parent %s", res2[0], res2[1]);
+
+               edg_wll_ParseEvent(ctx,res2[2],&f);
+
+               f->regJob.nsubjobs = 0;
+               f->regJob.parent = f->any.jobId;
+
+               f->any.jobId=NULL;
+               edg_wlc_JobIdParse(res2[3], &f->any.jobId);
+
+               f->any.arrived.tv_sec = glite_lbu_StrToTime(res2[5]);
+               f->any.arrived.tv_usec = 0;
+
+               char *rest;
+               stat = dec_intJobStat(res2[4], &rest);
+               //nasty but not the only similar solution in code
+               switch (stat->pub.jobtype) {
+                       case EDG_WLL_STAT_SIMPLE:
+                               f->regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; break;
+                       case EDG_WLL_STAT_FILE_TRANSFER:
+                               f->regJob.jobtype = EDG_WLL_REGJOB_FILE_TRANSFER; break;
+                       default:
+                               f->regJob.jobtype = EDG_WLL_REGJOB_JOBTYPE_UNDEFINED;
+                               glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_WARN, "Job %s has type %d but it also lists a parent job %s", res2[2], stat->pub.jobtype, res2[1]);
+               }
+
+               char    arr_s[100];
+               event_s = edg_wll_UnparseEvent(ctx,f);
+               edg_wll_ULMTimevalToDate(f->any.arrived.tv_sec, f->any.arrived.tv_usec, arr_s);
+               asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s);
+
+               len = strlen(dumpline); 
+               total = 0;
+               while (total != len) {
+                       written = write(dump,dumpline+total,len-total);
+                       if (written < 0 && errno != EAGAIN) {
+                               edg_wll_SetError(ctx,errno,"writing dump file");
+                               break;
+                       }
+                       total += written;
+               }
+               edg_wll_FreeStatus(intJobStat_to_JobStat(stat));
+               free(event_s);
+               free(dumpline);
+               edg_wll_FreeEvent(f);
+               if (total != len) goto clean; 
+       }
+
        time(&end);
        time_s = time_to_string(start, &ptr);
        edg_wll_SetServerState(ctx,EDG_WLL_STATE_DUMP_START,time_s);
@@ -164,8 +218,10 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,
 clean:
        edg_wll_FreeEvent(&e);
        glite_lbu_FreeStmt(&q);
+       glite_lbu_FreeStmt(&q2);
 
        free(stmt);
+       free(stmt2);
        free(from_s);
        free(to_s);
        return edg_wll_Error(ctx,NULL,NULL);
index c547aba..77bdcf7 100644 (file)
@@ -50,13 +50,10 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
 {
        int                                     fd,
                                                reject_fd = -1,
-                                               JPreg,
                                                readret, i, ret;
        size_t                                  maxsize;
        char                       *line = NULL, *errdesc,
                                                buff[30];
-       edg_wll_Event      *event;
-       edg_wlc_JobId           jobid = NULL;
 
 
        edg_wll_ResetError(ctx);
@@ -80,9 +77,9 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                if ( readret == 0 )
                        break;
 
+               ctx->event_load = 1;
                i++;
-               if (   sscanf(line, "DG.ARRIVED=%s %*s", buff) != 1
-                       || edg_wll_ParseEvent(ctx, line, &event) )
+               if (db_store(ctx, line))
                {
                        char errs[100];
                        sprintf(errs, "Error parsing event at line %d", i);
@@ -91,23 +88,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                        fprintf(stderr, "%s", errs);
                        continue;
                }
-               edg_wll_ULMDateToTimeval(buff, &(event->any.arrived));
-
-               if ( i == 1 )
-               {
-                       result->from = event->any.arrived.tv_sec;
-                       result->to = event->any.arrived.tv_sec;
-               }
-               ctx->event_load = 1;
-               
-               do {
-                       if (edg_wll_Transaction(ctx)) goto err;
-
-                       store_job_server_proxy(ctx, event, &JPreg);
-
-                       edg_wll_StoreEvent(ctx, event, line, NULL); 
-
-               } while (edg_wll_TransNeedRetry(ctx));
 
                if ((ret = edg_wll_Error(ctx, NULL, &errdesc)) != 0) {
                        int             len = strlen(line),
@@ -149,42 +129,9 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                        }
                        write(reject_fd,"\n",1);
                }
-               else {
-                       result->to = event->any.arrived.tv_sec;
-                       if ( jobid )
-                       {
-                               char *md5_jobid = edg_wlc_JobIdGetUnique(jobid);
-                               
-                               if ( strcmp(md5_jobid, edg_wlc_JobIdGetUnique(event->any.jobId)) )
-                               {
-                                       edg_wll_JobStat st;
-
-                                       edg_wll_JobStatusServer(ctx, jobid, 0, &st);
-                                       edg_wll_FreeStatus(&st);
-
-                                       edg_wlc_JobIdFree(jobid);
-                                       edg_wlc_JobIdDup(event->any.jobId, &jobid);
-                               }
-                               free(md5_jobid);
-                       }
-                       else
-                               edg_wlc_JobIdDup(event->any.jobId, &jobid);
-               }
-
 
 cycle_clean:
                ctx->event_load = 0;
-               edg_wll_FreeEvent(event);
-       }
-
-err:
-       if ( jobid )
-       {
-               edg_wll_JobStat st;
-
-               edg_wll_JobStatusServer(ctx, jobid, 0, &st);
-               edg_wll_FreeStatus(&st);
-               edg_wlc_JobIdFree(jobid);
        }
 
        if ( reject_fd != -1 )