extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId);
extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq);
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP);
int
db_store(edg_wll_Context ctx,char *ucs, char *event)
{
edg_wll_Event *ev;
- int seq;
+ int seq, reg_to_JP = 0;
int err;
+ int local_job;
edg_wll_JobStat newstat;
- char *srvName = NULL;
- unsigned int srvPort;
ev = NULL;
if(edg_wll_ParseEvent(ctx, event, &ev))
goto err;
+ local_job = is_job_local(ctx, ev->any.jobId);
#ifdef LB_PERF
if (sink_mode == GLITE_LB_SINK_STORE) {
}
#endif
- edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort);
-
if(use_db) {
char *ed;
int code;
if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
- store_job_server_proxy(ctx, ev, srvName, srvPort);
+ store_job_server_proxy(ctx, ev);
code = edg_wll_Error(ctx,NULL,&ed);
edg_wll_UnlockJob(ctx,ev->any.jobId); /* XXX: ignore error */
if (code) {
* if jobid prefix hostname matches server hostname -> they will
* sooner or later arrive to server too and are stored in common DB
*/
- if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) {
- if (!strcmp(ctx->srvName, srvName)) {
- free(srvName);
+ if (ctx->isProxy && local_job) {
+ if (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
return 0;
}
-
+ else {
+ /* these are re-registrations of subjobs on proxy */
+ /* embryonic registrations does not trigger registration in JP */
+ reg_to_JP = 1;
+ }
}
/* XXX: if event type is user tag, convert the tag name to lowercase!
*/
if (err) goto err;
- db_store_finalize(ctx, event, ev, &newstat, seq);
+ db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP);
err:
free(ev);
}
- if (srvName) free(srvName);
-
if ( newstat.state ) edg_wll_FreeStatus(&newstat);
assert(event);
}
- db_store_finalize(ctx, event, ev, &newstat, seq);
+ db_store_finalize(ctx, event, ev, &newstat, seq, 0);
err:
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq) {
-printf("%d\n", seq);
- if ( ctx->isProxy ) {
- /*
- * send event to the proper BK server
- * event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent
- */
-
-#ifdef LB_PERF
- if( sink_mode == GLITE_LB_SINK_SEND ) {
- glite_wll_perftest_consumeEvent(ev);
- } else
-#endif
-
- /* XXX: ending here may break the backward compatibility */
- if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) {
- edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY");
- //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY");
- }
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP)
+{
+ int local_job = is_job_local(ctx, ev->any.jobId);
- if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) {
- if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) {
- return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
- }
- }
- /* LB proxy purge */
- if (newstat->remove_from_proxy) {
- edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
- }
- } else
#ifdef LB_PERF
if( sink_mode == GLITE_LB_SINK_SEND ) {
glite_wll_perftest_consumeEvent(ev);
- } else
-#endif
- {
- char *jobIdHost = NULL;
- unsigned int jobIdPort;
-
-
- /* Purge proxy flag */
- edg_wlc_JobIdGetServerParts(ev->any.jobId, &jobIdHost, &jobIdPort);
- if ( newstat->remove_from_proxy && (ctx->srvPort == jobIdPort) &&
- !strcmp(jobIdHost,ctx->srvName) )
- {
- if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
- free(jobIdHost);
- return(edg_wll_Error(ctx,NULL,NULL));
- }
+ return edg_wll_Error(ctx,NULL,NULL);
}
- free(jobIdHost);
-
- if ( newstat->state ) {
- edg_wll_NotifMatch(ctx, newstat);
- }
- if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0) {
+#endif
+
+ /* Send regitration to JP
+ */
+ if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 &&
+ (!ctx->isProxy || reg_to_JP) ) {
char *jids, *msg;
if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
}
free(msg);
}
- }
- return edg_wll_Error(ctx,NULL,NULL);
+
+
+ if ( ctx->isProxy ) {
+ /*
+ * send event to the proper BK server
+ * event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent
+ */
+
+ /* XXX: ending here may break the backward compatibility */
+ if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) {
+ edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY");
+ //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY");
+ }
+
+ if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) && !local_job) {
+ if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) {
+ return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
+ }
+ }
+ else {
+ /* event will not arrive to server, only flag was set */
+ /* check whether some pending notifications are not triggered */
+ if ( newstat->state ) {
+ edg_wll_NotifMatch(ctx, newstat);
+ }
+ }
+
+ /* LB proxy purge */
+ if (newstat->remove_from_proxy) {
+ edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
+ }
+ } else
+ {
+ /* Purge proxy flag */
+ if ( newstat->remove_from_proxy && local_job ) {
+ if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
+ return(edg_wll_Error(ctx,NULL,NULL));
+ }
+ }
+
+ if ( newstat->state ) {
+ edg_wll_NotifMatch(ctx, newstat);
+ }
+ }
+
+
+ return edg_wll_Error(ctx,NULL,NULL);
}
return edg_wll_Error(ctx,NULL,NULL);
}
+/* test whether job shares LB proxy and server DB or not */
+int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId)
+{
+ char *srvName = NULL;
+ unsigned int srvPort;
+ int ret;
+
+
+ edg_wlc_JobIdGetServerParts(jobId, &srvName, &srvPort);
+ ret = ((ctx->srvPort == srvPort) && !strcmp(srvName,ctx->srvName));
+ free(srvName);
+
+ return(ret);
+}
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort)
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event)
{
char *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
char *q = NULL, *owner = NULL, *userid = NULL;
glite_lbu_Statement stmt = NULL;
int nar;
char *can_peername = NULL;
+ int local_job = is_job_local(ctx, event->any.jobId);
edg_wll_ResetError(ctx);
if (event->any.type == EDG_WLL_EVENT_REGJOB) {
if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
/* first synchronous registration */
- if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+ if (local_job) {
/* we are both server and proxy for this job */
trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
unique);
}
else {
/* supplementary re-registration (JDL of subjob, etc.) */
- if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+ if (local_job) {
/* previous registration via GSI required */
trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
unique, unique);