From c048f475afb1876269e099cb87905aa71afa6698 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Thu, 9 Oct 2008 08:19:10 +0000 Subject: [PATCH] zombie jobs suffix introduced - DB schema exetended - supporting glite and PBS jobs - seems working, but more testing needed --- org.glite.lb.server/config/glite-lb-dbsetup.sql | 8 ++ .../config/glite-lb-migrate_db2version20 | 10 ++ org.glite.lb.server/src/openserver.c | 3 +- org.glite.lb.server/src/query.c | 22 ++-- org.glite.lb.server/src/srv_purge.c | 113 +++++++++++++++++++-- 5 files changed, 139 insertions(+), 17 deletions(-) diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index 1e6ac2c..449d0b8 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -135,6 +135,7 @@ create table notif_jobs ( create table zombie_jobs ( jobid varchar(32) not null, prefix_id tinyint unsigned not null, + suffix_id tinyint unsigned not null, primary key (jobid) ) engine=innodb; @@ -146,4 +147,11 @@ create table zombie_prefixes ( primary key (prefix_id) ) engine=innodb; +create table zombie_suffixes ( + suffix_id tinyint unsigned not null auto_increment, + suffix varchar(255) binary not null, + + primary key (suffix_id) +) engine=innodb; + diff --git a/org.glite.lb.server/config/glite-lb-migrate_db2version20 b/org.glite.lb.server/config/glite-lb-migrate_db2version20 index a532174..54143e1 100644 --- a/org.glite.lb.server/config/glite-lb-migrate_db2version20 +++ b/org.glite.lb.server/config/glite-lb-migrate_db2version20 @@ -123,6 +123,7 @@ mysql -u lbserver $DB_NAME -e "\ create table zombie_jobs (\ jobid varchar(32) not null,\ prefix_id tinyint unsigned not null,\ + suffix_id tinyint unsigned not null,\ \ primary key (jobid)\ ) engine=innodb" @@ -136,3 +137,12 @@ create table zombie_prefixes (\ primary key (prefix_id)\ ) engine=innodb" +mysql -u lbserver $DB_NAME -e "\ +\ +create table zombie_suffixes (\ + suffix_id tinyint unsigned not null auto_increment,\ + suffix varchar(255) binary not null,\ +\ + primary key (suffix_id)\ +) engine=innodb" + diff --git a/org.glite.lb.server/src/openserver.c b/org.glite.lb.server/src/openserver.c index b4f506c..b816a38 100644 --- a/org.glite.lb.server/src/openserver.c +++ b/org.glite.lb.server/src/openserver.c @@ -58,13 +58,14 @@ edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs) while (hit < 3 && (ret = glite_lbu_FetchRow(stmt, 1, NULL, table)) > 0) { if (strcasecmp(table[0], "events_flesh") == 0 || strcasecmp(table[0], "zombie_jobs") == 0 || + strcasecmp(table[0], "zombie_suffixes") == 0 || strcasecmp(table[0], "zombie_prefixes") == 0) hit++; free(table[0]); } if (ret < 0) goto err; glite_lbu_FreeStmt(&stmt); if (hit != 3) { - ret = edg_wll_SetError(ctx, EDG_WLL_ERROR_DB_INIT, "events_flesh or zombie_jobs or zombie_prefixes table not found, migration to new schema needed"); + ret = edg_wll_SetError(ctx, EDG_WLL_ERROR_DB_INIT, "events_flesh or zombie_jobs or zombie_prefixes or zombie_suffixes table not found, migration to new schema needed"); goto close_db; } diff --git a/org.glite.lb.server/src/query.c b/org.glite.lb.server/src/query.c index c888126..2b4ae3e 100644 --- a/org.glite.lb.server/src/query.c +++ b/org.glite.lb.server/src/query.c @@ -331,7 +331,6 @@ int edg_wll_QueryJobsServer( *qbase = NULL, *zquery = NULL, *res[3], - *prefix, *dbjob, *zomb_where = NULL, *zomb_where_temp = NULL, @@ -509,7 +508,7 @@ limit_cycle_cleanup: if(!jobid_only_query(conditions)) { i = 0; while(conditions[i]) { - asprintf(&zomb_where_temp,"%s AND (", zomb_where ? zomb_where : ""); + asprintf(&zomb_where_temp,"%s%s", zomb_where ? zomb_where : "(", zomb_where ? " AND (" : ""); free(zomb_where); zomb_where = zomb_where_temp; zomb_where_temp = NULL; @@ -518,12 +517,10 @@ limit_cycle_cleanup: while(conditions[i][j].attr) { if(conditions[i][j].attr == EDG_WLL_QUERY_ATTR_JOBID) { - dbjob = glite_jobid_getUnique(conditions[i][j].value.j); - prefix = glite_jobid_getServer(conditions[i][j].value.j); - trio_asprintf(&zomb_where_temp,"%s%s((p.prefix = '%|Ss') AND (j.jobid = '%|Ss'))", + dbjob = glite_jobid_unparse(conditions[i][j].value.j); + trio_asprintf(&zomb_where_temp,"%s%s(result.dg_jobid='%|Ss')", zomb_where, first_or ? " OR " : "", - prefix, dbjob); free(dbjob); free(zquery); @@ -539,8 +536,12 @@ limit_cycle_cleanup: i++; } - trio_asprintf(&zquery,"SELECT p.prefix,j.jobid FROM zombie_prefixes as p, zombie_jobs as j " - "WHERE (p.prefix_id = j.prefix_id) %s", zomb_where); + trio_asprintf(&zquery,"SELECT * FROM " + "(SELECT concat('https://',p.prefix,'/',j.jobid,s.suffix) AS dg_jobid FROM " + "zombie_suffixes AS s, zombie_jobs AS j, zombie_prefixes AS p WHERE " + "(s.suffix_id = j.suffix_id) AND (p.prefix_id = j.prefix_id)) AS result " + "WHERE %s", zomb_where); + j = edg_wll_ExecSQL(ctx,zquery,&sh); @@ -549,11 +550,12 @@ limit_cycle_cleanup: states_out = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out)); i = 0; - while ( (ret=edg_wll_FetchRow(ctx,sh,2,NULL,res)) > 0 ) { - asprintf(&full_jobid,"https://%s/%s",res[0],res[1]); + while ( (ret=edg_wll_FetchRow(ctx,sh,sizeof(res),NULL,res)) > 0 ) { + asprintf(&full_jobid,"https://%s/%s%s",res[0],res[1],res[2]); edg_wlc_JobIdParse(full_jobid, jobs_out+i); edg_wlc_JobIdParse(full_jobid, &(states_out[i].jobId)); states_out[i].state = EDG_WLL_JOB_PURGED; + free(res[0]); free(res[1]); free(res[2]); i++; } diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 5c670b2..fae2d96 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -512,6 +512,64 @@ static int dump_events(edg_wll_Context ctx, glite_jobid_const_t job, int dump, c return edg_wll_Error(ctx,NULL,NULL); } + +static int get_jobid_suffix(edg_wll_Context ctx, glite_jobid_const_t job, char **unique, char **suffix) +{ + char *ptr = NULL, *dbjob = NULL; + edg_wll_JobStat stat; + + + memset(&stat, 0, sizeof(stat)); + if (edg_wll_JobStatusServer(ctx, job, 0 /*no flags*/, &stat)) { + goto err; + } + + dbjob = glite_jobid_getUnique(job); + + switch (stat.jobtype) { + case EDG_WLL_STAT_SIMPLE: + case EDG_WLL_STAT_DAG: + case EDG_WLL_STAT__PARTITIONABLE_UNUSED: + case EDG_WLL_STAT__PARTITIONED_UNUSED: + case EDG_WLL_STAT_COLLECTION: + // glite jobs, no suffix + *suffix = strdup(""); + *unique = strdup(dbjob); + break; + + case EDG_WLL_STAT_PBS: + // PBS jobs; suffix is everything starting from first '.' + ptr = strchr(dbjob,'.'); + if (ptr) { + *suffix = strdup(ptr); + ptr[0] = '\0'; + *unique = strdup(dbjob); + ptr[0] = '.'; + } + else { + edg_wll_SetError(ctx,EINVAL,"Uknown PBS job format"); + goto err; + } + break; + + case EDG_WLL_STAT_CONDOR: + // condor jobs + assert(0); // XXX: todo + break; + default: + edg_wll_SetError(ctx,EINVAL,"Uknown job type"); + goto err; + break; + } + +err: + edg_wll_FreeStatus(&stat); + free(dbjob); + + return edg_wll_Error(ctx, NULL, NULL); +} + + int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only) { char *dbjob = NULL; @@ -519,8 +577,8 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i glite_lbu_Statement q; int ret,dumped = 0; char *res[10]; - char *prefix = NULL; - char *prefix_id; + char *prefix = NULL, *suffix = NULL, *root = NULL; + char *prefix_id, *suffix_id; int sql_retval; edg_wll_ResetError(ctx); @@ -583,6 +641,13 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i if ( purge ) { + // get job suffix before its state is deleted + if ( get_jobid_suffix(ctx, job, &root, &suffix) ) goto rollback; + + } + + if ( purge ) + { trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob); if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback; free(stmt); stmt = NULL; @@ -601,6 +666,7 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i if ( purge ) { + /* Store zombie prefix */ prefix = glite_jobid_getServer(job); // See if that prefix is already stored in the database @@ -630,14 +696,42 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i ret = edg_wll_FetchRow(ctx,q, 1, NULL, &prefix_id); glite_lbu_FreeStmt(&q); - trio_asprintf(&stmt,"insert into zombie_jobs (jobid, prefix_id) VALUES ('%|Ss', %|Ss)", dbjob, prefix_id); + /* Store zombie suffix */ + + // See if that suffix is already stored in the database + trio_asprintf(&stmt,"select suffix_id from zombie_suffixes where suffix = '%|Ss'", suffix); + + sql_retval = edg_wll_ExecSQL(ctx,stmt,&q); + free(stmt); stmt = NULL; + + if (sql_retval < 0) goto rollback; + + if (sql_retval == 0) { //suffix does not exist yet + glite_lbu_FreeStmt(&q); + + trio_asprintf(&stmt,"insert into zombie_suffixes (suffix) VALUES ('%|Ss')", suffix); + + if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback; + + free(stmt); stmt = NULL; + glite_lbu_FreeStmt(&q); + + // The record should exist now, however we need to look up the suffix_id + trio_asprintf(&stmt,"select suffix_id from zombie_suffixes where suffix = '%|Ss'", suffix); + + if (edg_wll_ExecSQL(ctx,stmt,&q) <= 0) goto rollback; + free(stmt); stmt = NULL; + } + ret = edg_wll_FetchRow(ctx,q, 1, NULL, &suffix_id); + glite_lbu_FreeStmt(&q); + + /* Store zombie job */ + trio_asprintf(&stmt,"insert into zombie_jobs (jobid, prefix_id, suffix_id)" + " VALUES ('%|Ss', '%|Ss', '%|Ss')", root, prefix_id, suffix_id); if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback; glite_lbu_FreeStmt(&q); free(stmt); stmt = NULL; - free(prefix_id); prefix_id = NULL; - - free(prefix); } if (dump >= 0) @@ -683,8 +777,15 @@ rollback:; err: + free(root); + free(suffix); + free(prefix); + free(prefix_id); + free(suffix_id); free(dbjob); free(stmt); + glite_lbu_FreeStmt(&q); + return edg_wll_Error(ctx,NULL,NULL); } -- 1.8.2.3