Support for Zombies (Remembering purged jobs).
authorZdeněk Šustr <sustr4@cesnet.cz>
Tue, 29 Apr 2008 11:32:48 +0000 (11:32 +0000)
committerZdeněk Šustr <sustr4@cesnet.cz>
Tue, 29 Apr 2008 11:32:48 +0000 (11:32 +0000)
org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/src/query.c
org.glite.lb.server/src/srv_purge.c

index 163995e..6d705ba 100644 (file)
@@ -133,14 +133,14 @@ create table notif_jobs (
 ) engine=innodb;
 
 create table zombie_jobs (
-       binary_jobid    binary(16)      not null,
-       prefix_id       binary(1)       not null,
+       jobid   varchar(22)     not null,
+       prefix_id       tinyint unsigned not null,
 
-       primary key (binary_jobid)
+       primary key (jobid)
 ) engine=innodb;
 
 create table zombie_prefixes (
-       prefix_id       binary(1)       not null,
+       prefix_id       tinyint unsigned not null auto_increment,
        prefix          varchar(255)    binary not null,
 
        primary key (prefix_id)
index baa2751..5c62ac3 100644 (file)
@@ -296,6 +296,25 @@ cleanup:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+int jobid_only_query(const edg_wll_QueryRec **conditions) {
+       int i = 0, j;
+       int jobid_in_this_or;
+       int not_jobid_only = 0;
+
+       while(conditions[i]) {
+               jobid_in_this_or = 0;
+               j = 0;
+               while(conditions[i][j].attr) {
+                       if(conditions[i][j].attr == EDG_WLL_QUERY_ATTR_JOBID) jobid_in_this_or++;
+                       j++;
+               }
+               if (!jobid_in_this_or) not_jobid_only++;
+               i++;
+       }
+
+       return not_jobid_only;
+}
+
 int edg_wll_QueryJobsServer(
                edg_wll_Context ctx,
                const edg_wll_QueryRec **conditions,
@@ -308,18 +327,24 @@ int edg_wll_QueryJobsServer(
                                           *tags_where = NULL,
                                           *q = NULL,
                                           *qbase = NULL,
-                                          *res[3];
+                                          *zquery = NULL,
+                                          *res[3],
+                                          *prefix,
+                                          *dbjob,
+                                          *zomb_where = NULL,
+                                          *zomb_where_temp = NULL,
+                                          *full_jobid = NULL;
        edg_wlc_JobId      *jobs_out = NULL;
        edg_wll_JobStat    *states_out = NULL;
        glite_lbu_Statement             sh;
        int                                     i = 0,
+                                               j = 0,
                                                ret = 0,
                                                eperm = 0,
                                                limit = 0, offset = 0,
                                                limit_loop = 1,
-                                               where_flags = 0;
-                                               
-
+                                               where_flags = 0,
+                                               first_or;
 
        memset(res,0,sizeof res);
        edg_wll_ResetError(ctx);
@@ -330,6 +355,8 @@ int edg_wll_QueryJobsServer(
                goto cleanup;
        }
 
+
+
        if ( (ctx->p_query_results == EDG_WLL_QUERYRES_ALL) &&
                        (!conditions[0] || conditions[1] ||
                        (conditions[0][0].attr != EDG_WLL_QUERY_ATTR_OWNER) ||
@@ -476,8 +503,65 @@ limit_cycle_cleanup:
        } while ( limit_loop );
 
        if ( !*jobs_out ) {
+               if(!jobid_only_query(conditions)) {
+                       i = 0;
+                       while(conditions[i]) {
+                               asprintf(&zomb_where_temp,"%s AND (", zomb_where ? zomb_where : "");
+                               free(zomb_where); 
+                               zomb_where = zomb_where_temp; zomb_where_temp = NULL;
+
+                               first_or = 0;
+                               j = 0;
+                               while(conditions[i][j].attr) {
+                               
+                                       if(conditions[i][j].attr == EDG_WLL_QUERY_ATTR_JOBID) {
+                                               dbjob = edg_wlc_JobIdGetUnique(conditions[i][j].value.j);
+                                               prefix = glite_jobid_getServer(conditions[i][j].value.c);
+                                               trio_asprintf(&zomb_where_temp,"%s%s((p.prefix = '%|Ss') AND (j.jobid = '%|Ss'))",
+                                                       zomb_where,
+                                                       first_or ? " OR " : "", 
+                                                       prefix,
+                                                       dbjob);
+                                               free(dbjob); 
+                                               free(zquery);
+                                               free(zomb_where);
+                                               zomb_where = zomb_where_temp; zomb_where_temp = NULL;
+                                               first_or++;
+                                       }
+                                       j++;
+                               }
+                               asprintf(&zomb_where_temp,"%s)", zomb_where ? zomb_where : "");
+                               free(zomb_where); 
+                               zomb_where = zomb_where_temp; zomb_where_temp = NULL;
+                               i++;
+                       }
+
+                       trio_asprintf(&zquery,"SELECT p.prefix,j.jobid FROM zombie_prefixes as p, zombie_jobs as j "
+                                            "WHERE (p.prefix_id = j.prefix_id) %s", zomb_where);
+
+                       j = edg_wll_ExecSQL(ctx,zquery,&sh);
+
+                       if (j > 0) {
+                               jobs_out        = (edg_wlc_JobId *) calloc(j+1, sizeof(*jobs_out));
+                               states_out      = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out));
+
+                               i = 0; 
+                               while ( (ret=edg_wll_FetchRow(ctx,sh,2,NULL,res)) > 0 ) {
+                                       asprintf(&full_jobid,"https://%s/%s",res[0],res[1]);
+                                       edg_wlc_JobIdParse(full_jobid, jobs_out+i);
+                                       edg_wlc_JobIdParse(full_jobid, &(states_out[i].jobId));
+                                       states_out[i].state = EDG_WLL_JOB_PURGED;
+
+                                       i++;
+                               }
+                       }
+                       glite_lbu_FreeStmt(&sh);
+               }
+       }
+
+       if ( !*jobs_out ) {
                if (eperm) edg_wll_SetError(ctx, EPERM, "matching jobs found but authorization failed");
-                 else     edg_wll_SetError(ctx, ENOENT, "no matching jobs found");
+               else edg_wll_SetError(ctx, ENOENT, "no matching jobs found");
        }
 
        if ( i && (ret == 0) )
index ea11650..ebf3da5 100644 (file)
@@ -470,7 +470,9 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
        glite_lbu_Statement     q;
        int             ret,dumped = 0;
        char    *res[10];
-
+       char    *prefix = NULL;
+       char    *prefix_id;
+       int     sql_retval;
 
        edg_wll_ResetError(ctx);
        if ( !purge && dump < 0 ) return 0;
@@ -548,6 +550,47 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
                        free(stmt); stmt = NULL;
                }
 
+               if ( purge )
+               {
+                       prefix = glite_jobid_getServer(job);
+               
+                       // See if that prefix is already stored in the database 
+                       trio_asprintf(&stmt,"select prefix_id from zombie_prefixes where prefix = '%|Ss'", prefix);
+
+                       sql_retval = edg_wll_ExecSQL(ctx,stmt,&q);
+                       free(stmt); stmt = NULL;
+
+                       if (sql_retval < 0) goto rollback;
+
+                       if (sql_retval == 0) { //prefix does not exist yet
+                               glite_lbu_FreeStmt(&q);
+
+                               trio_asprintf(&stmt,"insert into zombie_prefixes (prefix) VALUES ('%|Ss')", prefix);
+
+                               if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback;
+
+                               free(stmt); stmt = NULL;
+                               glite_lbu_FreeStmt(&q);
+
+                               // The record should exist now, however we need to look up the prefix_id 
+                               trio_asprintf(&stmt,"select prefix_id from zombie_prefixes where prefix = '%|Ss'", prefix);
+
+                               if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback;
+                               free(stmt); stmt = NULL;
+                       }
+                       ret = edg_wll_FetchRow(ctx,q, 1, NULL, &prefix_id);
+                       glite_lbu_FreeStmt(&q);
+
+                       trio_asprintf(&stmt,"insert into zombie_jobs (jobid, prefix_id) VALUES ('%|Ss', %|Ss)", dbjob, prefix_id);
+
+                       if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback;
+                       glite_lbu_FreeStmt(&q);
+                       free(stmt); stmt = NULL;
+                       free(prefix_id); prefix_id = NULL;
+
+                       free(prefix);
+               }
+
                if (dump >= 0) 
                        trio_asprintf(&stmt,
                                "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived,seqcode "