- new events: running/shutdown from machine, VM manager and cloud manager for
better state computation
- VM manager name in create event and usage in shutdown event
- VM states mapped to gLite states
REQUIRE
HOSTNAME
ID
+ VM_SOURCE
+ MANAGER
/;
VM_ID
VM_NAME
VM_PHY_HOSTNAME
+ VM_SYSTEM_HALTING
+ VM_MANAGER
/;
primary key (suffix_id)
) engine=innodb;
+-- Directed connections between jobs (e.g. a VM job and the machine hosting it).
+-- One row per (jobid_from, jobid_to) pair: a job may be connected to several
+-- other jobs, so the key has to cover both ends of the connection.
+create table job_connections (
+	jobid_from	char(32)	binary not null,
+	jobid_to	char(32)	binary not null,
+	jobtype		int		not null,
+	connection	int		not null,
+
+	primary key (jobid_from, jobid_to),
+	index (jobid_to)
+) engine=innodb;
int /* IN (boolean) */
);
+enum edg_wll_JobConnectionType { /* state of one job-to-job connection; written as an integer to the `connection` column of job_connections */
+ EDG_WLL_JOBCONNECTION_UNKNOWN, /* not assigned anywhere in the visible code */
+ EDG_WLL_JOBCONNECTION_ACTIVE, /* set when a ConnectJob event is stored */
+ EDG_WLL_JOBCONNECTION_INACTIVE, /* replaces ACTIVE once the job enters DONE, ABORTED or CANCELLED */
+ EDG_WLL_JOBCONNECTION_CANCELLED /* set when a DisconnectJob event is stored */
+};
+
int db_store(edg_wll_Context, char *);
int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
int handle_request(edg_wll_Context,char *);
int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP);
int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
edg_wll_ErrorCode intJobStat_embryonic(edg_wll_Context ctx, glite_jobid_const_t jobid, const edg_wll_RegJobEvent *e, intJobStat *stat);
-
+int edg_wll_jobsconnection_create(edg_wll_Context ctx, glite_jobid_const_t jobid_from, glite_jobid_const_t jobid_to, enum edg_wll_StatJobtype jobtype, enum edg_wll_JobConnectionType connectiontype);
+int edg_wll_jobsconnection_modify(edg_wll_Context ctx, glite_jobid_const_t jobid_from, glite_jobid_const_t jobid_to, enum edg_wll_JobConnectionType);
+int edg_wll_jobsconnection_modifyall(edg_wll_Context ctx, glite_jobid_const_t jobid, enum edg_wll_JobConnectionType oldtype, enum edg_wll_JobConnectionType newtype);
+int edg_wll_jobsconnection_purgeall(edg_wll_Context ctx, glite_jobid_const_t jobid);
int edg_wll_delete_event(edg_wll_Context,const char *, int);
static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *oldstat, edg_wll_JobStat *newstat, int reg_to_JP);
-
+static int log_connectjob_event(edg_wll_Context ctx, glite_jobid_const_t jobif_from, glite_jobid_const_t jobid_to, enum edg_wll_StatJobtype jobtype);
+static int log_disconnectjob_event(edg_wll_Context ctx, glite_jobid_const_t jobif_from, glite_jobid_const_t jobid_to);
+static enum edg_wll_StatJobtype get_jobtype_from_connectjob(enum edg_wll_ConnectJobJobtype jobtype);
+static enum edg_wll_ConnectJobJobtype get_connectjobtype_from_jobtype(enum edg_wll_StatJobtype jobtype);
int
db_store(edg_wll_Context ctx, char *event)
reg_to_JP |= REG_SUBJOBS_TO_JP;
}
+ if (ev->any.type == EDG_WLL_EVENT_CONNECTJOB) {
+ edg_wlc_JobId jobId = NULL;
+ if (edg_wlc_JobIdParse(ev->connectJob.dest_jobid, &jobId)) {
+ edg_wll_SetError(ctx, EDG_WLL_ERROR_JOBID_FORMAT, ev->connectJob.dest_jobid);
+ goto err; /* should end here? */
+ }
+ // create connection A->B
+ edg_wll_jobsconnection_create(ctx, ev->any.jobId, jobId, get_jobtype_from_connectjob(ev->connectJob.jobtype), EDG_WLL_JOBCONNECTION_ACTIVE);
+ // and log event to create B->A (if A->B is not logged by LB)
+ if (ev->any.source != EDG_WLL_SOURCE_LB_SERVER)
+ log_connectjob_event(ctx, jobId, ev->any.jobId, newstat.jobtype);
+
+ edg_wlc_JobIdFree(jobId);
+ }
+
+ if (ev->any.type == EDG_WLL_EVENT_DISCONNECTJOB) {
+ edg_wlc_JobId jobId = NULL;
+ if (edg_wlc_JobIdParse(ev->connectJob.dest_jobid, &jobId)) {
+ edg_wll_SetError(ctx, EDG_WLL_ERROR_JOBID_FORMAT, ev->connectJob.dest_jobid);
+ goto err; /* should end here? */
+ }
+
+ // invalidate connection A->B
+ edg_wll_jobsconnection_modify(ctx, ev->any.jobId, jobId, EDG_WLL_JOBCONNECTION_CANCELLED);
+ // and log event to invalidate B->A (if A->B is not logged by LB)
+ if (ev->any.source != EDG_WLL_SOURCE_LB_SERVER)
+ log_disconnectjob_event(ctx, jobId, ev->any.jobId);
+ edg_wlc_JobIdFree(jobId);
+ }
+
+ // job is going to halt, set connections to inactive
+ if ((newstat.state != oldstat.state) && ((newstat.state == EDG_WLL_JOB_DONE) || (newstat.state == EDG_WLL_JOB_ABORTED) || (newstat.state == EDG_WLL_JOB_CANCELLED)))
+ edg_wll_jobsconnection_modifyall(ctx, ev->any.jobId, EDG_WLL_JOBCONNECTION_ACTIVE, EDG_WLL_JOBCONNECTION_INACTIVE);
+
commit:
rollback:;
} while (edg_wll_TransNeedRetry(ctx));
err:
return edg_wll_Error(ctx,NULL,NULL);
}
+
+/*
+ * Build an internal ConnectJob event for job `jobif_from` that points at
+ * `jobid_to`, serialize it and feed it back through db_store().
+ *
+ * The event is stamped with source EDG_WLL_SOURCE_LB_SERVER and with the
+ * server identity as user ("LBProxy" when the context has none).
+ *
+ * Returns whatever db_store() returns for the generated event.
+ */
+static int log_connectjob_event(edg_wll_Context ctx, glite_jobid_const_t jobif_from, glite_jobid_const_t jobid_to, enum edg_wll_StatJobtype jobtype){
+	edg_wll_Event	*ev = edg_wll_InitEvent(EDG_WLL_EVENT_CONNECTJOB);
+	char		*serialized;
+	int		rc;
+
+	/* common event header */
+	ev->any.source = EDG_WLL_SOURCE_LB_SERVER;
+	ev->any.priority = EDG_WLL_LOGFLAG_INTERNAL;
+	ev->any.user = strdup(ctx->serverIdentity ? ctx->serverIdentity : "LBProxy");
+	ev->any.seqcode = edg_wll_GetSequenceCode(ctx);
+	edg_wlc_JobIdDup(jobif_from, &(ev->any.jobId));
+	gettimeofday(&ev->any.timestamp, 0);
+
+	/* ConnectJob specific payload */
+	ev->connectJob.dest_jobid = edg_wlc_JobIdUnparse(jobid_to);
+	ev->connectJob.jobtype = get_connectjobtype_from_jobtype(jobtype);
+
+	serialized = edg_wll_UnparseEvent(ctx, ev);
+	rc = db_store(ctx, serialized);
+
+	/* edg_wll_FreeEvent() is followed by free() of the structure itself, as in the original */
+	free(serialized);
+	edg_wll_FreeEvent(ev);
+	free(ev);
+
+	return rc;
+}
+
+/*
+ * Build an internal DisconnectJob event for job `jobif_from` that points at
+ * `jobid_to`, serialize it and feed it back through db_store().
+ *
+ * The event is stamped with source EDG_WLL_SOURCE_LB_SERVER and with the
+ * server identity as user ("LBProxy" when the context has none).
+ *
+ * Returns whatever db_store() returns for the generated event.
+ */
+static int log_disconnectjob_event(edg_wll_Context ctx, glite_jobid_const_t jobif_from, glite_jobid_const_t jobid_to){
+	int ret = 0;
+	char *event_str;
+
+	edg_wll_Event *event = edg_wll_InitEvent(EDG_WLL_EVENT_DISCONNECTJOB);
+
+	event->any.priority = EDG_WLL_LOGFLAG_INTERNAL;
+	if (ctx->serverIdentity)
+		event->any.user = strdup(ctx->serverIdentity);
+	else
+		event->any.user = strdup("LBProxy");
+	event->any.seqcode = edg_wll_GetSequenceCode(ctx);
+	edg_wlc_JobIdDup(jobif_from, &(event->any.jobId));
+	gettimeofday(&event->any.timestamp,0);
+	event->any.source = EDG_WLL_SOURCE_LB_SERVER;
+
+	/* fill the member that belongs to the DisconnectJob event created above,
+	 * not the one of the ConnectJob event */
+	event->disconnectJob.dest_jobid = edg_wlc_JobIdUnparse(jobid_to);
+
+	event_str = edg_wll_UnparseEvent(ctx, event);
+
+	ret = db_store(ctx, event_str);
+
+	edg_wll_FreeEvent(event);
+	free(event);
+	free(event_str);
+
+	return ret;
+}
+
+
+/*
+ * Translate the job type carried by a ConnectJob event into the job type
+ * used in job status.
+ *
+ * The mapping is spelled out value by value on purpose: converting by a
+ * numeric shift would silently break if either enum were reordered.
+ *
+ * An unrecognised input yields -1 instead of an indeterminate value.
+ * NOTE(review): -1 follows the `_null_ -1` convention of status fields;
+ * confirm it is the right "undefined" value for edg_wll_StatJobtype.
+ */
+static enum edg_wll_StatJobtype get_jobtype_from_connectjob(enum edg_wll_ConnectJobJobtype jobtype){
+	enum edg_wll_StatJobtype ret = (enum edg_wll_StatJobtype) -1;
+
+	switch(jobtype){
+		case EDG_WLL_CONNECTJOB_SIMPLE:
+			ret = EDG_WLL_STAT_SIMPLE;
+			break;
+		case EDG_WLL_CONNECTJOB_DAG:
+			ret = EDG_WLL_STAT_DAG;
+			break;
+		case EDG_WLL_CONNECTJOB_PARTITIONABLE:
+			ret = EDG_WLL_STAT__PARTITIONABLE_UNUSED; /* for completeness */
+			break;
+		case EDG_WLL_CONNECTJOB_PARTITIONED:
+			ret = EDG_WLL_STAT__PARTITIONED_UNUSED; /* for completeness */
+			break;
+		case EDG_WLL_CONNECTJOB_COLLECTION:
+			ret = EDG_WLL_STAT_COLLECTION;
+			break;
+		case EDG_WLL_CONNECTJOB_PBS:
+			ret = EDG_WLL_STAT_PBS;
+			break;
+		case EDG_WLL_CONNECTJOB_CONDOR:
+			ret = EDG_WLL_STAT_CONDOR;
+			break;
+		case EDG_WLL_CONNECTJOB_CREAM:
+			ret = EDG_WLL_STAT_CREAM;
+			break;
+		case EDG_WLL_CONNECTJOB_FILE_TRANSFER_COLLECTION:
+			ret = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION;
+			break;
+		case EDG_WLL_CONNECTJOB_FILE_TRANSFER:
+			ret = EDG_WLL_STAT_FILE_TRANSFER;
+			break;
+		case EDG_WLL_CONNECTJOB_VIRTUAL_MACHINE:
+			ret = EDG_WLL_STAT_VIRTUAL_MACHINE;
+			break;
+		default :
+			/* keep the "undefined" value set above */
+			break;
+	}
+
+	return ret;
+}
+
+/*
+ * Translate a job-status job type into the job type carried by a ConnectJob
+ * event (the inverse of get_jobtype_from_connectjob()).
+ *
+ * An unrecognised input yields EDG_WLL_CONNECTJOB_JOBTYPE_UNDEFINED instead
+ * of an indeterminate value.
+ * NOTE(review): the constant name follows the generated
+ * EDG_WLL_<EVENT>_<FIELD>_UNDEFINED pattern (cf.
+ * EDG_WLL_VMDONE_STATUS_CODE_UNDEFINED); confirm against the generated header.
+ */
+static enum edg_wll_ConnectJobJobtype get_connectjobtype_from_jobtype(enum edg_wll_StatJobtype jobtype){
+	enum edg_wll_ConnectJobJobtype ret = EDG_WLL_CONNECTJOB_JOBTYPE_UNDEFINED;
+
+	switch(jobtype){
+		case EDG_WLL_STAT_SIMPLE:
+			ret = EDG_WLL_CONNECTJOB_SIMPLE;
+			break;
+		case EDG_WLL_STAT_DAG:
+			ret = EDG_WLL_CONNECTJOB_DAG;
+			break;
+		case EDG_WLL_STAT__PARTITIONABLE_UNUSED:
+			ret = EDG_WLL_CONNECTJOB_PARTITIONABLE; /* for completeness */
+			break;
+		case EDG_WLL_STAT__PARTITIONED_UNUSED:
+			ret = EDG_WLL_CONNECTJOB_PARTITIONED; /* for completeness */
+			break;
+		case EDG_WLL_STAT_COLLECTION:
+			ret = EDG_WLL_CONNECTJOB_COLLECTION;
+			break;
+		case EDG_WLL_STAT_PBS:
+			ret = EDG_WLL_CONNECTJOB_PBS;
+			break;
+		case EDG_WLL_STAT_CONDOR:
+			ret = EDG_WLL_CONNECTJOB_CONDOR;
+			break;
+		case EDG_WLL_STAT_CREAM:
+			ret = EDG_WLL_CONNECTJOB_CREAM;
+			break;
+		case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION:
+			ret = EDG_WLL_CONNECTJOB_FILE_TRANSFER_COLLECTION;
+			break;
+		case EDG_WLL_STAT_FILE_TRANSFER:
+			ret = EDG_WLL_CONNECTJOB_FILE_TRANSFER;
+			break;
+		case EDG_WLL_STAT_VIRTUAL_MACHINE:
+			ret = EDG_WLL_CONNECTJOB_VIRTUAL_MACHINE;
+			break;
+		default :
+			/* keep the "undefined" value set above */
+			break;
+	}
+
+	return ret;
+}
if ( purge )
{
+ edg_wll_jobsconnection_purgeall(ctx, job);
+ }
+
+ if ( purge )
+ {
trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
ev->userTag.name[i] = tolower(ev->userTag.name[i]);
}
}
+
+/*
+ * Record the connection jobid_from -> jobid_to in the job_connections table.
+ *
+ * Any existing row for the same pair is deleted first (e.g. resubmission on
+ * the same machine), then a fresh row with the given job type and connection
+ * state is inserted. If the delete fails the insert is not attempted, so the
+ * database error stays in the context for the caller.
+ *
+ * Returns the error code currently stored in ctx (edg_wll_Error()).
+ */
+int edg_wll_jobsconnection_create(edg_wll_Context ctx, glite_jobid_const_t jobid_from, glite_jobid_const_t jobid_to, enum edg_wll_StatJobtype jobtype, enum edg_wll_JobConnectionType connectiontype){
+	char *stmt = NULL;
+	char *jobid_from_u, *jobid_to_u;
+
+	jobid_from_u = edg_wlc_JobIdGetUnique(jobid_from);
+	jobid_to_u = edg_wlc_JobIdGetUnique(jobid_to);
+
+	// clear previous connection of same jobs (e.g. resubmission on same machine)
+	// '%|Ss' is the SQL-escaping string format used by the other statements in this file
+	trio_asprintf(&stmt,
+		"delete from job_connections where jobid_from='%|Ss' and jobid_to='%|Ss'",
+		jobid_from_u, jobid_to_u
+	);
+	glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
+	if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto cleanup;
+
+	free(stmt);
+	stmt = NULL;
+
+	trio_asprintf(&stmt,
+		"insert into job_connections (jobid_from, jobid_to, jobtype, connection) values ('%|Ss', '%|Ss', %i, %i)",
+		jobid_from_u, jobid_to_u, jobtype, connectiontype
+	);
+	glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
+
+	edg_wll_ExecSQL(ctx,stmt,NULL);
+
+cleanup:
+	free(stmt);
+	free(jobid_from_u);
+	free(jobid_to_u);
+
+	return edg_wll_Error(ctx, NULL, NULL);
+}
+
+/*
+ * Set the connection state of the row jobid_from -> jobid_to in the
+ * job_connections table to `connectiontype`.
+ *
+ * Returns the error code currently stored in ctx (edg_wll_Error()).
+ */
+int edg_wll_jobsconnection_modify(edg_wll_Context ctx, glite_jobid_const_t jobid_from, glite_jobid_const_t jobid_to, enum edg_wll_JobConnectionType connectiontype){
+	char *stmt;
+	char *jobid_from_u, *jobid_to_u;
+
+	jobid_from_u = edg_wlc_JobIdGetUnique(jobid_from);
+	jobid_to_u = edg_wlc_JobIdGetUnique(jobid_to);
+
+	// '%|Ss' is the SQL-escaping string format used by the other statements in this file
+	trio_asprintf(&stmt,
+		"update job_connections set connection=%i where jobid_from='%|Ss' and jobid_to='%|Ss'",
+		connectiontype, jobid_from_u, jobid_to_u
+	);
+	glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
+	free(jobid_from_u);
+	free(jobid_to_u);
+
+	edg_wll_ExecSQL(ctx,stmt,NULL);
+
+	free(stmt);
+
+	return edg_wll_Error(ctx, NULL, NULL);
+}
+
+/*
+ * Switch every connection that involves `jobid` (as either end) and is
+ * currently in state `oldtype` to state `newtype`.
+ *
+ * Returns the error code currently stored in ctx (edg_wll_Error()).
+ */
+int edg_wll_jobsconnection_modifyall(edg_wll_Context ctx, glite_jobid_const_t jobid, enum edg_wll_JobConnectionType oldtype, enum edg_wll_JobConnectionType newtype){
+	char *stmt;
+	char *jobid_u;
+
+	jobid_u = edg_wlc_JobIdGetUnique(jobid);
+
+	// '%|Ss' is the SQL-escaping string format used by the other statements in this file
+	trio_asprintf(&stmt,
+		"update job_connections set connection=%i where connection=%i and (jobid_from='%|Ss' or jobid_to='%|Ss')",
+		newtype, oldtype, jobid_u, jobid_u
+	);
+	glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
+	free(jobid_u);
+
+	edg_wll_ExecSQL(ctx,stmt,NULL);
+
+	free(stmt);
+
+	return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
+/*
+ * Delete every row of job_connections in which `jobid` appears as either
+ * end of the connection.
+ *
+ * Returns the error code currently stored in ctx (edg_wll_Error()).
+ */
+int edg_wll_jobsconnection_purgeall(edg_wll_Context ctx, glite_jobid_const_t jobid){
+	char *stmt;
+	char *jobid_u;
+
+	jobid_u = edg_wlc_JobIdGetUnique(jobid);
+
+	// '%|Ss' is the SQL-escaping string format used by the other statements in this file
+	trio_asprintf(&stmt,
+		"delete from job_connections where jobid_from='%|Ss' or jobid_to='%|Ss'",
+		jobid_u, jobid_u
+	);
+	glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, stmt);
+
+	free(jobid_u);
+
+	edg_wll_ExecSQL(ctx,stmt,NULL);
+
+	free(stmt);
+
+	return edg_wll_Error(ctx, NULL, NULL);
+}
+
case EDG_WLL_EVENT_REGJOB:
if (USABLE(res)) {
js->pub.vm_state = EDG_WLL_STAT_VM_PENDING;
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
}
break;
case EDG_WLL_EVENT_VMCREATE:
break;
case EDG_WLL_EVENT_VMRUNNING:
if (USABLE(res)) {
- js->pub.vm_state = EDG_WLL_STAT_VM_RUNNING;
+ switch( e->vMRunning.vm_source){
+ case EDG_WLL_VMRUNNING_CM:
+ case EDG_WLL_VMRUNNING_VMM:
+ js->pub.vm_state = EDG_WLL_STAT_VM_RUNNING;
+ break;
+ case EDG_WLL_VMRUNNING_MACHINE:
+ js->pub.vm_state = EDG_WLL_STAT_VM_REALLY_RUNNING;
+ break;
+ default:
+ break;
+ }
+ js->pub.state = EDG_WLL_JOB_RUNNING;
}
break;
case EDG_WLL_EVENT_VMSHUTDOWN:
if (USABLE(res)) {
- js->pub.vm_state = EDG_WLL_STAT_VM_SHUTDOWN;
+ switch (e->vMShutdown.vm_source){
+ case EDG_WLL_VMSHUTDOWN_CM:
+ js->pub.vm_state = EDG_WLL_STAT_VM_SHUTDOWN;
+ break;
+ case EDG_WLL_VMSHUTDOWN_VMM:
+ js->pub.vm_system_halting = 1;
+ break;
+ case EDG_WLL_VMSHUTDOWN_MACHINE:
+ js->pub.vm_system_halting = 1;
+ if (js->pub.vm_state == EDG_WLL_STAT_VM_REALLY_RUNNING)
+ js->pub.vm_state = EDG_WLL_STAT_VM_RUNNING;
+ break;
+ }
}
+ if (USABLE_DATA(res))
+ if (e->vMDone.usage)
+ rep_cond(js->pub.vm_usage, e->vMDone.usage);
break;
case EDG_WLL_EVENT_VMSTOP:
if (USABLE(res)) {
case EDG_WLL_EVENT_VMRESUME:
if (USABLE(res)) {
js->pub.vm_state = EDG_WLL_STAT_VM_PENDING;
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ js->pub.vm_system_halting = 0;
//XXX clear hostname here?
}
break;
case EDG_WLL_VMDONE_OK:
case EDG_WLL_VMDONE_DELETE:
js->pub.vm_state = EDG_WLL_STAT_VM_DONE;
+ js->pub.state = EDG_WLL_JOB_DONE;
break;
case EDG_WLL_VMDONE_FAILURE:
js->pub.vm_state = EDG_WLL_STAT_VM_FAILURE;
+ js->pub.state = EDG_WLL_JOB_DONE
+;
break;
case EDG_WLL_VMDONE_STATUS_CODE_UNDEFINED:
break;
@type TakePayloadOwnership Take over ownership of actual job payload
+@type ConnectJob Connect two jobs
+ string dest_jobid ID of connected job
+ int jobtype Type of the job being connected (SIMPLE, DAG, PARTITIONABLE etc.).
+ _code_ SIMPLE The job is simple job.
+ _code_ DAG The job is dag (containing static set of subjobs).
+ _code_ PARTITIONABLE The job is partitionable (may become partitioned).
+ _code_ PARTITIONED The job is partitioned (dynamically created dag).
+ _code_ COLLECTION The job is collection (containing static set of subjobs).
+ _code_ PBS PBS job
+ _code_ CONDOR Condor job
+ _code_ CREAM CREAM job
+ _code_ FILE_TRANSFER_COLLECTION File transfer collection
+ _code_ FILE_TRANSFER File transfer
+ _code_ VIRTUAL_MACHINE Virtual machine
+
+@type DisconnectJob Disconnect two jobs
+ string dest_jobid ID of connected job
+
+
@flesh PBS
@type PBSInternalStateChange Change of internal PBS job state
string name machine name
string owner machine owner
string hostname hostname of VM
+ string manager VM manager
@type VMHost
string hostname hostname of physical machine of VM
@type VMRunning VM is running
+ int vm_source
+ _code_ CM cloud manager
+ _code_ VMM VM manager
+ _code_ MACHINE system
@type VMShutdown
+ int vm_source
+ _code_ CM cloud manager
+ _code_ VMM VM manager
+ _code_ MACHINE system
+ string usage
+ _optional_
@type VMStop
int vm_state VM job state
_null_ -1
- _code_ VM_PENDING pending
- _code_ VM_BOOT boot
- _code_ VM_RUNNING running
- _code_ VM_SHUTDOWN shutdown
- _code_ VM_STOPPED stopped
- _code_ VM_DONE done
- _code_ VM_FAILURE failure
-string vm_image VM image
-string vm_require VM requirement on physical machine
-string vm_usage VM usage
-string vm_hostname VM hostname
-string vm_machine physical machine on which VM runs
-string vm_id VM id
-string vm_name VM name
-string vm_phy_hostname hostname of physical machine running VM
+ _code_ VM_PENDING pending
+ _code_ VM_BOOT boot
+ _code_ VM_RUNNING running
+ _code_ VM_REALLY-RUNNING really running
+ _code_ VM_SHUTDOWN shutdown
+ _code_ VM_STOPPED stopped
+ _code_ VM_DONE done
+ _code_ VM_FAILURE failure
+string vm_image VM image
+string vm_require VM requirement on physical machine
+string vm_usage VM usage
+string vm_hostname VM hostname
+string vm_machine physical machine on which VM runs
+string vm_id VM id
+string vm_name VM name
+string vm_phy_hostname hostname of physical machine running VM
+bool vm_system_halting system is shutting down
+string vm_manager name of the VM manager
_pad_ 20