builds but nothing else...
authorJan Pospíšil <honik@ntc.zcu.cz>
Fri, 1 Feb 2008 17:34:50 +0000 (17:34 +0000)
committerJan Pospíšil <honik@ntc.zcu.cz>
Fri, 1 Feb 2008 17:34:50 +0000 (17:34 +0000)
org.glite.lb.state-machine/Makefile
org.glite.lb.state-machine/interface/intjobstat.h [new file with mode: 0644]
org.glite.lb.state-machine/interface/lb-job-attrs.xsd [new file with mode: 0644]
org.glite.lb.state-machine/interface/lb-job-record.xsd [new file with mode: 0644]
org.glite.lb.state-machine/src/job_attrs.xsl [new file with mode: 0644]
org.glite.lb.state-machine/src/lb_plugin.c
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_condor.c
org.glite.lb.state-machine/src/process_event_pbs.c
org.glite.lb.state-machine/src/seqcode_aux.c [new file with mode: 0644]
org.glite.lb.state-machine/src/seqcode_aux.h [new file with mode: 0644]

index dfecd47..680fa07 100644 (file)
@@ -10,6 +10,8 @@ PREFIX=/opt/glite
 
 -include Makefile.inc
 
+VPATH = ../src:../interface
+
 default all: compile
 
 CC=gcc
@@ -22,6 +24,7 @@ DEBUG:=-g -O0 -Wall
 
 CFLAGS:= \
        ${DEBUG} \
+       -I${classads_prefix}/include \
        -I${stagedir}/include -I${top_srcdir}/src -I. \
        -I${top_srcdir}/interface 
 
@@ -38,21 +41,41 @@ COMMON_LIBS:= -L${stagedir}/lib  -lglite_lb_common_${nothrflavour} -lglite_secur
 PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\
        ${classadslib} -lstdc++ ${expatlib} -lexpat\
 
-PLUGIN_LOBJS:=
-MACHINE_BASE_OBJS:=
+PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo lb_plugin.lo
+MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o
+
+PLUGIN_LIB=glite_lb_plugin.la
+MACHINE_LIB=libglite_lb_statemachine.a
 
 default all: compile
 
-compile: 
+compile: ${PLUGIN_LIB} ${MACHINE_LIB}
+
+${PLUGIN_LIB}: ${PLUGIN_LOBJS}
+       ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS}
+
+${MACHINE_LIB}:  ${MACHINE_OBJS}
+       ar crv $@ ${MACHINE_OBJS}
+       -ranlib $@
+
 
 doc:
 
 stage: compile
-       $(MAKE) install PREFIX=${stagedir} DOSTAGE=yes
+       $(MAKE) install PREFIX=${stagedir}
 
 install:
+       -mkdir -p ${PREFIX}/lib ${PREFIX}/interface ${PREFIX}/include/glite/lb
+       install -m 644 ${MACHINE_LIB} ${PREFIX}/lib 
+       ${INSTALL} -m 755 ${PLUGIN_LIB} ${PREFIX}/lib
+       install -m 644 ${top_srcdir}/interface/lb-job-attrs.xsd ${PREFIX}/interface
+       install -m 644 ${top_srcdir}/interface/lb-job-record.xsd ${PREFIX}/interface
+       install -m 644 ${top_srcdir}/interface/intjobstat.h ${PREFIX}/include/glite/lb
+
 
 clean:
+       rm -rvf .libs *.o *.lo ${PLUGIN_LIB} ${MACHINE_LIB}
+       rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
 
 %.c: %.c.T
        rm -f $@
@@ -70,6 +93,9 @@ clean:
 %.lo: %.c
        ${COMPILE} -o $@ -c $<
 
-lb_plugin.lo: lb_plugin.c jp_job_attrs.h
+lb_plugin.lo: lb_plugin.c job_attrs.h
        ${COMPILE} -DPLUGIN_DEBUG -o $@ -c $<
 
+job_attrs.h: lb-job-attrs.xsd job_attrs.xsl
+       ${XSLTPROC} ../src/job_attrs.xsl $< >$@
+
diff --git a/org.glite.lb.state-machine/interface/intjobstat.h b/org.glite.lb.state-machine/interface/intjobstat.h
new file mode 100644 (file)
index 0000000..45ad4a6
--- /dev/null
@@ -0,0 +1,104 @@
+#ifndef GLITE_LB_INTJOBSTAT_H
+#define GLITE_LB_INTJOBSTAT_H
+
+#ident "$Header$"
+
+#include "glite/lb/jobstat.h"
+
+/*
+ * Internal representation of job state
+ * (includes edg_wll_JobStat API structure)
+ */
+
+/* convention: revision X.XX - DESCRIPTION                     */
+/* where X.XX is version from indent + 1 (version after commit) */
+/* and DESCRIPTION is short hit why version changed            */
+
+#define INTSTAT_VERSION "revision 1.31 - proxy merge"
+
+
+// Internal error codes 
+
+#define RET_FAIL        0
+#define RET_OK          1
+#define RET_FATAL       RET_FAIL
+#define RET_SOON        2
+#define RET_LATE        3
+#define RET_BADSEQ      4
+#define RET_SUSPECT     5
+#define RET_IGNORE      6
+#define RET_BADBRANCH   7
+#define RET_GOODBRANCH  8
+#define RET_TOOOLD      9
+#define RET_UNREG      10
+#define RET_INTERNAL    100
+
+
+// shallow resubmission container - holds state of each branch
+// (useful when state restore is needed after ReallyRunning event)
+//
+typedef struct _branch_state {
+       int     branch;
+       char    *destination;
+       char    *ce_node;
+       char    *jdl;
+       /*!! if adding new field, modify also free_branch_state() */
+} branch_state;
+
+
+typedef struct _intJobStat {
+               edg_wll_JobStat pub;
+               int             resubmit_type;
+               char            *last_seqcode;
+               char            *last_cancel_seqcode;
+               char            *branch_tag_seqcode;            
+               char            *last_branch_seqcode;
+               char            *deep_resubmit_seqcode;
+               branch_state    *branch_states;         // branch zero terminated array
+
+               struct timeval  last_pbs_event_timestamp;
+               int             pbs_reruning;           // true if rerun event arrived
+
+               /*!! if adding new field, modify also destroy_intJobStat_extension() */
+       } intJobStat;
+
+typedef enum _edg_wll_PBSEventSource {
+       EDG_WLL_PBS_EVENT_SOURCE_UNDEF = 0,
+       EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER,
+       EDG_WLL_PBS_EVENT_SOURCE_SERVER,
+       EDG_WLL_PBS_EVENT_SOURCE_MOM,
+       EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING,
+       EDG_WLL_PBS_EVENT_SOURCE__LAST
+} edg_wll_PBSEventSource;
+
+typedef enum _edg_wll_CondorEventSource {
+       EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF = 0,
+       EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR,
+       EDG_WLL_CONDOR_EVENT_SOURCE_MASTER,
+       EDG_WLL_CONDOR_EVENT_SOURCE_MATCH,
+       EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR,
+       EDG_WLL_CONDOR_EVENT_SOURCE_SCHED,
+       EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW,
+       EDG_WLL_CONDOR_EVENT_SOURCE_STARTER,
+       EDG_WLL_CONDOR_EVENT_SOURCE_START,
+       EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE,
+       EDG_WLL_CONDOR_EVENT_SOURCE__LAST
+} edg_wll_CondorEventSource;
+
+typedef enum _subjobClassCodes {
+       SUBJOB_CLASS_UNDEF = 0,
+       SUBJOB_CLASS_RUNNING,
+       SUBJOB_CLASS_DONE,
+       SUBJOB_CLASS_ABORTED,
+       SUBJOB_CLASS_CLEARED,
+       SUBJOB_CLASS_REST
+} subjobClassCodes;
+
+void destroy_intJobStat(intJobStat *);
+void destroy_intJobStat_extension(intJobStat *p);
+
+
+void init_intJobStat(intJobStat *p);
+
+
+#endif /* GLITE_LB_INTJOBSTAT_H */
diff --git a/org.glite.lb.state-machine/interface/lb-job-attrs.xsd b/org.glite.lb.state-machine/interface/lb-job-attrs.xsd
new file mode 100644 (file)
index 0000000..a2a68bb
--- /dev/null
@@ -0,0 +1,132 @@
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema 
+       xmlns:xs="http://www.w3.org/2001/XMLSchema"
+       xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+       targetNamespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+       version="1.0"
+       elementFormDefault="qualified"
+       attributeFormDefault="unqualified"
+>
+       <xs:simpleType name="statusType">
+               <xs:restriction base="xs:string">
+                       <xs:enumeration value="SUBMITTED" />
+                       <xs:enumeration value="WAITING" />
+                       <xs:enumeration value="READY" />
+                       <xs:enumeration value="SCHEDULED" />
+                       <xs:enumeration value="RUNNING" />
+                       <xs:enumeration value="DONE" />
+                       <xs:enumeration value="CLEARED" />
+                       <xs:enumeration value="ABORTED" />
+                       <xs:enumeration value="CANCELLED" />
+                       <xs:enumeration value="UNKNOWN" />
+               </xs:restriction>
+       </xs:simpleType>
+
+       <xs:simpleType name="doneType">
+               <xs:restriction base="xs:string">
+                       <xs:enumeration value="OK"/>
+                       <xs:enumeration value="FAIL"/>
+                       <xs:enumeration value="CANCEL"/>
+               </xs:restriction>
+       </xs:simpleType>
+
+       <xs:complexType name="historyStatusType">
+               <xs:complexContent>
+                       <xs:restriction base="xs:anyType">
+                               <xs:attribute name="name" type="a:statusType" use="required"/>
+                               <xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
+                               <xs:attribute name="reason" type="xs:string" use="optional"/>
+                       </xs:restriction>
+               </xs:complexContent>
+       </xs:complexType>
+
+       <xs:simpleType name="jobTypeType">
+               <xs:restriction base="xs:string">
+                       <xs:enumeration value="SIMPLE"/>
+                       <xs:enumeration value="DAG"/>
+               </xs:restriction>
+       </xs:simpleType>
+
+       <xs:complexType name="statusSequenceType">
+               <xs:sequence>
+                       <xs:element name="status" type="a:historyStatusType" minOccurs="0" maxOccurs="unbounded"/>
+               </xs:sequence>
+       </xs:complexType>
+
+       <xs:complexType name="jobIdSequenceType">
+               <xs:sequence>
+                       <xs:element name="jobId" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+               </xs:sequence>
+       </xs:complexType>
+
+
+
+       <xs:element name="jobId" type="xs:string" />
+       <xs:element name="LBserver" type="xs:string" />
+
+       <xs:element name="user" type="xs:string">
+               <xs:annotation> <xs:documentation>Job owner according to LB</xs:documentation> </xs:annotation>
+       </xs:element>
+
+       <!-- Parent jobId of DAG -->
+       <xs:element name="parent" type="xs:string" />
+
+       <!-- the following 6 elements have to be retrieved from JDL -->
+       <xs:element name="VO" type="xs:string" />
+       <xs:element name="aTag" type="xs:string" />
+       <xs:element name="rQType" type="xs:string" />
+       <xs:element name="eDuration" type="xs:duration" />
+       <xs:element name="eNodes" type="xs:int" />
+       <xs:element name="eProc" type="xs:int" />
+
+       <xs:element name="RB" type="xs:string" />
+       <xs:element name="CE" type="xs:string" />
+       <xs:element name="host" type="xs:string" /> <!-- worker node -->
+
+       <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+       <xs:element name="UIHost" type="xs:string" />
+
+       <!-- not mandatory, currently LB hasn't got the info -->
+       <xs:element name="CPUTime" type="xs:duration" />
+       <xs:element name="NProc" type="xs:int" />
+
+       <!-- In LB the real final status is Cleared
+               However, Done, Aborted, or Cancelled should be reported here -->
+       <xs:element name="finalStatus" type="a:statusType" />
+       <xs:element name="finalStatusDate" type="xs:dateTime" />
+       <xs:element name="finalStatusReason" type="xs:string" />
+       <xs:element name="LRMSDoneStatus" type="a:doneType" />
+       <xs:element name="LRMSStatusReason" type="xs:string" />
+
+       <xs:element name="retryCount" type="xs:int" />
+       <xs:element name="additionalReason" type="xs:string" />
+
+       <xs:element name="jobType" type="a:jobTypeType" />
+       <xs:element name="nsubjobs" type="xs:int" />
+       <xs:element name="subjobs" type="a:jobIdSequenceType" />
+
+       <!-- timestamps of the state history of the last resubmission cycle,
+               i.e. it is guaranteed that each state apears here only once.
+               Cf. stateEnterTimes in LB JobStatus -->
+       <xs:element name="lastStatusHistory" type="a:statusSequenceType" />
+
+       <!-- timestamps of the whole state history, including all resubmission cycles -->
+       <xs:element name="fullStatusHistory" type="a:statusSequenceType" />
+
+       <xs:element name="JDL" type="xs:string" />
+
+<!-- No idea where to get these from:
+
+   ENVIRONMENT
+
+   testbed                         production, preproduction, specific
+   release                         middleware release (LCG, g-lite...)
+   version                         version of middleware
+   job_history_version          in case of structure changes
+-->
+
+</xs:schema>
diff --git a/org.glite.lb.state-machine/interface/lb-job-record.xsd b/org.glite.lb.state-machine/interface/lb-job-record.xsd
new file mode 100644 (file)
index 0000000..2ad2683
--- /dev/null
@@ -0,0 +1,90 @@
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema 
+       xmlns:xs="http://www.w3.org/2001/XMLSchema"
+       xmlns:jr="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+       xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+       targetNamespace="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+       version="1.0"
+       elementFormDefault="qualified"
+       attributeFormDefault="unqualified"
+>
+
+  <xs:import
+               namespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+               schemaLocation="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+       />
+
+       <xs:complexType name="jobRecordType">
+               <xs:sequence>
+
+                       <xs:element ref="a:jobId" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:user" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:parent" minOccurs="0" maxOccurs="1"/>
+
+       <!-- the following 6 elements have to be retrieved from JDL -->
+                       <xs:element ref="a:VO" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:aTag" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:rQType" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:eDuration" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:eNodes" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:eProc" minOccurs="0" maxOccurs="1"/>
+
+                       <xs:element ref="a:RB" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:CE" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:host" minOccurs="0" maxOccurs="1"/> <!-- worker node -->
+
+       <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+                       <xs:element ref="a:UIHost" minOccurs="0" maxOccurs="1"/>
+
+       <!-- not mandatory, currently LB hasn't got the info -->
+                       <xs:element ref="a:CPUTime" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:NProc" minOccurs="0" maxOccurs="1"/>
+
+       <!-- In LB the real final status is Cleared
+                               However, Done, Aborted, or Cancelled should be reported here -->
+                       <xs:element ref="a:finalStatus" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:finalStatusDate" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:finalStatusReason" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:LRMSDoneStatus" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:LRMSStatusReason" minOccurs="0" maxOccurs="1"/>
+
+                       <xs:element ref="a:retryCount" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:additionalReason" minOccurs="0" maxOccurs="1"/>
+
+                       <xs:element ref="a:jobType" minOccurs="1" maxOccurs="1"/>
+                       <xs:element ref="a:nsubjobs" minOccurs="0" maxOccurs="1"/>
+                       <xs:element ref="a:subjobs" minOccurs="0" maxOccurs="1"/>
+
+                       <!-- timestamps of the state history of the last resubmission cycle,
+                               i.e. it is guaranteed that each state apears here only once.
+                               Cf. stateEnterTimes in LB JobStatus -->
+
+                       <xs:element ref="a:lastStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+                       <!-- timestamps of the whole state history, including all resubmission cycles -->
+                       <xs:element ref="a:fullStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+                       <xs:element ref="a:JDL" minOccurs="0" maxOccurs="1"/>
+               </xs:sequence>
+
+               <!-- No idea where to get these from:
+
+   ENVIRONMENT
+
+   testbed                         production, preproduction, specific
+   release                         middleware release (LCG, g-lite...)
+   version                         version of middleware
+   job_history_version          in case of structure changes
+        -->
+
+               <xs:attribute name="jobid" type="xs:string" use="required"/>
+       </xs:complexType>
+
+
+       <xs:element name="jobRecord" type="jr:jobRecordType"/>
+
+</xs:schema>
diff --git a/org.glite.lb.state-machine/src/job_attrs.xsl b/org.glite.lb.state-machine/src/job_attrs.xsl
new file mode 100644 (file)
index 0000000..0df2f3d
--- /dev/null
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+
+<xsl:stylesheet version="1.0"
+       xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+       xmlns:xs="http://www.w3.org/2001/XMLSchema"
+>
+
+<xsl:output method="text"/>
+
+<xsl:template match="xs:schema">
+#ifndef GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_JP_LB_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes"
+#define GLITE_JP_LB_JDL_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes:JDL"
+       <xsl:apply-templates select="xs:element"/>
+#define GLITE_JP_LB_CLASSAD_NS "http://jdl"
+#endif
+</xsl:template>
+
+<xsl:template match="xs:element">
+/** <xsl:value-of select="xs:documentation/text()"/> */
+#define GLITE_JP_LB_<xsl:value-of select="@name"/>     GLITE_JP_LB_NS &quot;:<xsl:value-of select="@name"/>&quot;
+</xsl:template>
+
+</xsl:stylesheet>
index 6630fa6..afc6e34 100644 (file)
 
 #include <cclassad.h>
 
+#include "glite/lbu/trio.h"
+
+#include "glite/lb/jobstat.h"
+#include "glite/lb/events.h"
+#include "glite/lb/producer.h"
+#include "glite/lb/events_parse.h"
+
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/*
 #include "glite/lb/context.h"
 #include "glite/lb/jobstat.h"
 #include "glite/lb/events.h"
 #include "glite/lb/events_parse.h"
-#include "glite/lb/trio.h"
 #include "glite/lb/producer.h"
 
 #include "jobstat.h"
 #include "get_events.h"
+*/
 
 #include "glite/jp/types.h"
 #include "glite/jp/context.h"
 #include "glite/jp/attr.h"
 #include "glite/jp/utils.h"
 #include "glite/jp/known_attr.h"
-#include "jp_job_attrs.h"
+#include "job_attrs.h"
 
 #define INITIAL_NUMBER_EVENTS 100
 #define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES
-#define LB_PLUGIN_NAMESPACE "urn:org.glite.lb"
 
 /*typedef struct _lb_buffer_t {
        char                    *buf;
@@ -57,6 +68,7 @@ typedef struct _lb_handle {
 #define check_strdup(s) ((s) ? strdup(s) : NULL)
 
 extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
+static void edg_wll_SortPEvents(edg_wll_Event **);
 
 static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval);
 static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
@@ -1008,3 +1020,26 @@ fail:
        return retval;
 }
 */
+
+
+static int compare_pevents_by_seq(const void *a, const void *b)
+{
+        const edg_wll_Event **e = (const edg_wll_Event **) a;
+        const edg_wll_Event **f = (const edg_wll_Event **) b;
+       return compare_events_by_seq(*e,*f);
+}
+
+
+static void edg_wll_SortPEvents(edg_wll_Event **e)
+{
+       edg_wll_Event **p;
+       int     n;
+
+       if (!e) return;
+       p = e;
+       for (n=0; *p; n++) {
+               p++;
+       }
+       qsort(e,n,sizeof(*e),compare_pevents_by_seq);
+}
+
index 1bce002..c41d63c 100644 (file)
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
 
-#include "jobstat.h"
-#include "lock.h"
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
 
 /* TBD: share in whole logging or workload */
 #ifdef __GNUC__
 #endif
 
 static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+
+int add_stringlist(char ***lptr, const char *new_item);
 
 int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
 {
@@ -44,7 +50,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                js->pub.jobtype = EDG_WLL_STAT_CONDOR;
                                break;
                        default:
-                               asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
+                               trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
                                return RET_FAIL;
        }
 
@@ -59,7 +65,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        return processEvent_Condor(js,e,ev_seq,strict,errstring);
                case -1: return RET_UNREG;
                default: 
-                       asprintf(errstring,"undefined job type %d",js->pub.jobtype);
+                       trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype);
                        return RET_FAIL;
        }
 }
@@ -229,7 +235,7 @@ static void reset_branch(intJobStat *js, edg_wll_Event *e)
 static char* location_string(const char *source, const char *host, const char *instance)
 {
        char *ret;
-       asprintf(&ret, "%s/%s/%s", source, host, instance);
+       trio_asprintf(&ret, "%s/%s/%s", source, host, instance);
        return ret;
 }
 
@@ -265,7 +271,7 @@ static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUS
 static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
 {
        edg_wll_JobStatCode     old_state = js->pub.state;
-       enum edg_wll_StatDone_code      old_done_code = js->pub.done_code;
+/* unused      enum edg_wll_StatDone_code      old_done_code = js->pub.done_code; */
        edg_wll_JobStatCode     new_state = EDG_WLL_JOB_UNKNOWN;
        int                     res = RET_OK,
                                fine_res = RET_OK;
@@ -880,7 +886,7 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                        rep(js->last_cancel_seqcode, e->any.seqcode);
                } else {
 
-/* the first set of LM events (Accept, Transfer/* -> LRMS)
+/* the first set of LM events (Accept, Transfer/XX -> LRMS)
    should not should shift the state (to Ready, Scheduled) but NOT to
    update js->last_seqcode completely, in order not to block following
    LRMS events which are likely to arrive later but should still affect
@@ -905,7 +911,7 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                        js->pub.network_server == NULL) {
                        char *inst;
                        inst = e->any.src_instance;
-                       asprintf(&js->pub.network_server, "%s%s%s",
+                       trio_asprintf(&js->pub.network_server, "%s%s%s",
                                e->any.host,
                                inst != NULL ? ":" : " ",
                                inst != NULL ? inst : "");
@@ -957,3 +963,15 @@ void destroy_intJobStat(intJobStat *p)
        destroy_intJobStat_extension(p);
        memset(p, 0, sizeof(intJobStat));
 }
+
+void init_intJobStat(intJobStat *p)
+{
+       memset(p, 0, sizeof(intJobStat));
+       p->pub.jobtype = -1 /* why? EDG_WLL_STAT_SIMPLE */;
+       p->pub.children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+       p->pub.children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
+       p->pub.stateEnterTimes = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+       p->pub.stateEnterTimes[0] = EDG_WLL_NUMBER_OF_STATCODES;
+       /* TBD: generate */
+}
+
index e884205..cb52cc0 100644 (file)
@@ -10,8 +10,8 @@
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
 
-#include "jobstat.h"
-#include "lock.h"
+#include "intjobstat.h"
+#include "seqcode_aux.h"
 
 /* TBD: share in whole logging or workload */
 #ifdef __GNUC__
index 64c69d7..b74a8ae 100644 (file)
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
 
-#include "jobstat.h"
-#include "lock.h"
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
 
 /* TBD: share in whole logging or workload */
 #ifdef __GNUC__
@@ -165,7 +167,7 @@ int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, c
                        if (USABLE_DATA(res)) {
                                char *new_resource_usage;
        
-                               asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
+                               trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
                                        (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "",
                                        (js->pub.pbs_resource_usage) ? "\n": "",
                                        e->PBSResourceUsage.name,
@@ -185,7 +187,7 @@ int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, c
                        if (USABLE_DATA(res)) {
                                char *new_error_desc;
 
-                               asprintf(&new_error_desc,"%s%s\t%s",
+                               trio_asprintf(&new_error_desc,"%s%s\t%s",
                                        (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "",
                                        (js->pub.pbs_error_desc) ? "\n" : "",
                                        e->PBSError.error_desc);
diff --git a/org.glite.lb.state-machine/src/seqcode_aux.c b/org.glite.lb.state-machine/src/seqcode_aux.c
new file mode 100644 (file)
index 0000000..f1d9b90
--- /dev/null
@@ -0,0 +1,281 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <string.h>
+#include <syslog.h>
+#include <assert.h>
+
+#include "glite/lbu/trio.h"
+#include "glite/lb/context-int.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+
+/*
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <regex.h>
+#include <syslog.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lbu/trio.h"
+#include "glite/lbu/db.h"
+#include "glite/lb/context-int.h"
+
+#include "store.h"
+#include "index.h"
+#include "jobstat.h"
+#include "get_events.h"
+*/
+
+int component_seqcode(const char *a, edg_wll_Source index)
+{
+       unsigned int    c[EDG_WLL_SOURCE__LAST];
+       int             res;
+       char            sc[EDG_WLL_SEQ_SIZE];
+
+       if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+       else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+       res =  sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+                       &c[EDG_WLL_SOURCE_USER_INTERFACE],
+                       &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                       &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                       &c[EDG_WLL_SOURCE_BIG_HELPER],
+                       &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                       &c[EDG_WLL_SOURCE_LOG_MONITOR],
+                       &c[EDG_WLL_SOURCE_LRMS],
+                       &c[EDG_WLL_SOURCE_APPLICATION],
+                       &c[EDG_WLL_SOURCE_LB_SERVER]);
+       if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+               fprintf(stderr, "unparsable sequence code %s\n", sc);
+               return -1;
+       }
+
+       return(c[index]);       
+}
+
+char * set_component_seqcode(char *a,edg_wll_Source index,int val)
+{
+       unsigned int    c[EDG_WLL_SOURCE__LAST];
+       int             res;
+       char            *ret;
+       char            sc[EDG_WLL_SEQ_SIZE];
+
+       if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+       else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+       res =  sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+                       &c[EDG_WLL_SOURCE_USER_INTERFACE],
+                       &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                       &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                       &c[EDG_WLL_SOURCE_BIG_HELPER],
+                       &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                       &c[EDG_WLL_SOURCE_LOG_MONITOR],
+                       &c[EDG_WLL_SOURCE_LRMS],
+                       &c[EDG_WLL_SOURCE_APPLICATION],
+                       &c[EDG_WLL_SOURCE_LB_SERVER]);
+       if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+               fprintf(stderr, "unparsable sequence code %s\n", sc);
+               return NULL;
+       }
+
+       c[index] = val;
+       trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d"
+                                ":LM=%06d:LRMS=%06d:APP=%06d:LBS=%06d",
+                        c[EDG_WLL_SOURCE_USER_INTERFACE],
+                        c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                        c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                        c[EDG_WLL_SOURCE_BIG_HELPER],
+                        c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                        c[EDG_WLL_SOURCE_LOG_MONITOR],
+                        c[EDG_WLL_SOURCE_LRMS],
+                        c[EDG_WLL_SOURCE_APPLICATION],
+                        c[EDG_WLL_SOURCE_LB_SERVER]);
+       return ret;
+}
+
+int before_deep_resubmission(const char *a, const char *b)
+{
+       if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < 
+           component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
+               return(1);
+       else
+               return(0);
+
+}
+
+int same_branch(const char *a, const char *b) 
+{
+       if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) == 
+           component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
+               return(1);
+       else
+               return(0);
+}
+
+int edg_wll_compare_pbs_seq(const char *a,const char *b)
+{
+       char    timestamp_a[14], pos_a[10], src_a;
+       char    timestamp_b[14], pos_b[10], src_b;
+       int     ev_code_a, ev_code_b;
+       int     res;
+
+       res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a);   
+
+       if (res != 4) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", a); */
+               fprintf(stderr, "unparsable sequence code %s\n", a);
+               return -1;
+       }
+
+       res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b);   
+
+       if (res != 4) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", b); */
+               fprintf(stderr, "unparsable sequence code %s\n", b);
+               return -1;
+       }
+
+       /* wild card for PBSJobReg - this event should always come as firt one          */
+       /* bacause it hold job.type, which is necessary for further event processing    */
+       if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1;
+       if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1;
+
+       /* sort event w.t.r. to timestamps */
+       if ((res = strcmp(timestamp_a,timestamp_b)) != 0) {
+               return res;
+       }
+       else {
+               /* if timestamps equal, sort if w.t.r. to file position */
+               /* if you both events come from the same log file       */
+               if (src_a == src_b) {
+                       /* zero mean in fact duplicate events in log    */
+                       return strcmp(pos_a,pos_b);
+               }
+               /* if the events come from diffrent log files           */
+               /* it is possible to prioritize some src log file       */
+               else    {
+                       /* prioritize events from pbs_mom */
+                       if (src_a == 'm') return 1;
+                       if (src_b == 'm') return -1;
+
+                       /* then prioritize events from pbs_server */
+                       if (src_a == 's') return 1;
+                       if (src_b == 's') return -1;
+
+                       /* other priorities comes here... */
+               }       
+       }
+
+       return 0;
+}
+
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) {
+       switch (pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2]) {
+               case 'c': return(EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER);
+               case 's': return(EDG_WLL_PBS_EVENT_SOURCE_SERVER);
+               case 'm': return(EDG_WLL_PBS_EVENT_SOURCE_MOM);
+               case 'a': return(EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING);
+               default: return(EDG_WLL_PBS_EVENT_SOURCE_UNDEF);
+       }
+}
+
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) {
+       switch (condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2]) {
+               case 'L': return(EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR);
+               case 'M': return(EDG_WLL_CONDOR_EVENT_SOURCE_MASTER);
+               case 'm': return(EDG_WLL_CONDOR_EVENT_SOURCE_MATCH);
+               case 'N': return(EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR);
+               case 'C': return(EDG_WLL_CONDOR_EVENT_SOURCE_SCHED);
+               case 'H': return(EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW);
+               case 's': return(EDG_WLL_CONDOR_EVENT_SOURCE_STARTER);
+               case 'S': return(EDG_WLL_CONDOR_EVENT_SOURCE_START);
+               case 'j': return(EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE);
+               default: return(EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF);
+       }
+}
+
+int edg_wll_compare_seq(const char *a, const char *b)
+{
+       unsigned int    c[EDG_WLL_SOURCE__LAST];
+       unsigned int    d[EDG_WLL_SOURCE__LAST];
+       int             res, i;
+       char            sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE];
+
+
+       if ( (strstr(a,"TIMESTAMP=") == a) && (strstr(b,"TIMESTAMP=") == b) ) 
+               return edg_wll_compare_pbs_seq(a,b);
+
+       if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+       else snprintf(sca,EDG_WLL_SEQ_SIZE,"%s",a);
+       if (!strstr(b, "LBS")) snprintf(scb,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",b);
+       else snprintf(scb,EDG_WLL_SEQ_SIZE,"%s",b);
+
+       assert(EDG_WLL_SOURCE__LAST == 10);
+
+       res =  sscanf(sca, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+                       &c[EDG_WLL_SOURCE_USER_INTERFACE],
+                       &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                       &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                       &c[EDG_WLL_SOURCE_BIG_HELPER],
+                       &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                       &c[EDG_WLL_SOURCE_LOG_MONITOR],
+                       &c[EDG_WLL_SOURCE_LRMS],
+                       &c[EDG_WLL_SOURCE_APPLICATION],
+                       &c[EDG_WLL_SOURCE_LB_SERVER]);
+       if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", sca); */
+               fprintf(stderr, "unparsable sequence code %s\n", sca);
+               return -1;
+       }
+
+       res =  sscanf(scb, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+                       &d[EDG_WLL_SOURCE_USER_INTERFACE],
+                       &d[EDG_WLL_SOURCE_NETWORK_SERVER],
+                       &d[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                       &d[EDG_WLL_SOURCE_BIG_HELPER],
+                       &d[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                       &d[EDG_WLL_SOURCE_LOG_MONITOR],
+                       &d[EDG_WLL_SOURCE_LRMS],
+                       &d[EDG_WLL_SOURCE_APPLICATION],
+                       &d[EDG_WLL_SOURCE_LB_SERVER]);
+       if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME:              syslog(LOG_ERR, "unparsable sequence code %s\n", scb); */
+               fprintf(stderr, "unparsable sequence code %s\n", scb);
+               return 1;
+       }
+
+       for (i = EDG_WLL_SOURCE_USER_INTERFACE ; i < EDG_WLL_SOURCE__LAST; i++) {
+               if (c[i] < d[i]) return -1;
+               if (c[i] > d[i]) return  1;
+       }
+
+       return 0;
+}
+
+
+int compare_events_by_seq(const void *a, const void *b)
+{
+        const edg_wll_Event *e = (edg_wll_Event *) a;
+        const edg_wll_Event *f = (edg_wll_Event *) b;
+       int ret;
+
+
+       ret = edg_wll_compare_seq(e->any.seqcode, f->any.seqcode);
+       if (ret) return ret;
+       
+       if (e->any.timestamp.tv_sec < f->any.timestamp.tv_sec) return -1;
+       if (e->any.timestamp.tv_sec > f->any.timestamp.tv_sec) return 1;
+       if (e->any.timestamp.tv_usec < f->any.timestamp.tv_usec) return -1;
+       if (e->any.timestamp.tv_usec > f->any.timestamp.tv_usec) return 1;
+       return 0;
+}
+
diff --git a/org.glite.lb.state-machine/src/seqcode_aux.h b/org.glite.lb.state-machine/src/seqcode_aux.h
new file mode 100644 (file)
index 0000000..5fd1b3a
--- /dev/null
@@ -0,0 +1,22 @@
+#ident "$Header$"
+
+int component_seqcode(const char *a, edg_wll_Source index);
+
+char * set_component_seqcode(char *a,edg_wll_Source index,int val);
+
+int before_deep_resubmission(const char *a, const char *b);
+
+int same_branch(const char *a, const char *b) ;
+
+int edg_wll_compare_pbs_seq(const char *a,const char *b);
+#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq
+
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) ;
+
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) ;
+
+int edg_wll_compare_seq(const char *a, const char *b);
+
+int compare_events_by_seq(const void *a, const void *b);
+
+