-include Makefile.inc
+VPATH = ../src:../interface
+
default all: compile
CC=gcc
CFLAGS:= \
${DEBUG} \
+ -I${classads_prefix}/include \
-I${stagedir}/include -I${top_srcdir}/src -I. \
-I${top_srcdir}/interface
PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\
${classadslib} -lstdc++ ${expatlib} -lexpat\
-PLUGIN_LOBJS:=
-MACHINE_BASE_OBJS:=
+PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo lb_plugin.lo
+MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o
+
+PLUGIN_LIB=glite_lb_plugin.la
+MACHINE_LIB=libglite_lb_statemachine.a
default all: compile
-compile:
+compile: ${PLUGIN_LIB} ${MACHINE_LIB}
+
+${PLUGIN_LIB}: ${PLUGIN_LOBJS}
+ ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS}
+
+${MACHINE_LIB}: ${MACHINE_OBJS}
+ ar crv $@ ${MACHINE_OBJS}
+ -ranlib $@
+
doc:
stage: compile
- $(MAKE) install PREFIX=${stagedir} DOSTAGE=yes
+ $(MAKE) install PREFIX=${stagedir}
install:
+ -mkdir -p ${PREFIX}/lib ${PREFIX}/interface ${PREFIX}/include/glite/lb
+ install -m 644 ${MACHINE_LIB} ${PREFIX}/lib
+ ${INSTALL} -m 755 ${PLUGIN_LIB} ${PREFIX}/lib
+ install -m 644 ${top_srcdir}/interface/lb-job-attrs.xsd ${PREFIX}/interface
+ install -m 644 ${top_srcdir}/interface/lb-job-record.xsd ${PREFIX}/interface
+ install -m 644 ${top_srcdir}/interface/intjobstat.h ${PREFIX}/include/glite/lb
+
clean:
+ rm -rvf .libs *.o *.lo ${PLUGIN_LIB} ${MACHINE_LIB}
+ rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
%.c: %.c.T
rm -f $@
%.lo: %.c
${COMPILE} -o $@ -c $<
-lb_plugin.lo: lb_plugin.c jp_job_attrs.h
+lb_plugin.lo: lb_plugin.c job_attrs.h
${COMPILE} -DPLUGIN_DEBUG -o $@ -c $<
+job_attrs.h: lb-job-attrs.xsd job_attrs.xsl
+ ${XSLTPROC} ../src/job_attrs.xsl $< >$@
+
--- /dev/null
+#ifndef GLITE_LB_INTJOBSTAT_H
+#define GLITE_LB_INTJOBSTAT_H
+
+#ident "$Header$"
+
+#include "glite/lb/jobstat.h"
+
+/*
+ * Internal representation of job state
+ * (includes edg_wll_JobStat API structure)
+ */
+
+/* convention: revision X.XX - DESCRIPTION */
+/* where X.XX is the version from the #ident string + 1 (i.e. the version after commit) */
+/* and DESCRIPTION is a short hint why the version changed */
+
+#define INTSTAT_VERSION "revision 1.31 - proxy merge"
+
+
+// Internal error codes
+
+#define RET_FAIL 0
+#define RET_OK 1
+#define RET_FATAL RET_FAIL
+#define RET_SOON 2
+#define RET_LATE 3
+#define RET_BADSEQ 4
+#define RET_SUSPECT 5
+#define RET_IGNORE 6
+#define RET_BADBRANCH 7
+#define RET_GOODBRANCH 8
+#define RET_TOOOLD 9
+#define RET_UNREG 10
+#define RET_INTERNAL 100
+
+
+// shallow resubmission container - holds state of each branch
+// (useful when state restore is needed after ReallyRunning event)
+//
+typedef struct _branch_state {
+ int branch;
+ char *destination;
+ char *ce_node;
+ char *jdl;
+ /*!! if adding new field, modify also free_branch_state() */
+} branch_state;
+
+
+typedef struct _intJobStat {
+ edg_wll_JobStat pub;
+ int resubmit_type;
+ char *last_seqcode;
+ char *last_cancel_seqcode;
+ char *branch_tag_seqcode;
+ char *last_branch_seqcode;
+ char *deep_resubmit_seqcode;
+ branch_state *branch_states; // branch zero terminated array
+
+ struct timeval last_pbs_event_timestamp;
+ int pbs_reruning; // true if rerun event arrived
+
+ /*!! if adding new field, modify also destroy_intJobStat_extension() */
+ } intJobStat;
+
+typedef enum _edg_wll_PBSEventSource {
+ EDG_WLL_PBS_EVENT_SOURCE_UNDEF = 0,
+ EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER,
+ EDG_WLL_PBS_EVENT_SOURCE_SERVER,
+ EDG_WLL_PBS_EVENT_SOURCE_MOM,
+ EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING,
+ EDG_WLL_PBS_EVENT_SOURCE__LAST
+} edg_wll_PBSEventSource;
+
+typedef enum _edg_wll_CondorEventSource {
+ EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF = 0,
+ EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR,
+ EDG_WLL_CONDOR_EVENT_SOURCE_MASTER,
+ EDG_WLL_CONDOR_EVENT_SOURCE_MATCH,
+ EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR,
+ EDG_WLL_CONDOR_EVENT_SOURCE_SCHED,
+ EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW,
+ EDG_WLL_CONDOR_EVENT_SOURCE_STARTER,
+ EDG_WLL_CONDOR_EVENT_SOURCE_START,
+ EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE,
+ EDG_WLL_CONDOR_EVENT_SOURCE__LAST
+} edg_wll_CondorEventSource;
+
+typedef enum _subjobClassCodes {
+ SUBJOB_CLASS_UNDEF = 0,
+ SUBJOB_CLASS_RUNNING,
+ SUBJOB_CLASS_DONE,
+ SUBJOB_CLASS_ABORTED,
+ SUBJOB_CLASS_CLEARED,
+ SUBJOB_CLASS_REST
+} subjobClassCodes;
+
+void destroy_intJobStat(intJobStat *);
+void destroy_intJobStat_extension(intJobStat *p);
+
+
+void init_intJobStat(intJobStat *p);
+
+
+#endif /* GLITE_LB_INTJOBSTAT_H */
--- /dev/null
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ targetNamespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ version="1.0"
+ elementFormDefault="qualified"
+ attributeFormDefault="unqualified"
+>
+ <xs:simpleType name="statusType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="SUBMITTED" />
+ <xs:enumeration value="WAITING" />
+ <xs:enumeration value="READY" />
+ <xs:enumeration value="SCHEDULED" />
+ <xs:enumeration value="RUNNING" />
+ <xs:enumeration value="DONE" />
+ <xs:enumeration value="CLEARED" />
+ <xs:enumeration value="ABORTED" />
+ <xs:enumeration value="CANCELLED" />
+ <xs:enumeration value="UNKNOWN" />
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:simpleType name="doneType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="OK"/>
+ <xs:enumeration value="FAIL"/>
+ <xs:enumeration value="CANCEL"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="historyStatusType">
+ <xs:complexContent>
+ <xs:restriction base="xs:anyType">
+ <xs:attribute name="name" type="a:statusType" use="required"/>
+ <xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
+ <xs:attribute name="reason" type="xs:string" use="optional"/>
+ </xs:restriction>
+ </xs:complexContent>
+ </xs:complexType>
+
+ <xs:simpleType name="jobTypeType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="SIMPLE"/>
+ <xs:enumeration value="DAG"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="statusSequenceType">
+ <xs:sequence>
+ <xs:element name="status" type="a:historyStatusType" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="jobIdSequenceType">
+ <xs:sequence>
+ <xs:element name="jobId" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+
+
+ <xs:element name="jobId" type="xs:string" />
+ <xs:element name="LBserver" type="xs:string" />
+
+ <xs:element name="user" type="xs:string">
+ <xs:annotation> <xs:documentation>Job owner according to LB</xs:documentation> </xs:annotation>
+ </xs:element>
+
+ <!-- Parent jobId of DAG -->
+ <xs:element name="parent" type="xs:string" />
+
+ <!-- the following 6 elements have to be retrieved from JDL -->
+ <xs:element name="VO" type="xs:string" />
+ <xs:element name="aTag" type="xs:string" />
+ <xs:element name="rQType" type="xs:string" />
+ <xs:element name="eDuration" type="xs:duration" />
+ <xs:element name="eNodes" type="xs:int" />
+ <xs:element name="eProc" type="xs:int" />
+
+ <xs:element name="RB" type="xs:string" />
+ <xs:element name="CE" type="xs:string" />
+ <xs:element name="host" type="xs:string" /> <!-- worker node -->
+
+ <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+ <xs:element name="UIHost" type="xs:string" />
+
+ <!-- not mandatory, currently LB hasn't got the info -->
+ <xs:element name="CPUTime" type="xs:duration" />
+ <xs:element name="NProc" type="xs:int" />
+
+ <!-- In LB the real final status is Cleared
+ However, Done, Aborted, or Cancelled should be reported here -->
+ <xs:element name="finalStatus" type="a:statusType" />
+ <xs:element name="finalStatusDate" type="xs:dateTime" />
+ <xs:element name="finalStatusReason" type="xs:string" />
+ <xs:element name="LRMSDoneStatus" type="a:doneType" />
+ <xs:element name="LRMSStatusReason" type="xs:string" />
+
+ <xs:element name="retryCount" type="xs:int" />
+ <xs:element name="additionalReason" type="xs:string" />
+
+ <xs:element name="jobType" type="a:jobTypeType" />
+ <xs:element name="nsubjobs" type="xs:int" />
+ <xs:element name="subjobs" type="a:jobIdSequenceType" />
+
+ <!-- timestamps of the state history of the last resubmission cycle,
+ i.e. it is guaranteed that each state appears here only once.
+ Cf. stateEnterTimes in LB JobStatus -->
+ <xs:element name="lastStatusHistory" type="a:statusSequenceType" />
+
+ <!-- timestamps of the whole state history, including all resubmission cycles -->
+ <xs:element name="fullStatusHistory" type="a:statusSequenceType" />
+
+ <xs:element name="JDL" type="xs:string" />
+
+<!-- No idea where to get these from:
+
+ ENVIRONMENT
+
+ testbed production, preproduction, specific
+ release middleware release (LCG, g-lite...)
+ version version of middleware
+ job_history_version in case of structure changes
+-->
+
+</xs:schema>
--- /dev/null
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:jr="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+ xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ targetNamespace="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+ version="1.0"
+ elementFormDefault="qualified"
+ attributeFormDefault="unqualified"
+>
+
+ <xs:import
+ namespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ schemaLocation="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ />
+
+ <xs:complexType name="jobRecordType">
+ <xs:sequence>
+
+ <xs:element ref="a:jobId" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:user" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:parent" minOccurs="0" maxOccurs="1"/>
+
+ <!-- the following 6 elements have to be retrieved from JDL -->
+ <xs:element ref="a:VO" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:aTag" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:rQType" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eDuration" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eNodes" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eProc" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:RB" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:CE" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:host" minOccurs="0" maxOccurs="1"/> <!-- worker node -->
+
+ <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+ <xs:element ref="a:UIHost" minOccurs="0" maxOccurs="1"/>
+
+ <!-- not mandatory, currently LB hasn't got the info -->
+ <xs:element ref="a:CPUTime" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:NProc" minOccurs="0" maxOccurs="1"/>
+
+ <!-- In LB the real final status is Cleared
+ However, Done, Aborted, or Cancelled should be reported here -->
+ <xs:element ref="a:finalStatus" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:finalStatusDate" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:finalStatusReason" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:LRMSDoneStatus" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:LRMSStatusReason" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:retryCount" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:additionalReason" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:jobType" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:nsubjobs" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:subjobs" minOccurs="0" maxOccurs="1"/>
+
+ <!-- timestamps of the state history of the last resubmission cycle,
+ i.e. it is guaranteed that each state appears here only once.
+ Cf. stateEnterTimes in LB JobStatus -->
+
+ <xs:element ref="a:lastStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+ <!-- timestamps of the whole state history, including all resubmission cycles -->
+ <xs:element ref="a:fullStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+ <xs:element ref="a:JDL" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+
+ <!-- No idea where to get these from:
+
+ ENVIRONMENT
+
+ testbed production, preproduction, specific
+ release middleware release (LCG, g-lite...)
+ version version of middleware
+ job_history_version in case of structure changes
+ -->
+
+ <xs:attribute name="jobid" type="xs:string" use="required"/>
+ </xs:complexType>
+
+
+ <xs:element name="jobRecord" type="jr:jobRecordType"/>
+
+</xs:schema>
--- /dev/null
+<?xml version="1.0"?>
+
+<xsl:stylesheet version="1.0"
+ xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+>
+
+<xsl:output method="text"/>
+
+<xsl:template match="xs:schema">
+#ifndef GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_JP_LB_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes"
+#define GLITE_JP_LB_JDL_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes:JDL"
+ <xsl:apply-templates select="xs:element"/>
+#define GLITE_JP_LB_CLASSAD_NS "http://jdl"
+#endif
+</xsl:template>
+
+<!--
+ Emit one attribute-name macro per top-level schema element, preceded by a
+ doc comment taken from the element's documentation. In XML Schema the
+ documentation is nested as xs:element/xs:annotation/xs:documentation, so the
+ path must go through xs:annotation; selecting xs:documentation directly
+ never matches and leaves the comment empty.
+-->
+<xsl:template match="xs:element">
+/** <xsl:value-of select="xs:annotation/xs:documentation/text()"/> */
+#define GLITE_JP_LB_<xsl:value-of select="@name"/> GLITE_JP_LB_NS ":<xsl:value-of select="@name"/>"
+</xsl:template>
+
+</xsl:stylesheet>
#include <cclassad.h>
+#include "glite/lbu/trio.h"
+
+#include "glite/lb/jobstat.h"
+#include "glite/lb/events.h"
+#include "glite/lb/producer.h"
+#include "glite/lb/events_parse.h"
+
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/*
#include "glite/lb/context.h"
#include "glite/lb/jobstat.h"
#include "glite/lb/events.h"
#include "glite/lb/events_parse.h"
-#include "glite/lb/trio.h"
#include "glite/lb/producer.h"
#include "jobstat.h"
#include "get_events.h"
+*/
#include "glite/jp/types.h"
#include "glite/jp/context.h"
#include "glite/jp/attr.h"
#include "glite/jp/utils.h"
#include "glite/jp/known_attr.h"
-#include "jp_job_attrs.h"
+#include "job_attrs.h"
#define INITIAL_NUMBER_EVENTS 100
#define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES
-#define LB_PLUGIN_NAMESPACE "urn:org.glite.lb"
/*typedef struct _lb_buffer_t {
char *buf;
#define check_strdup(s) ((s) ? strdup(s) : NULL)
extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
+static void edg_wll_SortPEvents(edg_wll_Event **);
static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval);
static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
return retval;
}
*/
+
+
+/*
+ * qsort() comparator for an array of pointers to events: dereferences both
+ * array slots and lets compare_events_by_seq() decide the order.
+ */
+static int compare_pevents_by_seq(const void *a, const void *b)
+{
+	const edg_wll_Event *const *lhs = a;
+	const edg_wll_Event *const *rhs = b;
+
+	return compare_events_by_seq(*lhs, *rhs);
+}
+
+
+/*
+ * Sort a NULL-terminated array of event pointers in place, using
+ * compare_pevents_by_seq() as the ordering. A NULL array is a no-op.
+ */
+static void edg_wll_SortPEvents(edg_wll_Event **e)
+{
+	size_t	count = 0;
+
+	if (e == NULL) return;
+
+	/* the array carries no length; walk up to the terminating NULL slot */
+	while (e[count] != NULL) count++;
+
+	qsort(e, count, sizeof e[0], compare_pevents_by_seq);
+}
+
#include "glite/lb/producer.h"
#include "glite/lb/context-int.h"
-#include "jobstat.h"
-#include "lock.h"
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
/* TBD: share in whole logging or workload */
#ifdef __GNUC__
#endif
static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+
+int add_stringlist(char ***lptr, const char *new_item);
int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
{
js->pub.jobtype = EDG_WLL_STAT_CONDOR;
break;
default:
- asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
+ trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
return RET_FAIL;
}
return processEvent_Condor(js,e,ev_seq,strict,errstring);
case -1: return RET_UNREG;
default:
- asprintf(errstring,"undefined job type %d",js->pub.jobtype);
+ trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype);
return RET_FAIL;
}
}
static char* location_string(const char *source, const char *host, const char *instance)
{
char *ret;
- asprintf(&ret, "%s/%s/%s", source, host, instance);
+ trio_asprintf(&ret, "%s/%s/%s", source, host, instance);
return ret;
}
static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
{
edg_wll_JobStatCode old_state = js->pub.state;
- enum edg_wll_StatDone_code old_done_code = js->pub.done_code;
+/* unused enum edg_wll_StatDone_code old_done_code = js->pub.done_code; */
edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN;
int res = RET_OK,
fine_res = RET_OK;
rep(js->last_cancel_seqcode, e->any.seqcode);
} else {
-/* the first set of LM events (Accept, Transfer/* -> LRMS)
+/* the first set of LM events (Accept, Transfer/XX -> LRMS)
should not should shift the state (to Ready, Scheduled) but NOT to
update js->last_seqcode completely, in order not to block following
LRMS events which are likely to arrive later but should still affect
js->pub.network_server == NULL) {
char *inst;
inst = e->any.src_instance;
- asprintf(&js->pub.network_server, "%s%s%s",
+ trio_asprintf(&js->pub.network_server, "%s%s%s",
e->any.host,
inst != NULL ? ":" : " ",
inst != NULL ? inst : "");
destroy_intJobStat_extension(p);
memset(p, 0, sizeof(intJobStat));
}
+
+/*
+ * Bring an intJobStat into its initial state: everything zeroed, job type
+ * set to -1, and the two per-state arrays (children_hist, stateEnterTimes)
+ * freshly allocated and zero-filled with element 0 set to
+ * EDG_WLL_NUMBER_OF_STATCODES.
+ */
+void init_intJobStat(intJobStat *p)
+{
+	const size_t	slots = 1 + EDG_WLL_NUMBER_OF_STATCODES;
+	int		*hist, *enter_times;
+
+	memset(p, 0, sizeof *p);
+	p->pub.jobtype = -1 /* why? EDG_WLL_STAT_SIMPLE */;
+
+	hist = (int *) calloc(slots, sizeof *hist);
+	hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
+	p->pub.children_hist = hist;
+
+	enter_times = (int *) calloc(slots, sizeof *enter_times);
+	enter_times[0] = EDG_WLL_NUMBER_OF_STATCODES;
+	p->pub.stateEnterTimes = enter_times;
+	/* TBD: generate */
+}
+
#include "glite/lb/producer.h"
#include "glite/lb/context-int.h"
-#include "jobstat.h"
-#include "lock.h"
+#include "intjobstat.h"
+#include "seqcode_aux.h"
/* TBD: share in whole logging or workload */
#ifdef __GNUC__
#include "glite/lb/producer.h"
#include "glite/lb/context-int.h"
-#include "jobstat.h"
-#include "lock.h"
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
/* TBD: share in whole logging or workload */
#ifdef __GNUC__
if (USABLE_DATA(res)) {
char *new_resource_usage;
- asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
+ trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
(js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "",
(js->pub.pbs_resource_usage) ? "\n": "",
e->PBSResourceUsage.name,
if (USABLE_DATA(res)) {
char *new_error_desc;
- asprintf(&new_error_desc,"%s%s\t%s",
+ trio_asprintf(&new_error_desc,"%s%s\t%s",
(js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "",
(js->pub.pbs_error_desc) ? "\n" : "",
e->PBSError.error_desc);
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <string.h>
+#include <syslog.h>
+#include <assert.h>
+
+#include "glite/lbu/trio.h"
+#include "glite/lb/context-int.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+
+/*
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <regex.h>
+#include <syslog.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lbu/trio.h"
+#include "glite/lbu/db.h"
+#include "glite/lb/context-int.h"
+
+#include "store.h"
+#include "index.h"
+#include "jobstat.h"
+#include "get_events.h"
+*/
+
+/*
+ * Return the counter of one source component of a sequence code.
+ *
+ * a      sequence code in the textual form "UI=..:NS=..: ... :APP=..[:LBS=..]";
+ *        when the string does not contain "LBS", ":LBS=000000" is appended
+ *        before parsing
+ * index  which component to return
+ *
+ * Returns the parsed counter, or -1 (after a message on stderr) when the
+ * code cannot be parsed.
+ */
+int component_seqcode(const char *a, edg_wll_Source index)
+{
+	/* plain int: the values are read with %d and returned as int */
+	int		c[EDG_WLL_SOURCE__LAST];
+	int		res;
+	char		sc[EDG_WLL_SEQ_SIZE];
+
+	if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+	else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+	res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+			&c[EDG_WLL_SOURCE_USER_INTERFACE],
+			&c[EDG_WLL_SOURCE_NETWORK_SERVER],
+			&c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+			&c[EDG_WLL_SOURCE_BIG_HELPER],
+			&c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+			&c[EDG_WLL_SOURCE_LOG_MONITOR],
+			&c[EDG_WLL_SOURCE_LRMS],
+			&c[EDG_WLL_SOURCE_APPLICATION],
+			&c[EDG_WLL_SOURCE_LB_SERVER]);
+	/* all nine components must have been converted */
+	if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+		fprintf(stderr, "unparsable sequence code %s\n", sc);
+		return -1;
+	}
+
+	return c[index];
+}
+
+/*
+ * Build a new sequence code equal to `a` with one component replaced.
+ *
+ * a      original sequence code (":LBS=000000" is appended for parsing when
+ *        the string does not contain "LBS")
+ * index  component to overwrite
+ * val    new counter value for that component
+ *
+ * Returns a string allocated by trio_asprintf() holding the re-formatted
+ * code, or NULL (after a message on stderr) when `a` cannot be parsed.
+ */
+char * set_component_seqcode(char *a,edg_wll_Source index,int val)
+{
+	/* plain int: the values are read and printed with %d */
+	int		c[EDG_WLL_SOURCE__LAST];
+	int		res;
+	char		*ret = NULL;	/* stays NULL should the formatter leave it untouched */
+	char		sc[EDG_WLL_SEQ_SIZE];
+
+	if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+	else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+	res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+			&c[EDG_WLL_SOURCE_USER_INTERFACE],
+			&c[EDG_WLL_SOURCE_NETWORK_SERVER],
+			&c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+			&c[EDG_WLL_SOURCE_BIG_HELPER],
+			&c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+			&c[EDG_WLL_SOURCE_LOG_MONITOR],
+			&c[EDG_WLL_SOURCE_LRMS],
+			&c[EDG_WLL_SOURCE_APPLICATION],
+			&c[EDG_WLL_SOURCE_LB_SERVER]);
+	/* all nine components must have been converted */
+	if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+		fprintf(stderr, "unparsable sequence code %s\n", sc);
+		return NULL;
+	}
+
+	c[index] = val;
+	trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d"
+			":LM=%06d:LRMS=%06d:APP=%06d:LBS=%06d",
+			c[EDG_WLL_SOURCE_USER_INTERFACE],
+			c[EDG_WLL_SOURCE_NETWORK_SERVER],
+			c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+			c[EDG_WLL_SOURCE_BIG_HELPER],
+			c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+			c[EDG_WLL_SOURCE_LOG_MONITOR],
+			c[EDG_WLL_SOURCE_LRMS],
+			c[EDG_WLL_SOURCE_APPLICATION],
+			c[EDG_WLL_SOURCE_LB_SERVER]);
+	return ret;
+}
+
+/*
+ * Return 1 when the WORKLOAD_MANAGER component of sequence code `a` is
+ * lower than that of `b`, 0 otherwise. An unparsable code contributes -1,
+ * the error value of component_seqcode().
+ */
+int before_deep_resubmission(const char *a, const char *b)
+{
+	const int	wm_a = component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+	const int	wm_b = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+
+	return wm_a < wm_b;
+}
+
+/*
+ * Return 1 when both sequence codes carry the same WORKLOAD_MANAGER
+ * component, 0 otherwise. An unparsable code contributes -1, the error
+ * value of component_seqcode().
+ */
+int same_branch(const char *a, const char *b)
+{
+	const int	wm_a = component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+	const int	wm_b = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+
+	return wm_a == wm_b;
+}
+
+/*
+ * Compare two PBS-style sequence codes of the form
+ * "TIMESTAMP=<14 chars>:POS=<10 chars>:EV.CODE=<3 digits>:SRC=<char>".
+ *
+ * Ordering rules, in this order:
+ *   - a RegJob event code always sorts first,
+ *   - otherwise the timestamps are compared as strings,
+ *   - equal timestamps from the same source are ordered by file position,
+ *   - equal timestamps from different sources are ordered by source
+ *     ('m', then 's').
+ *
+ * Returns <0, 0 or >0 in the manner of strcmp(); an unparsable code is
+ * reported on stderr and yields -1.
+ */
+int edg_wll_compare_pbs_seq(const char *a,const char *b)
+{
+	/* %14s / %10s store up to 14 / 10 characters PLUS the terminating NUL,
+	 * so each buffer needs one byte more than its field width */
+	char	timestamp_a[15], pos_a[11], src_a;
+	char	timestamp_b[15], pos_b[11], src_b;
+	int	ev_code_a, ev_code_b;
+	int	res;
+
+	res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a);
+
+	if (res != 4) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", a); */
+		fprintf(stderr, "unparsable sequence code %s\n", a);
+		return -1;
+	}
+
+	res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b);
+
+	if (res != 4) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", b); */
+		fprintf(stderr, "unparsable sequence code %s\n", b);
+		return -1;
+	}
+
+	/* wild card for PBSJobReg - this event should always come as the first one */
+	/* because it holds job.type, which is necessary for further event processing */
+	if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1;
+	if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1;
+
+	/* sort events w.r.t. timestamps */
+	if ((res = strcmp(timestamp_a,timestamp_b)) != 0) {
+		return res;
+	}
+	else {
+		/* if the timestamps are equal, sort w.r.t. file position */
+		/* provided both events come from the same log file */
+		if (src_a == src_b) {
+			/* zero means in fact duplicate events in the log */
+			return strcmp(pos_a,pos_b);
+		}
+		/* if the events come from different log files */
+		/* it is possible to prioritize some src log file */
+		else {
+			/* prioritize events from pbs_mom */
+			if (src_a == 'm') return 1;
+			if (src_b == 'm') return -1;
+
+			/* then prioritize events from pbs_server */
+			if (src_a == 's') return 1;
+			if (src_b == 's') return -1;
+
+			/* other priorities come here... */
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Map the source tag of a PBS sequence code - the character at offset
+ * EDG_WLL_SEQ_PBS_SIZE - 2 - to the corresponding edg_wll_PBSEventSource
+ * value; unknown tags give EDG_WLL_PBS_EVENT_SOURCE_UNDEF.
+ */
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) {
+	const char	tag = pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2];
+
+	if (tag == 'c') return EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER;
+	if (tag == 's') return EDG_WLL_PBS_EVENT_SOURCE_SERVER;
+	if (tag == 'm') return EDG_WLL_PBS_EVENT_SOURCE_MOM;
+	if (tag == 'a') return EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING;
+
+	return EDG_WLL_PBS_EVENT_SOURCE_UNDEF;
+}
+
+/*
+ * Map the source tag of a Condor sequence code - the character at offset
+ * EDG_WLL_SEQ_CONDOR_SIZE - 2 - to the corresponding
+ * edg_wll_CondorEventSource value; unknown tags give
+ * EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF.
+ */
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) {
+	static const struct {
+		char				tag;
+		edg_wll_CondorEventSource	source;
+	} known[] = {
+		{ 'L', EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR },
+		{ 'M', EDG_WLL_CONDOR_EVENT_SOURCE_MASTER },
+		{ 'm', EDG_WLL_CONDOR_EVENT_SOURCE_MATCH },
+		{ 'N', EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR },
+		{ 'C', EDG_WLL_CONDOR_EVENT_SOURCE_SCHED },
+		{ 'H', EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW },
+		{ 's', EDG_WLL_CONDOR_EVENT_SOURCE_STARTER },
+		{ 'S', EDG_WLL_CONDOR_EVENT_SOURCE_START },
+		{ 'j', EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE },
+	};
+	const char	tag = condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2];
+	size_t		i;
+
+	for (i = 0; i < sizeof known / sizeof known[0]; i++) {
+		if (known[i].tag == tag) return known[i].source;
+	}
+
+	return EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF;
+}
+
+/*
+ * Compare two sequence codes.
+ *
+ * When both codes start with "TIMESTAMP=" the comparison is delegated to
+ * edg_wll_compare_pbs_seq(). Otherwise both are parsed as
+ * "UI=..:NS=..: ... :APP=..[:LBS=..]" (":LBS=000000" is appended when the
+ * string does not contain "LBS") and compared component by component in
+ * edg_wll_Source order.
+ *
+ * Returns -1, 0 or 1. An unparsable `a` is reported on stderr and yields -1,
+ * an unparsable `b` yields 1.
+ */
+int edg_wll_compare_seq(const char *a, const char *b)
+{
+	static const char	pbs_prefix[] = "TIMESTAMP=";
+	unsigned int	c[EDG_WLL_SOURCE__LAST];
+	unsigned int	d[EDG_WLL_SOURCE__LAST];
+	int		res, i;
+	char		sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE];
+
+
+	/* prefix test only - no need to scan the whole string as strstr() would */
+	if ( strncmp(a, pbs_prefix, sizeof pbs_prefix - 1) == 0 &&
+	     strncmp(b, pbs_prefix, sizeof pbs_prefix - 1) == 0 )
+		return edg_wll_compare_pbs_seq(a,b);
+
+	if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+	else snprintf(sca,EDG_WLL_SEQ_SIZE,"%s",a);
+	if (!strstr(b, "LBS")) snprintf(scb,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",b);
+	else snprintf(scb,EDG_WLL_SEQ_SIZE,"%s",b);
+
+	/* the format strings below list exactly nine components */
+	assert(EDG_WLL_SOURCE__LAST == 10);
+
+	/* %u matches the unsigned int storage the counters are compared in */
+	res = sscanf(sca, "UI=%u:NS=%u:WM=%u:BH=%u:JSS=%u:LM=%u:LRMS=%u:APP=%u:LBS=%u",
+			&c[EDG_WLL_SOURCE_USER_INTERFACE],
+			&c[EDG_WLL_SOURCE_NETWORK_SERVER],
+			&c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+			&c[EDG_WLL_SOURCE_BIG_HELPER],
+			&c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+			&c[EDG_WLL_SOURCE_LOG_MONITOR],
+			&c[EDG_WLL_SOURCE_LRMS],
+			&c[EDG_WLL_SOURCE_APPLICATION],
+			&c[EDG_WLL_SOURCE_LB_SERVER]);
+	if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", sca); */
+		fprintf(stderr, "unparsable sequence code %s\n", sca);
+		return -1;
+	}
+
+	res = sscanf(scb, "UI=%u:NS=%u:WM=%u:BH=%u:JSS=%u:LM=%u:LRMS=%u:APP=%u:LBS=%u",
+			&d[EDG_WLL_SOURCE_USER_INTERFACE],
+			&d[EDG_WLL_SOURCE_NETWORK_SERVER],
+			&d[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+			&d[EDG_WLL_SOURCE_BIG_HELPER],
+			&d[EDG_WLL_SOURCE_JOB_SUBMISSION],
+			&d[EDG_WLL_SOURCE_LOG_MONITOR],
+			&d[EDG_WLL_SOURCE_LRMS],
+			&d[EDG_WLL_SOURCE_APPLICATION],
+			&d[EDG_WLL_SOURCE_LB_SERVER]);
+	if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:	syslog(LOG_ERR, "unparsable sequence code %s\n", scb); */
+		fprintf(stderr, "unparsable sequence code %s\n", scb);
+		return 1;
+	}
+
+	/* the first differing component decides */
+	for (i = EDG_WLL_SOURCE_USER_INTERFACE ; i < EDG_WLL_SOURCE__LAST; i++) {
+		if (c[i] < d[i]) return -1;
+		if (c[i] > d[i]) return  1;
+	}
+
+	return 0;
+}
+
+
+/*
+ * Order two events (passed as pointers to edg_wll_Event): first by their
+ * sequence codes via edg_wll_compare_seq(), then - for equal codes - by
+ * timestamp seconds and finally microseconds. Returns <0, 0 or >0.
+ */
+int compare_events_by_seq(const void *a, const void *b)
+{
+	const edg_wll_Event	*lhs = a;
+	const edg_wll_Event	*rhs = b;
+	const int		by_seq = edg_wll_compare_seq(lhs->any.seqcode, rhs->any.seqcode);
+
+	if (by_seq != 0) return by_seq;
+
+	if (lhs->any.timestamp.tv_sec != rhs->any.timestamp.tv_sec)
+		return (lhs->any.timestamp.tv_sec < rhs->any.timestamp.tv_sec) ? -1 : 1;
+
+	if (lhs->any.timestamp.tv_usec != rhs->any.timestamp.tv_usec)
+		return (lhs->any.timestamp.tv_usec < rhs->any.timestamp.tv_usec) ? -1 : 1;
+
+	return 0;
+}
+
--- /dev/null
+#ifndef GLITE_LB_SEQCODE_AUX_H
+#define GLITE_LB_SEQCODE_AUX_H
+
+#ident "$Header$"
+
+/*
+ * Helpers for parsing and comparing event sequence codes.
+ *
+ * NOTE: this header does not include the declarations it relies on
+ * (edg_wll_Source, edg_wll_PBSEventSource, edg_wll_CondorEventSource);
+ * include the headers providing them (e.g. "intjobstat.h" for the two
+ * event-source enums) before this one.
+ */
+
+/* value of one source component of sequence code `a`; -1 if unparsable */
+int component_seqcode(const char *a, edg_wll_Source index);
+
+/* newly allocated copy of `a` with component `index` set to `val`; NULL if unparsable */
+char * set_component_seqcode(char *a,edg_wll_Source index,int val);
+
+/* 1 if the WORKLOAD_MANAGER component of `a` is lower than that of `b`, else 0 */
+int before_deep_resubmission(const char *a, const char *b);
+
+/* 1 if `a` and `b` have the same WORKLOAD_MANAGER component, else 0 */
+int same_branch(const char *a, const char *b) ;
+
+/* compare two "TIMESTAMP=...:POS=...:EV.CODE=...:SRC=." sequence codes */
+int edg_wll_compare_pbs_seq(const char *a,const char *b);
+#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq
+
+/* event source encoded in a PBS sequence code */
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) ;
+
+/* event source encoded in a Condor sequence code */
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) ;
+
+/* compare two sequence codes of either form; returns -1, 0 or 1 */
+int edg_wll_compare_seq(const char *a, const char *b);
+
+/* comparator over two edg_wll_Event objects: sequence code first, then timestamp */
+int compare_events_by_seq(const void *a, const void *b);
+
+#endif /* GLITE_LB_SEQCODE_AUX_H */
+
+