zombie jobs suffix introduced
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 9 Oct 2008 08:19:10 +0000 (08:19 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 9 Oct 2008 08:19:10 +0000 (08:19 +0000)
- DB schema exetended
- supporting glite and PBS jobs
- seems working, but more testing needed

org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/config/glite-lb-migrate_db2version20
org.glite.lb.server/src/openserver.c
org.glite.lb.server/src/query.c
org.glite.lb.server/src/srv_purge.c

index 1e6ac2c..449d0b8 100644 (file)
@@ -135,6 +135,7 @@ create table notif_jobs (
 create table zombie_jobs (
        jobid   varchar(32)     not null,
        prefix_id       tinyint unsigned not null,
+       suffix_id       tinyint unsigned not null,
 
        primary key (jobid)
 ) engine=innodb;
@@ -146,4 +147,11 @@ create table zombie_prefixes (
        primary key (prefix_id)
 ) engine=innodb;
 
+create table zombie_suffixes (
+       suffix_id       tinyint unsigned not null auto_increment,
+       suffix          varchar(255)    binary not null,
+
+       primary key (suffix_id)
+) engine=innodb;
+
 
index a532174..54143e1 100644 (file)
@@ -123,6 +123,7 @@ mysql -u lbserver $DB_NAME -e "\
 create table zombie_jobs (\
         jobid          varchar(32)     not null,\
         prefix_id      tinyint         unsigned not null,\
+        suffix_id      tinyint         unsigned not null,\
 \
         primary key (jobid)\
 ) engine=innodb"
@@ -136,3 +137,12 @@ create table zombie_prefixes (\
         primary key (prefix_id)\
 ) engine=innodb"
 
+mysql -u lbserver $DB_NAME -e "\
+\
+create table zombie_suffixes (\
+        suffix_id       tinyint unsigned not null auto_increment,\
+        suffix          varchar(255)    binary not null,\
+\
+        primary key (suffix_id)\
+) engine=innodb"
+
index b4f506c..b816a38 100644 (file)
@@ -58,13 +58,14 @@ edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs)
        while (hit < 3 && (ret = glite_lbu_FetchRow(stmt, 1, NULL, table)) > 0) {
                if (strcasecmp(table[0], "events_flesh") == 0 ||
                strcasecmp(table[0], "zombie_jobs") == 0 ||
+               strcasecmp(table[0], "zombie_suffixes") == 0 ||
                strcasecmp(table[0], "zombie_prefixes") == 0) hit++;
                free(table[0]);
        }
        if (ret < 0) goto err;
        glite_lbu_FreeStmt(&stmt);
        if (hit != 3) {
-               ret = edg_wll_SetError(ctx, EDG_WLL_ERROR_DB_INIT, "events_flesh or zombie_jobs or zombie_prefixes table not found, migration to new schema needed");
+               ret = edg_wll_SetError(ctx, EDG_WLL_ERROR_DB_INIT, "events_flesh or zombie_jobs or zombie_prefixes or zombie_suffixes table not found, migration to new schema needed");
                goto close_db;
        }
 
index c888126..2b4ae3e 100644 (file)
@@ -331,7 +331,6 @@ int edg_wll_QueryJobsServer(
                                           *qbase = NULL,
                                           *zquery = NULL,
                                           *res[3],
-                                          *prefix,
                                           *dbjob,
                                           *zomb_where = NULL,
                                           *zomb_where_temp = NULL,
@@ -509,7 +508,7 @@ limit_cycle_cleanup:
                if(!jobid_only_query(conditions)) {
                        i = 0;
                        while(conditions[i]) {
-                               asprintf(&zomb_where_temp,"%s AND (", zomb_where ? zomb_where : "");
+                               asprintf(&zomb_where_temp,"%s%s", zomb_where ? zomb_where : "(", zomb_where ? " AND (" : "");
                                free(zomb_where); 
                                zomb_where = zomb_where_temp; zomb_where_temp = NULL;
 
@@ -518,12 +517,10 @@ limit_cycle_cleanup:
                                while(conditions[i][j].attr) {
                                
                                        if(conditions[i][j].attr == EDG_WLL_QUERY_ATTR_JOBID) {
-                                               dbjob = glite_jobid_getUnique(conditions[i][j].value.j);
-                                               prefix = glite_jobid_getServer(conditions[i][j].value.j);
-                                               trio_asprintf(&zomb_where_temp,"%s%s((p.prefix = '%|Ss') AND (j.jobid = '%|Ss'))",
+                                               dbjob = glite_jobid_unparse(conditions[i][j].value.j);
+                                               trio_asprintf(&zomb_where_temp,"%s%s(result.dg_jobid='%|Ss')",
                                                        zomb_where,
                                                        first_or ? " OR " : "", 
-                                                       prefix,
                                                        dbjob);
                                                free(dbjob); 
                                                free(zquery);
@@ -539,8 +536,12 @@ limit_cycle_cleanup:
                                i++;
                        }
 
-                       trio_asprintf(&zquery,"SELECT p.prefix,j.jobid FROM zombie_prefixes as p, zombie_jobs as j "
-                                            "WHERE (p.prefix_id = j.prefix_id) %s", zomb_where);
+                       trio_asprintf(&zquery,"SELECT * FROM "
+                                               "(SELECT  concat('https://',p.prefix,'/',j.jobid,s.suffix) AS dg_jobid FROM "
+                                               "zombie_suffixes AS s, zombie_jobs AS j, zombie_prefixes AS p WHERE "
+                                               "(s.suffix_id = j.suffix_id) AND (p.prefix_id = j.prefix_id)) AS result "
+                                               "WHERE %s", zomb_where);        
+
 
                        j = edg_wll_ExecSQL(ctx,zquery,&sh);
 
@@ -549,11 +550,12 @@ limit_cycle_cleanup:
                                states_out      = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out));
 
                                i = 0; 
-                               while ( (ret=edg_wll_FetchRow(ctx,sh,2,NULL,res)) > 0 ) {
-                                       asprintf(&full_jobid,"https://%s/%s",res[0],res[1]);
+                               while ( (ret=edg_wll_FetchRow(ctx,sh,sizeof(res),NULL,res)) > 0 ) {
+                                       asprintf(&full_jobid,"https://%s/%s%s",res[0],res[1],res[2]);
                                        edg_wlc_JobIdParse(full_jobid, jobs_out+i);
                                        edg_wlc_JobIdParse(full_jobid, &(states_out[i].jobId));
                                        states_out[i].state = EDG_WLL_JOB_PURGED;
+                                       free(res[0]); free(res[1]); free(res[2]);
 
                                        i++;
                                }
index 5c670b2..fae2d96 100644 (file)
@@ -512,6 +512,64 @@ static int dump_events(edg_wll_Context ctx, glite_jobid_const_t job, int dump, c
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
+static int get_jobid_suffix(edg_wll_Context ctx, glite_jobid_const_t job, char **unique, char **suffix)
+{
+       char            *ptr = NULL, *dbjob = NULL;
+       edg_wll_JobStat stat;
+
+       
+       memset(&stat, 0, sizeof(stat));
+       if (edg_wll_JobStatusServer(ctx, job, 0 /*no flags*/, &stat)) {
+               goto err;
+       }
+       
+       dbjob = glite_jobid_getUnique(job);
+
+       switch (stat.jobtype) {
+               case EDG_WLL_STAT_SIMPLE:
+               case EDG_WLL_STAT_DAG:
+               case EDG_WLL_STAT__PARTITIONABLE_UNUSED:
+               case EDG_WLL_STAT__PARTITIONED_UNUSED:
+               case EDG_WLL_STAT_COLLECTION:
+                       // glite jobs, no suffix
+                       *suffix = strdup("");
+                       *unique = strdup(dbjob);
+                       break;
+
+               case EDG_WLL_STAT_PBS:
+                       // PBS jobs; suffix is everything starting from first '.'
+                       ptr = strchr(dbjob,'.');
+                       if (ptr) {
+                               *suffix = strdup(ptr);
+                               ptr[0] = '\0';
+                               *unique = strdup(dbjob);
+                               ptr[0] = '.';
+                       }
+                       else {
+                               edg_wll_SetError(ctx,EINVAL,"Uknown PBS job format");
+                               goto err;
+                       }
+                       break;
+
+               case EDG_WLL_STAT_CONDOR:
+                       // condor jobs
+                       assert(0); // XXX: todo
+                       break;
+               default:
+                       edg_wll_SetError(ctx,EINVAL,"Uknown job type");
+                       goto err;
+                       break;
+       }
+
+err:   
+       edg_wll_FreeStatus(&stat);
+       free(dbjob);
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
 int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only)
 {
        char    *dbjob = NULL;
@@ -519,8 +577,8 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
        glite_lbu_Statement     q;
        int             ret,dumped = 0;
        char    *res[10];
-       char    *prefix = NULL;
-       char    *prefix_id;
+       char    *prefix = NULL, *suffix = NULL, *root = NULL;
+       char    *prefix_id, *suffix_id;
        int     sql_retval;
 
        edg_wll_ResetError(ctx);
@@ -583,6 +641,13 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
 
                if ( purge )
                {
+                       // get job suffix before its state is deleted
+                       if ( get_jobid_suffix(ctx, job, &root, &suffix) ) goto rollback;
+               
+               }
+
+               if ( purge )
+               {
                        trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
                        if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
                        free(stmt); stmt = NULL;
@@ -601,6 +666,7 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
 
                if ( purge )
                {
+                       /* Store zombie prefix */
                        prefix = glite_jobid_getServer(job);
                
                        // See if that prefix is already stored in the database 
@@ -630,14 +696,42 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
                        ret = edg_wll_FetchRow(ctx,q, 1, NULL, &prefix_id);
                        glite_lbu_FreeStmt(&q);
 
-                       trio_asprintf(&stmt,"insert into zombie_jobs (jobid, prefix_id) VALUES ('%|Ss', %|Ss)", dbjob, prefix_id);
+                       /* Store zombie suffix */
+
+                       // See if that suffix is already stored in the database 
+                       trio_asprintf(&stmt,"select suffix_id from zombie_suffixes where suffix = '%|Ss'", suffix);
+
+                       sql_retval = edg_wll_ExecSQL(ctx,stmt,&q);
+                       free(stmt); stmt = NULL;
+
+                       if (sql_retval < 0) goto rollback;
+
+                       if (sql_retval == 0) { //suffix does not exist yet
+                               glite_lbu_FreeStmt(&q);
+
+                               trio_asprintf(&stmt,"insert into zombie_suffixes (suffix) VALUES ('%|Ss')", suffix);
+
+                               if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback;
+
+                               free(stmt); stmt = NULL;
+                               glite_lbu_FreeStmt(&q);
+
+                               // The record should exist now, however we need to look up the suffix_id 
+                               trio_asprintf(&stmt,"select suffix_id from zombie_suffixes where suffix = '%|Ss'", suffix);
+
+                               if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback;
+                               free(stmt); stmt = NULL;
+                       }
+                       ret = edg_wll_FetchRow(ctx,q, 1, NULL, &suffix_id);
+                       glite_lbu_FreeStmt(&q);
+
+                       /* Store zombie job */
+                       trio_asprintf(&stmt,"insert into zombie_jobs (jobid, prefix_id, suffix_id)"
+                                       " VALUES ('%|Ss', '%|Ss', '%|Ss')", root, prefix_id, suffix_id);
 
                        if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback;
                        glite_lbu_FreeStmt(&q);
                        free(stmt); stmt = NULL;
-                       free(prefix_id); prefix_id = NULL;
-
-                       free(prefix);
                }
 
                if (dump >= 0) 
@@ -683,8 +777,15 @@ rollback:;
 
 
 err:
+       free(root);
+       free(suffix);
+       free(prefix);
+       free(prefix_id);
+       free(suffix_id);
        free(dbjob);
        free(stmt);
+       glite_lbu_FreeStmt(&q);
+
        return edg_wll_Error(ctx,NULL,NULL);
 }