From: Zdeněk Šustr Date: Wed, 23 May 2012 14:00:21 +0000 (+0000) Subject: - Generate fake registration events for subjobs on dump X-Git-Tag: merge_32_head_src~9 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=855e62742698403f380a604d1fce18f7e4df5693;p=jra1mw.git - Generate fake registration events for subjobs on dump - Register jobs on load, recalculate states - crude but works --- diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index bb5dea8..2c47495 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -27,6 +27,7 @@ limitations under the License. #include "glite/lb/context-int.h" #include "glite/lb/events.h" #include "glite/lb/events_parse.h" +#include "glite/lb/ulm_parse.h" #include "purge.h" #include "store.h" #include "il_lbproxy.h" @@ -65,6 +66,12 @@ db_store(edg_wll_Context ctx, char *event) local_job = is_job_local(ctx, ev->any.jobId); + if(ctx->event_load) { + char buff[30]; + if(sscanf(event, "DG.ARRIVED=%s %*s", buff) == 1) + edg_wll_ULMDateToTimeval(buff, &(ev->any.arrived)); + } + if (!ctx->isProxy && check_store_authz(ctx, ev) != 0) goto err; diff --git a/org.glite.lb.server/src/dump.c b/org.glite.lb.server/src/dump.c index c25f552..ea85059 100644 --- a/org.glite.lb.server/src/dump.c +++ b/org.glite.lb.server/src/dump.c @@ -31,6 +31,8 @@ limitations under the License. #include "glite/lb/context-int.h" #include "glite/lb/events_parse.h" #include "glite/lb/ulm_parse.h" +#include "glite/lb/intjobstat.h" +#include "glite/lb/intjobstat_supp.h" #include "query.h" #include "get_events.h" @@ -46,18 +48,25 @@ static int handle_specials(edg_wll_Context,time_t *); int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,edg_wll_DumpResult *result) { - char *from_s, *to_s, *stmt, *time_s, *ptr; + char *from_s, *to_s, *stmt, *stmt2, *time_s, *ptr; char *tmpfname; time_t start,end; glite_lbu_Statement q = NULL; + glite_lbu_Statement q2 = NULL; char *res[11]; + char *res2[6]; int event; edg_wll_Event e; + edg_wll_Event *f; int ret,dump = 2; /* TODO: manage dump file */ time_t from = req->from,to = req->to; + char *event_s, *dumpline; + int len, written, total; + intJobStat *stat; - from_s = to_s = stmt = NULL; + from_s = to_s = stmt = stmt2 = NULL; memset(res,0,sizeof res); + memset(res2,0,sizeof res2); memset(&e,0,sizeof e); time(&start); @@ -82,7 +91,7 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req, "and j.jobid = e.jobid " "and j.dg_jobid like 'https://%|Ss:%d/%%' " "and arrived > %s and arrived <= %s " - "order by arrived", + "order by arrived,event", ctx->srvName,ctx->srvPort, from_s,to_s); glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt); @@ -109,43 +118,88 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req, edg_wll_ResetError(ctx); } else { - char *event_s = edg_wll_UnparseEvent(ctx,&e); + event_s = edg_wll_UnparseEvent(ctx,&e); char arr_s[100]; - int len, written, total; - strcpy(arr_s, "DG.ARRIVED="); - edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec, - e.any.arrived.tv_usec, - arr_s+strlen("DG.ARRIVED=")); - len = strlen(arr_s); + edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec, e.any.arrived.tv_usec, arr_s); + asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s); + len = strlen(dumpline); total = 0; while (total != len) { - written = write(dump,arr_s+total,len-total); + written = write(dump,dumpline+total,len-total); if (written < 0 && errno != EAGAIN) { edg_wll_SetError(ctx,errno,"writing dump file"); - free(event_s); - goto clean; - } - total += written; - } - write(dump, " ", 1); - len = strlen(event_s); - total = 0; - while (total != len) { - written = write(dump,event_s+total,len-total); - if (written < 0 && errno != EAGAIN) { - edg_wll_SetError(ctx,errno,"writing dump file"); - free(event_s); - goto clean; + break; } total += written; } - write(dump,"\n",1); free(event_s); } edg_wll_FreeEvent(&e); memset(&e,0,sizeof e); } + // Take care of implicit subjob registration events + trio_asprintf(&stmt2, + "select s.jobid,s.parent_job,ef.ulm,j.dg_jobid,s.int_status,e.arrived from states s, events_flesh ef, events e,jobs j " + "where s.parent_job<>'*no parent job*' AND " + "ef.jobid=s.parent_job AND (e.code=%d OR e.code=%d) AND " + "e.jobid=ef.jobid AND e.event=ef.event AND " + "e.arrived > %s AND e.arrived <= %s AND " + "j.jobid=s.jobid", + EDG_WLL_EVENT_REGJOB, EDG_WLL_EVENT_FILETRANSFERREGISTER, + from_s,to_s); + glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt2); + if (edg_wll_ExecSQL(ctx,stmt2,&q2) < 0) goto clean; + + while ((ret = edg_wll_FetchRow(ctx,q2,sizeof(res2)/sizeof(res2[0]),NULL,res2)) > 0) { + glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "Dumping subjob %s, parent %s", res2[0], res2[1]); + + edg_wll_ParseEvent(ctx,res2[2],&f); + + f->regJob.nsubjobs = 0; + f->regJob.parent = f->any.jobId; + + f->any.jobId=NULL; + edg_wlc_JobIdParse(res2[3], &f->any.jobId); + + f->any.arrived.tv_sec = glite_lbu_StrToTime(res2[5]); + f->any.arrived.tv_usec = 0; + + char *rest; + stat = dec_intJobStat(res2[4], &rest); + //nasty but not the only similar solution in code + switch (stat->pub.jobtype) { + case EDG_WLL_STAT_SIMPLE: + f->regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; break; + case EDG_WLL_STAT_FILE_TRANSFER: + f->regJob.jobtype = EDG_WLL_REGJOB_FILE_TRANSFER; break; + default: + f->regJob.jobtype = EDG_WLL_REGJOB_JOBTYPE_UNDEFINED; + glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_WARN, "Job %s has type %d but it also lists a parent job %s", res2[2], stat->pub.jobtype, res2[1]); + } + + char arr_s[100]; + event_s = edg_wll_UnparseEvent(ctx,f); + edg_wll_ULMTimevalToDate(f->any.arrived.tv_sec, f->any.arrived.tv_usec, arr_s); + asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s); + + len = strlen(dumpline); + total = 0; + while (total != len) { + written = write(dump,dumpline+total,len-total); + if (written < 0 && errno != EAGAIN) { + edg_wll_SetError(ctx,errno,"writing dump file"); + break; + } + total += written; + } + edg_wll_FreeStatus(intJobStat_to_JobStat(stat)); + free(event_s); + free(dumpline); + edg_wll_FreeEvent(f); + if (total != len) goto clean; + } + time(&end); time_s = time_to_string(start, &ptr); edg_wll_SetServerState(ctx,EDG_WLL_STATE_DUMP_START,time_s); @@ -164,8 +218,10 @@ int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req, clean: edg_wll_FreeEvent(&e); glite_lbu_FreeStmt(&q); + glite_lbu_FreeStmt(&q2); free(stmt); + free(stmt2); free(from_s); free(to_s); return edg_wll_Error(ctx,NULL,NULL); diff --git a/org.glite.lb.server/src/load.c b/org.glite.lb.server/src/load.c index c547aba..77bdcf7 100644 --- a/org.glite.lb.server/src/load.c +++ b/org.glite.lb.server/src/load.c @@ -50,13 +50,10 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, { int fd, reject_fd = -1, - JPreg, readret, i, ret; size_t maxsize; char *line = NULL, *errdesc, buff[30]; - edg_wll_Event *event; - edg_wlc_JobId jobid = NULL; edg_wll_ResetError(ctx); @@ -80,9 +77,9 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, if ( readret == 0 ) break; + ctx->event_load = 1; i++; - if ( sscanf(line, "DG.ARRIVED=%s %*s", buff) != 1 - || edg_wll_ParseEvent(ctx, line, &event) ) + if (db_store(ctx, line)) { char errs[100]; sprintf(errs, "Error parsing event at line %d", i); @@ -91,23 +88,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, fprintf(stderr, "%s", errs); continue; } - edg_wll_ULMDateToTimeval(buff, &(event->any.arrived)); - - if ( i == 1 ) - { - result->from = event->any.arrived.tv_sec; - result->to = event->any.arrived.tv_sec; - } - ctx->event_load = 1; - - do { - if (edg_wll_Transaction(ctx)) goto err; - - store_job_server_proxy(ctx, event, &JPreg); - - edg_wll_StoreEvent(ctx, event, line, NULL); - - } while (edg_wll_TransNeedRetry(ctx)); if ((ret = edg_wll_Error(ctx, NULL, &errdesc)) != 0) { int len = strlen(line), @@ -149,42 +129,9 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, } write(reject_fd,"\n",1); } - else { - result->to = event->any.arrived.tv_sec; - if ( jobid ) - { - char *md5_jobid = edg_wlc_JobIdGetUnique(jobid); - - if ( strcmp(md5_jobid, edg_wlc_JobIdGetUnique(event->any.jobId)) ) - { - edg_wll_JobStat st; - - edg_wll_JobStatusServer(ctx, jobid, 0, &st); - edg_wll_FreeStatus(&st); - - edg_wlc_JobIdFree(jobid); - edg_wlc_JobIdDup(event->any.jobId, &jobid); - } - free(md5_jobid); - } - else - edg_wlc_JobIdDup(event->any.jobId, &jobid); - } - cycle_clean: ctx->event_load = 0; - edg_wll_FreeEvent(event); - } - -err: - if ( jobid ) - { - edg_wll_JobStat st; - - edg_wll_JobStatusServer(ctx, jobid, 0, &st); - edg_wll_FreeStatus(&st); - edg_wlc_JobIdFree(jobid); } if ( reject_fd != -1 )