From: Aleš Křenek Date: Thu, 7 Oct 2004 13:40:15 +0000 (+0000) Subject: pullup of INFN CVS changes -- mainly notifications X-Git-Tag: glite-lb-common_R_0_2_0~123 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=71e28295c97af7f8c148bcd0825819529571da62;p=jra1mw.git pullup of INFN CVS changes -- mainly notifications --- diff --git a/org.glite.lb.client/project/version.properties b/org.glite.lb.client/project/version.properties index 33e2b5c..a312a30 100644 --- a/org.glite.lb.client/project/version.properties +++ b/org.glite.lb.client/project/version.properties @@ -1,4 +1,4 @@ #Thu Oct 07 13:53:42 CEST 2004 -module.version=0.1.0 +module.version=0.2.0 module.build=34 module.age=0 diff --git a/org.glite.lb.client/src/Notification.cpp b/org.glite.lb.client/src/Notification.cpp index 30ba1d7..0441626 100644 --- a/org.glite.lb.client/src/Notification.cpp +++ b/org.glite.lb.client/src/Notification.cpp @@ -35,8 +35,10 @@ freeQueryRecVector(edg_wll_QueryRec *); Notification::Notification(void) { try { - int ret = edg_wll_InitContext(&ctx); - check_result(ret,ctx,"edg_wll_InitContext"); + int ret = edg_wll_InitContext(&this->ctx); + check_result(ret,this->ctx,"edg_wll_InitContext"); + this->notifId = NULL; + this->valid = 0; } catch (Exception &e) { STACK_ADD; throw; @@ -46,10 +48,17 @@ Notification::Notification(void) Notification::Notification(const std::string notifid_str) { try { - int ret = edg_wll_InitContext(&ctx); - check_result(ret,ctx,"edg_wll_InitContext"); - ret = edg_wll_NotifIdParse(notifid_str.c_str(),¬ifId); - check_result(ret,ctx,"edg_wll_NotifIdParse"); + char *host; + unsigned int port; + int ret = edg_wll_InitContext(&this->ctx); + check_result(ret,this->ctx,"edg_wll_InitContext"); + ret = edg_wll_NotifIdParse(notifid_str.c_str(),&this->notifId); + check_result(ret,this->ctx,"edg_wll_NotifIdParse"); + edg_wll_NotifIdGetServerParts(this->notifId,&host,&port); + edg_wll_SetParam(this->ctx, EDG_WLL_PARAM_NOTIF_SERVER, host); + edg_wll_SetParam(this->ctx, EDG_WLL_PARAM_NOTIF_SERVER_PORT, port); + free(host); + this->valid = 0; } catch (Exception &e) { STACK_ADD; throw; @@ -63,7 +72,8 @@ Notification::Notification(const std::string host,const u_int16_t port) check_result(ret,ctx,"edg_wll_InitContext"); edg_wll_SetParam(ctx, EDG_WLL_PARAM_NOTIF_SERVER, host.c_str()); edg_wll_SetParam(ctx, EDG_WLL_PARAM_NOTIF_SERVER_PORT, port); - ret = edg_wll_NotifIdCreate(host.c_str(),port,¬ifId); + this->notifId = NULL; + this->valid = 0; check_result(ret,ctx,"edg_wll_NotifIdCreate"); } catch (Exception &e) { STACK_ADD; @@ -75,7 +85,8 @@ Notification::Notification(const std::string host,const u_int16_t port) Notification::~Notification(void) { try { - edg_wll_FreeContext(ctx); + edg_wll_FreeContext(this->ctx); + edg_wll_NotifIdFree(this->notifId); } catch (Exception &e) { STACK_ADD; throw; @@ -87,8 +98,14 @@ std::string Notification::getNotifId(void) const { try { - std::string notifid_str = edg_wll_NotifIdUnparse(notifId); - return(notifid_str); + std::string notifid_str; + if (this->notifId != NULL) { + notifid_str = edg_wll_NotifIdUnparse(this->notifId); + return(notifid_str); + } else { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "notifId not known at the moment"); + } } catch (Exception &e) { STACK_ADD; throw; @@ -98,15 +115,15 @@ Notification::getNotifId(void) const time_t Notification::getValid(void) const { - return(valid); + return(this->valid); } int Notification::getFd(void) const { try { - int ret = edg_wll_NotifGetFd(ctx); - check_result(ret,ctx,"edg_wll_NotifGetFd"); + int ret = edg_wll_NotifGetFd(this->ctx); + check_result(ret,this->ctx,"edg_wll_NotifGetFd"); return(ret); } catch (Exception &e) { STACK_ADD; @@ -120,6 +137,10 @@ Notification::addJob(const glite::wmsutils::jobid::JobId &jobId) std::vector::iterator it; try { + if (this->notifId != NULL) { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "adding jobs allowed only before registering"); + } for( it = jobs.begin(); it != jobs.end(); it++ ) { if ( (*it).toString() == jobId.toString() ) { STACK_ADD; @@ -141,6 +162,10 @@ Notification::removeJob(const glite::wmsutils::jobid::JobId &jobId) int removed = 0; try { + if (this->notifId != NULL) { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "removing jobs allowed only before registering"); + } for( it = jobs.begin(); it != jobs.end(); it++ ) { if ( (*it).toString() == jobId.toString() ) { jobs.erase(it); @@ -159,6 +184,8 @@ Notification::removeJob(const glite::wmsutils::jobid::JobId &jobId) } } +/* XXX: obsolete, used only for debugging purposes */ + std::string Notification::getJobs(void) { @@ -181,6 +208,10 @@ Notification::getJobs(void) void Notification::setStates(const std::vector &jobStates) { + if (this->notifId != NULL) { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "removing jobs allowed only before registering"); + } states = jobStates; } @@ -211,27 +242,32 @@ Notification::Register(void) int ret = 0; std::vector::iterator it; std::vector::iterator its; - std::vector > query; + std::vector > queryExt; edg_wll_QueryRec **conditions = NULL; unsigned i; try { + if (this->notifId != NULL) { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "registering job allowed only once"); + } /* fill in the query: */ + std::vector query; for( it = jobs.begin(); it != jobs.end(); it++ ) { - std::vector queryjob; - QueryRecord r0(QueryRecord::JOBID,QueryRecord::EQUAL,*it); - queryjob.push_back(r0); - - for( its = states.begin(); its != states.end(); its++ ) { - QueryRecord r(QueryRecord::STATUS,QueryRecord::EQUAL,*its); - queryjob.push_back(r); - } + query.push_back(r0); + } + queryExt.push_back(query); + query.clear(); - query.push_back(queryjob); + for( its = states.begin(); its != states.end(); its++ ) { + QueryRecord r(QueryRecord::STATUS,QueryRecord::EQUAL,*its); + query.push_back(r); } + queryExt.push_back(query); + /* convert query to conditions */ - conditions = convertQueryVectorExt(query); + conditions = convertQueryVectorExt(queryExt); /* register */ ret = edg_wll_NotifNew(ctx,conditions,-1,NULL,¬ifId,&valid); check_result(ret,ctx,"edg_wll_NotifNew"); @@ -259,8 +295,29 @@ Notification::Register(void) } } +void +Notification::Bind(const std::string address_override) +{ + try { + if (this->notifId == NULL) { + STACK_ADD; + throw Exception(EXCEPTION_MANDATORY, EINVAL, "binding allowed only for given notifId"); + } + int ret = edg_wll_NotifBind(this->ctx,this->notifId,-1,address_override.c_str(),&this->valid); + check_result(ret,this->ctx,"edg_wll_NotifBind"); + } + catch (Exception &e) { + STACK_ADD; + throw; + } +} + + + + int Notification::receive(glite::lb::JobStatus &jobStatus,timeval &timeout) { + try { int ret = 0; edg_wll_JobStat *status = (edg_wll_JobStat *) calloc(1,sizeof(edg_wll_JobStat)); if (status == NULL) { @@ -268,9 +325,16 @@ int Notification::receive(glite::lb::JobStatus &jobStatus,timeval &timeout) throw OSException(EXCEPTION_MANDATORY, ENOMEM, "allocating jobStatus"); } ret = edg_wll_NotifReceive(ctx,-1,&timeout,status,¬ifId); + if ( ret == ETIMEDOUT ) + return 1; check_result(ret,ctx,"edg_wll_NotifReceive"); jobStatus = JobStatus(*status); - return(ret); + return 0; + } + catch (Exception &e) { + STACK_ADD; + throw; + } } EWL_END_NAMESPACE; diff --git a/org.glite.lb.client/src/dump.c b/org.glite.lb.client/src/dump.c index a75f75b..f172192 100644 --- a/org.glite.lb.client/src/dump.c +++ b/org.glite.lb.client/src/dump.c @@ -205,8 +205,8 @@ int edg_wll_DumpEvents( edg_wll_DumpRequestToXML(ctx, request, &send_mess); - //edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_TIMEOUT, 4000); - ctx->p_tmp_timeout.tv_sec = 400; + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; if (set_server_name_and_port(ctx, NULL)) goto edg_wll_dumpevents_end; diff --git a/org.glite.lb.client/src/load.c b/org.glite.lb.client/src/load.c index 97522b0..bd75085 100644 --- a/org.glite.lb.client/src/load.c +++ b/org.glite.lb.client/src/load.c @@ -183,8 +183,8 @@ int edg_wll_LoadEvents( edg_wll_LoadRequestToXML(ctx, request, &send_mess); - //edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_TIMEOUT, 4000); - ctx->p_tmp_timeout.tv_sec = 400; + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; if (set_server_name_and_port(ctx, NULL)) goto edg_wll_loadevents_end; diff --git a/org.glite.lb.client/src/notification.c b/org.glite.lb.client/src/notification.c index 4fd57cb..12b6cbf 100644 --- a/org.glite.lb.client/src/notification.c +++ b/org.glite.lb.client/src/notification.c @@ -1,4 +1,4 @@ -#ident "$Header" +#ident "$Header$" #include #include @@ -162,7 +162,7 @@ static int get_client_address( * address_override */ if (ctx->notifSock >= 0) { - if (getsockname(ctx->notifSock, &a, &alen)) + if (getsockname(ctx->notifSock, (struct sockaddr *) &a, &alen)) return edg_wll_SetError(ctx, errno, "getsockname() failed"); if ( (strcmp(inet_ntoa(a.sin_addr), name)) || (ntohs(a.sin_port) != port) ) { @@ -197,7 +197,7 @@ static int get_client_address( // used supplied socket ctx->notifSock = fd; - if (getsockname(ctx->notifSock, &a, &alen)) + if (getsockname(ctx->notifSock,(struct sockaddr *) &a, &alen)) return edg_wll_SetError(ctx, errno, "getsockname() failed"); if (a.sin_addr.s_addr == INADDR_ANY) @@ -455,14 +455,14 @@ static int recv_notif(edg_wll_Context ctx) edg_wll_GssStatus gss_code; - if (ctx->connPool[ctx->connToUse].buf) { - free(ctx->connPool[ctx->connToUse].buf); - ctx->connPool[ctx->connToUse].buf = NULL; + if (ctx->connPoolNotif[0].buf) { + free(ctx->connPoolNotif[0].buf); + ctx->connPoolNotif[0].buf = NULL; } - ctx->connPool[ctx->connToUse].bufUse = 0; - ctx->connPool[ctx->connToUse].bufSize = 0; + ctx->connPoolNotif[0].bufUse = 0; + ctx->connPoolNotif[0].bufSize = 0; - ret = edg_wll_gss_read_full(&ctx->connPool[ctx->connToUse].gss, + ret = edg_wll_gss_read_full(&ctx->connPoolNotif[0].gss, fbuf,17, &ctx->p_tmp_timeout,&total, &gss_code); if (ret < 0) switch (ret) { @@ -480,23 +480,23 @@ static int recv_notif(edg_wll_Context ctx) return edg_wll_SetError(ctx,EINVAL,"message length"); } - ctx->connPool[ctx->connToUse].bufSize = len+1; + ctx->connPoolNotif[0].bufSize = len+1; - ctx->connPool[ctx->connToUse].buf = (char *) malloc( - ctx->connPool[ctx->connToUse].bufSize); + ctx->connPoolNotif[0].buf = (char *) malloc( + ctx->connPoolNotif[0].bufSize); - if (!ctx->connPool[ctx->connToUse].buf) { + if (!ctx->connPoolNotif[0].buf) { return edg_wll_SetError(ctx, ENOMEM, "recv_notif()"); } - ret = edg_wll_gss_read_full(&ctx->connPool[ctx->connToUse].gss, - ctx->connPool[ctx->connToUse].buf, len, + ret = edg_wll_gss_read_full(&ctx->connPoolNotif[0].gss, + ctx->connPoolNotif[0].buf, len, &ctx->p_tmp_timeout,&total, &gss_code); if (ret < 0) { - free(ctx->connPool[ctx->connToUse].buf); - ctx->connPool[ctx->connToUse].bufUse = 0; - ctx->connPool[ctx->connToUse].bufSize = 0; + free(ctx->connPoolNotif[0].buf); + ctx->connPoolNotif[0].bufUse = 0; + ctx->connPoolNotif[0].bufSize = 0; return edg_wll_SetError(ctx, ret == EDG_WLL_GSS_ERROR_TIMEOUT ? ETIMEDOUT : EDG_WLL_ERROR_GSS, @@ -504,8 +504,8 @@ static int recv_notif(edg_wll_Context ctx) } - ctx->connPool[ctx->connToUse].buf[len] = 0; - ctx->connPool[ctx->connToUse].bufUse = len+1; + ctx->connPoolNotif[0].buf[len] = 0; + ctx->connPoolNotif[0].bufUse = len+1; return edg_wll_Error(ctx,NULL,NULL); @@ -537,7 +537,7 @@ static int send_reply(const edg_wll_Context ctx) p = put_int(p, err_code_min); p = put_string(p, err_msg); - ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss, + ret = edg_wll_gss_write_full(&ctx->connPoolNotif[0].gss, buf,len,&ctx->p_tmp_timeout,&total, &gss_code); if (ret < 0) { edg_wll_SetError(ctx, @@ -554,12 +554,16 @@ err: + int edg_wll_NotifReceive( edg_wll_Context ctx, int fd, const struct timeval *timeout, edg_wll_JobStat *state_out, edg_wll_NotifId *id_out) + +/* pullup from INFN, support multiple messages from interlogger */ +#if 0 { fd_set fds; struct sockaddr_in a; @@ -572,8 +576,9 @@ int edg_wll_NotifReceive( edg_wll_GssStatus gss_code; + edg_wll_ResetError(ctx); -/* start timer */ + /* start timer */ gettimeofday(&start_time,0); if (fd == -1) { @@ -660,13 +665,16 @@ int edg_wll_NotifReceive( return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");; } -/* check time */ + /****************************************************************/ + /* end of notif-interlogger message exchange */ + /****************************************************************/ + + /* check time */ gettimeofday(&check_time,0); if (decrement_timeout(&tv, start_time, check_time)) { edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); goto err; } - start_time = check_time; event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION); @@ -714,6 +722,184 @@ err: return edg_wll_Error(ctx,NULL,NULL); } +#endif +/* NotifReceive */ +{ + fd_set fds; + struct sockaddr_in a; + int recv_sock, alen; + edg_wll_Event *event = NULL; + struct timeval start_time,check_time,tv; + char *p = NULL, *ucs = NULL, + *event_char = NULL, *jobstat_char = NULL; + edg_wll_GssStatus gss_code; + + + edg_wll_ResetError(ctx); + + /* start timer */ + gettimeofday(&start_time,0); + + if (fd == -1) { + if (ctx->notifSock == -1) { + edg_wll_SetError(ctx, EINVAL, "No client socket opened."); + goto err; + } + else { + fd = ctx->notifSock; + } + } + + FD_ZERO(&fds); + FD_SET(fd,&fds); + tv.tv_sec = timeout->tv_sec; + tv.tv_usec = timeout->tv_usec; + + +select: + /* XXX - index 0 is used because of absence of connection management */ + /* to use it, support in client/connection.c needed */ + /* it is better to separate it from ctx->connPool, which is used */ + /* for outgouing messages to server */ + /* In future it should be in context, so one could use: */ + /* ctx->connPoolNotif[ctx->connPoolNotifToUse] */ + /* notif_send() & notif_receive() should then migrate to */ + /* client/connection.c and use connPool management f-cions */ + + if (ctx->connPoolNotif[0].gss.context == GSS_C_NO_CONTEXT) + { + int ret; + switch(select(fd+1, &fds, NULL, NULL, &tv)) { + case -1: + edg_wll_SetError(ctx, errno, "select() failed"); + goto err; + case 0: + edg_wll_SetError(ctx, ETIMEDOUT, "select() timeouted"); + goto err; + default: + break; + } + + /* check time */ + gettimeofday(&check_time,0); + if (decrement_timeout(&tv, start_time, check_time)) { + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); + goto err; + } + start_time = check_time; + + alen=sizeof(a); + recv_sock = accept(fd,(struct sockaddr *)&a,&alen); + if (recv_sock <0) { + edg_wll_SetError(ctx, errno, "accept() failed"); + goto err; + } + + ret = edg_wll_gss_accept(ctx->connPoolNotif[0].gsiCred, recv_sock, + &tv, &ctx->connPoolNotif[0].gss,&gss_code); + if (ret) { + edg_wll_SetError(ctx, errno, "GSS authentication failed."); + goto err; + } + + /* check time */ + gettimeofday(&check_time,0); + if (decrement_timeout(&tv, start_time, check_time)) { + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); + goto err; + } + start_time = check_time; + } + + + ctx->p_tmp_timeout = tv; + + /****************************************************************/ + /* Communication with notif-interlogger */ + /****************************************************************/ + + if (recv_notif(ctx)) { + if (ctx->errCode == ENOTCONN) { + /* other side (interlogger-notif) probably closed connection */ + edg_wll_ResetError(ctx); + + edg_wll_gss_close(&ctx->connPoolNotif[0].gss,NULL); + // buffer is freed in recv_notif() + + goto select; + } + else { + goto err; /* error set in recv_notif() */ + } + } + + if (send_reply(ctx)) { + goto err; /* error set in send_reply() */ + } + + p = ctx->connPoolNotif[0].buf; + p = get_string(p, &ucs); + if (p == NULL) return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading UCS"); + free(ucs); + + p = get_string(p, &event_char); + if (p == NULL) { + free(ucs); + return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");; + } + + /****************************************************************/ + /* end of notif-interlogger message exchange */ + /****************************************************************/ + + /* check time */ + gettimeofday(&check_time,0); + if (decrement_timeout(&tv, start_time, check_time)) { + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); + goto err; + } + start_time = check_time; + + event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION); + if (edg_wll_ParseNotifEvent(ctx, event_char, &event)) { + goto err; + } + + jobstat_char = edg_wll_UnescapeXML((const char *) event->notification.jobstat); + if (jobstat_char == NULL) { + edg_wll_SetError(ctx, EINVAL, "edg_wll_UnescapeXML()"); + goto err; + } + + /* fill in return values + */ + if ( edg_wll_ParseJobStat(ctx, jobstat_char, + strlen(jobstat_char), state_out)) { + goto err; + } + + *id_out = event->notification.notifId; + event->notification.notifId = NULL; + + +err: + if (event) { + edg_wll_FreeEvent(event); + // XXX - konzultovat s honikem; podle meho by to free + // mel delat uz edg_wll_FreeEvent + //free(event); + } + + free(ctx->connPoolNotif[0].buf); + ctx->connPoolNotif[0].buf = NULL; + ctx->connPoolNotif[0].bufUse = 0; + ctx->connPoolNotif[0].bufSize = 0; + + free(event_char); + free(jobstat_char); + + return edg_wll_Error(ctx,NULL,NULL); +} int edg_wll_NotifGetFd( @@ -734,8 +920,8 @@ int edg_wll_NotifCloseFd( int err; if (ctx->notifSock >= 0) { - if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) { - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); + if (ctx->connPoolNotif[0].gss.context != GSS_C_NO_CONTEXT) { + edg_wll_gss_close(&ctx->connPoolNotif[0].gss, NULL); } err = close(ctx->notifSock); ctx->notifSock = -1; diff --git a/org.glite.lb.client/src/purge.c b/org.glite.lb.client/src/purge.c index f06ebed..15f3911 100644 --- a/org.glite.lb.client/src/purge.c +++ b/org.glite.lb.client/src/purge.c @@ -365,8 +365,8 @@ int edg_wll_Purge( if (edg_wll_PurgeRequestToXML(ctx, request, &send_mess)) goto edg_wll_purge_end; - //edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_TIMEOUT, 4000); - ctx->p_tmp_timeout.tv_sec = 400; + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; if (set_server_name_and_port(ctx, NULL)) goto edg_wll_purge_end;