#include "glite/lb/context-int.h"
#include "glite/lb/events_parse.h"
#include "glite/lb/ulm_parse.h"
+#include "glite/lb/intjobstat.h"
+#include "glite/lb/intjobstat_supp.h"
#include "query.h"
#include "get_events.h"
int edg_wll_DumpEventsServer(edg_wll_Context ctx,const edg_wll_DumpRequest *req,edg_wll_DumpResult *result)
{
- char *from_s, *to_s, *stmt, *time_s, *ptr;
+ char *from_s, *to_s, *stmt, *stmt2, *time_s, *ptr;
char *tmpfname;
time_t start,end;
glite_lbu_Statement q = NULL;
+ glite_lbu_Statement q2 = NULL;
char *res[11];
+ char *res2[6];
int event;
edg_wll_Event e;
+ edg_wll_Event *f;
int ret,dump = 2; /* TODO: manage dump file */
time_t from = req->from,to = req->to;
+ char *event_s, *dumpline;
+ int len, written, total;
+ intJobStat *stat;
- from_s = to_s = stmt = NULL;
+ from_s = to_s = stmt = stmt2 = NULL;
memset(res,0,sizeof res);
+ memset(res2,0,sizeof res2);
memset(&e,0,sizeof e);
time(&start);
"and j.jobid = e.jobid "
"and j.dg_jobid like 'https://%|Ss:%d/%%' "
"and arrived > %s and arrived <= %s "
- "order by arrived",
+ "order by arrived,event",
ctx->srvName,ctx->srvPort,
from_s,to_s);
glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
edg_wll_ResetError(ctx);
}
else {
- char *event_s = edg_wll_UnparseEvent(ctx,&e);
+ event_s = edg_wll_UnparseEvent(ctx,&e);
char arr_s[100];
- int len, written, total;
- strcpy(arr_s, "DG.ARRIVED=");
- edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
- e.any.arrived.tv_usec,
- arr_s+strlen("DG.ARRIVED="));
- len = strlen(arr_s);
+ edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec, e.any.arrived.tv_usec, arr_s);
+ asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s);
+ len = strlen(dumpline);
total = 0;
while (total != len) {
- written = write(dump,arr_s+total,len-total);
+ written = write(dump,dumpline+total,len-total);
if (written < 0 && errno != EAGAIN) {
edg_wll_SetError(ctx,errno,"writing dump file");
- free(event_s);
- goto clean;
- }
- total += written;
- }
- write(dump, " ", 1);
- len = strlen(event_s);
- total = 0;
- while (total != len) {
- written = write(dump,event_s+total,len-total);
- if (written < 0 && errno != EAGAIN) {
- edg_wll_SetError(ctx,errno,"writing dump file");
- free(event_s);
- goto clean;
+ break;
}
total += written;
}
- write(dump,"\n",1);
free(event_s);
}
edg_wll_FreeEvent(&e); memset(&e,0,sizeof e);
}
+ // Take care of implicit subjob registration events
+ trio_asprintf(&stmt2,
+ "select s.jobid,s.parent_job,ef.ulm,j.dg_jobid,s.int_status,e.arrived from states s, events_flesh ef, events e,jobs j "
+ "where s.parent_job<>'*no parent job*' AND "
+ "ef.jobid=s.parent_job AND (e.code=%d OR e.code=%d) AND "
+ "e.jobid=ef.jobid AND e.event=ef.event AND "
+ "e.arrived > %s AND e.arrived <= %s AND "
+ "j.jobid=s.jobid",
+ EDG_WLL_EVENT_REGJOB, EDG_WLL_EVENT_FILETRANSFERREGISTER,
+ from_s,to_s);
+ glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt2);
+ if (edg_wll_ExecSQL(ctx,stmt2,&q2) < 0) goto clean;
+
+ while ((ret = edg_wll_FetchRow(ctx,q2,sizeof(res2)/sizeof(res2[0]),NULL,res2)) > 0) {
+ glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "Dumping subjob %s, parent %s", res2[0], res2[1]);
+
+ edg_wll_ParseEvent(ctx,res2[2],&f);
+
+ f->regJob.nsubjobs = 0;
+ f->regJob.parent = f->any.jobId;
+
+ f->any.jobId=NULL;
+ edg_wlc_JobIdParse(res2[3], &f->any.jobId);
+
+ f->any.arrived.tv_sec = glite_lbu_StrToTime(res2[5]);
+ f->any.arrived.tv_usec = 0;
+
+ char *rest;
+ stat = dec_intJobStat(res2[4], &rest);
+ //nasty but not the only similar solution in code
+ switch (stat->pub.jobtype) {
+ case EDG_WLL_STAT_SIMPLE:
+ f->regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; break;
+ case EDG_WLL_STAT_FILE_TRANSFER:
+ f->regJob.jobtype = EDG_WLL_REGJOB_FILE_TRANSFER; break;
+ default:
+ f->regJob.jobtype = EDG_WLL_REGJOB_JOBTYPE_UNDEFINED;
+ glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_WARN, "Job %s has type %d but it also lists a parent job %s", res2[2], stat->pub.jobtype, res2[1]);
+ }
+
+ char arr_s[100];
+ event_s = edg_wll_UnparseEvent(ctx,f);
+ edg_wll_ULMTimevalToDate(f->any.arrived.tv_sec, f->any.arrived.tv_usec, arr_s);
+ asprintf(&dumpline, "DG.ARRIVED=%s %s\n", arr_s, event_s);
+
+ len = strlen(dumpline);
+ total = 0;
+ while (total != len) {
+ written = write(dump,dumpline+total,len-total);
+ if (written < 0 && errno != EAGAIN) {
+ edg_wll_SetError(ctx,errno,"writing dump file");
+ break;
+ }
+ total += written;
+ }
+ edg_wll_FreeStatus(intJobStat_to_JobStat(stat));
+ free(event_s);
+ free(dumpline);
+ edg_wll_FreeEvent(f);
+ if (total != len) goto clean;
+ }
+
time(&end);
time_s = time_to_string(start, &ptr);
edg_wll_SetServerState(ctx,EDG_WLL_STATE_DUMP_START,time_s);
clean:
edg_wll_FreeEvent(&e);
glite_lbu_FreeStmt(&q);
+ glite_lbu_FreeStmt(&q2);
free(stmt);
+ free(stmt2);
free(from_s);
free(to_s);
return edg_wll_Error(ctx,NULL,NULL);
{
int fd,
reject_fd = -1,
- JPreg,
readret, i, ret;
size_t maxsize;
char *line = NULL, *errdesc,
buff[30];
- edg_wll_Event *event;
- edg_wlc_JobId jobid = NULL;
edg_wll_ResetError(ctx);
if ( readret == 0 )
break;
+ ctx->event_load = 1;
i++;
- if ( sscanf(line, "DG.ARRIVED=%s %*s", buff) != 1
- || edg_wll_ParseEvent(ctx, line, &event) )
+ if (db_store(ctx, line))
{
char errs[100];
sprintf(errs, "Error parsing event at line %d", i);
fprintf(stderr, "%s", errs);
continue;
}
- edg_wll_ULMDateToTimeval(buff, &(event->any.arrived));
-
- if ( i == 1 )
- {
- result->from = event->any.arrived.tv_sec;
- result->to = event->any.arrived.tv_sec;
- }
- ctx->event_load = 1;
-
- do {
- if (edg_wll_Transaction(ctx)) goto err;
-
- store_job_server_proxy(ctx, event, &JPreg);
-
- edg_wll_StoreEvent(ctx, event, line, NULL);
-
- } while (edg_wll_TransNeedRetry(ctx));
if ((ret = edg_wll_Error(ctx, NULL, &errdesc)) != 0) {
int len = strlen(line),
}
write(reject_fd,"\n",1);
}
- else {
- result->to = event->any.arrived.tv_sec;
- if ( jobid )
- {
- char *md5_jobid = edg_wlc_JobIdGetUnique(jobid);
-
- if ( strcmp(md5_jobid, edg_wlc_JobIdGetUnique(event->any.jobId)) )
- {
- edg_wll_JobStat st;
-
- edg_wll_JobStatusServer(ctx, jobid, 0, &st);
- edg_wll_FreeStatus(&st);
-
- edg_wlc_JobIdFree(jobid);
- edg_wlc_JobIdDup(event->any.jobId, &jobid);
- }
- free(md5_jobid);
- }
- else
- edg_wlc_JobIdDup(event->any.jobId, &jobid);
- }
-
cycle_clean:
ctx->event_load = 0;
- edg_wll_FreeEvent(event);
- }
-
-err:
- if ( jobid )
- {
- edg_wll_JobStat st;
-
- edg_wll_JobStatusServer(ctx, jobid, 0, &st);
- edg_wll_FreeStatus(&st);
- edg_wlc_JobIdFree(jobid);
}
if ( reject_fd != -1 )