Notification::Notification(void)
{
try {
- int ret = edg_wll_InitContext(&ctx);
- check_result(ret,ctx,"edg_wll_InitContext");
+ int ret = edg_wll_InitContext(&this->ctx);
+ check_result(ret,this->ctx,"edg_wll_InitContext");
+ this->notifId = NULL;
+ this->valid = 0;
} catch (Exception &e) {
STACK_ADD;
throw;
Notification::Notification(const std::string notifid_str)
{
try {
- int ret = edg_wll_InitContext(&ctx);
- check_result(ret,ctx,"edg_wll_InitContext");
- ret = edg_wll_NotifIdParse(notifid_str.c_str(),¬ifId);
- check_result(ret,ctx,"edg_wll_NotifIdParse");
+ char *host;
+ unsigned int port;
+ int ret = edg_wll_InitContext(&this->ctx);
+ check_result(ret,this->ctx,"edg_wll_InitContext");
+ ret = edg_wll_NotifIdParse(notifid_str.c_str(),&this->notifId);
+ check_result(ret,this->ctx,"edg_wll_NotifIdParse");
+ edg_wll_NotifIdGetServerParts(this->notifId,&host,&port);
+ edg_wll_SetParam(this->ctx, EDG_WLL_PARAM_NOTIF_SERVER, host);
+ edg_wll_SetParam(this->ctx, EDG_WLL_PARAM_NOTIF_SERVER_PORT, port);
+ free(host);
+ this->valid = 0;
} catch (Exception &e) {
STACK_ADD;
throw;
check_result(ret,ctx,"edg_wll_InitContext");
edg_wll_SetParam(ctx, EDG_WLL_PARAM_NOTIF_SERVER, host.c_str());
edg_wll_SetParam(ctx, EDG_WLL_PARAM_NOTIF_SERVER_PORT, port);
- ret = edg_wll_NotifIdCreate(host.c_str(),port,¬ifId);
+ this->notifId = NULL;
+ this->valid = 0;
check_result(ret,ctx,"edg_wll_NotifIdCreate");
} catch (Exception &e) {
STACK_ADD;
Notification::~Notification(void)
{
try {
- edg_wll_FreeContext(ctx);
+ edg_wll_FreeContext(this->ctx);
+ edg_wll_NotifIdFree(this->notifId);
} catch (Exception &e) {
STACK_ADD;
throw;
Notification::getNotifId(void) const
{
try {
- std::string notifid_str = edg_wll_NotifIdUnparse(notifId);
- return(notifid_str);
+ std::string notifid_str;
+ if (this->notifId != NULL) {
+ notifid_str = edg_wll_NotifIdUnparse(this->notifId);
+ return(notifid_str);
+ } else {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "notifId not known at the moment");
+ }
} catch (Exception &e) {
STACK_ADD;
throw;
time_t
Notification::getValid(void) const
{
- return(valid);
+ return(this->valid);
}
int
Notification::getFd(void) const
{
try {
- int ret = edg_wll_NotifGetFd(ctx);
- check_result(ret,ctx,"edg_wll_NotifGetFd");
+ int ret = edg_wll_NotifGetFd(this->ctx);
+ check_result(ret,this->ctx,"edg_wll_NotifGetFd");
return(ret);
} catch (Exception &e) {
STACK_ADD;
std::vector<glite::wmsutils::jobid::JobId>::iterator it;
try {
+ if (this->notifId != NULL) {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "adding jobs allowed only before registering");
+ }
for( it = jobs.begin(); it != jobs.end(); it++ ) {
if ( (*it).toString() == jobId.toString() ) {
STACK_ADD;
int removed = 0;
try {
+ if (this->notifId != NULL) {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "removing jobs allowed only before registering");
+ }
for( it = jobs.begin(); it != jobs.end(); it++ ) {
if ( (*it).toString() == jobId.toString() ) {
jobs.erase(it);
}
}
+/* XXX: obsolete, used only for debugging purposes */
+
std::string
Notification::getJobs(void)
{
void
Notification::setStates(const std::vector<glite::lb::JobStatus::Code> &jobStates)
{
+ if (this->notifId != NULL) {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "removing jobs allowed only before registering");
+ }
states = jobStates;
}
int ret = 0;
std::vector<glite::wmsutils::jobid::JobId>::iterator it;
std::vector<glite::lb::JobStatus::Code>::iterator its;
- std::vector<std::vector<glite::lb::QueryRecord> > query;
+ std::vector<std::vector<glite::lb::QueryRecord> > queryExt;
edg_wll_QueryRec **conditions = NULL;
unsigned i;
try {
+ if (this->notifId != NULL) {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "registering job allowed only once");
+ }
/* fill in the query: */
+ std::vector<glite::lb::QueryRecord> query;
for( it = jobs.begin(); it != jobs.end(); it++ ) {
- std::vector<glite::lb::QueryRecord> queryjob;
-
QueryRecord r0(QueryRecord::JOBID,QueryRecord::EQUAL,*it);
- queryjob.push_back(r0);
-
- for( its = states.begin(); its != states.end(); its++ ) {
- QueryRecord r(QueryRecord::STATUS,QueryRecord::EQUAL,*its);
- queryjob.push_back(r);
- }
+ query.push_back(r0);
+ }
+ queryExt.push_back(query);
+ query.clear();
- query.push_back(queryjob);
+ for( its = states.begin(); its != states.end(); its++ ) {
+ QueryRecord r(QueryRecord::STATUS,QueryRecord::EQUAL,*its);
+ query.push_back(r);
}
+ queryExt.push_back(query);
+
/* convert query to conditions */
- conditions = convertQueryVectorExt(query);
+ conditions = convertQueryVectorExt(queryExt);
/* register */
ret = edg_wll_NotifNew(ctx,conditions,-1,NULL,¬ifId,&valid);
check_result(ret,ctx,"edg_wll_NotifNew");
}
}
+void
+Notification::Bind(const std::string address_override)
+{
+ try {
+ if (this->notifId == NULL) {
+ STACK_ADD;
+ throw Exception(EXCEPTION_MANDATORY, EINVAL, "binding allowed only for given notifId");
+ }
+ int ret = edg_wll_NotifBind(this->ctx,this->notifId,-1,address_override.c_str(),&this->valid);
+ check_result(ret,this->ctx,"edg_wll_NotifBind");
+ }
+ catch (Exception &e) {
+ STACK_ADD;
+ throw;
+ }
+}
+
+
+
+
int Notification::receive(glite::lb::JobStatus &jobStatus,timeval &timeout)
{
+ try {
int ret = 0;
edg_wll_JobStat *status = (edg_wll_JobStat *) calloc(1,sizeof(edg_wll_JobStat));
if (status == NULL) {
throw OSException(EXCEPTION_MANDATORY, ENOMEM, "allocating jobStatus");
}
ret = edg_wll_NotifReceive(ctx,-1,&timeout,status,¬ifId);
+ if ( ret == ETIMEDOUT )
+ return 1;
check_result(ret,ctx,"edg_wll_NotifReceive");
jobStatus = JobStatus(*status);
- return(ret);
+ return 0;
+ }
+ catch (Exception &e) {
+ STACK_ADD;
+ throw;
+ }
}
EWL_END_NAMESPACE;
-#ident "$Header"
+#ident "$Header$"
#include <unistd.h>
#include <stdlib.h>
* address_override
*/
if (ctx->notifSock >= 0) {
- if (getsockname(ctx->notifSock, &a, &alen))
+ if (getsockname(ctx->notifSock, (struct sockaddr *) &a, &alen))
return edg_wll_SetError(ctx, errno, "getsockname() failed");
if ( (strcmp(inet_ntoa(a.sin_addr), name)) || (ntohs(a.sin_port) != port) ) {
// used supplied socket
ctx->notifSock = fd;
- if (getsockname(ctx->notifSock, &a, &alen))
+ if (getsockname(ctx->notifSock,(struct sockaddr *) &a, &alen))
return edg_wll_SetError(ctx, errno, "getsockname() failed");
if (a.sin_addr.s_addr == INADDR_ANY)
edg_wll_GssStatus gss_code;
- if (ctx->connPool[ctx->connToUse].buf) {
- free(ctx->connPool[ctx->connToUse].buf);
- ctx->connPool[ctx->connToUse].buf = NULL;
+ if (ctx->connPoolNotif[0].buf) {
+ free(ctx->connPoolNotif[0].buf);
+ ctx->connPoolNotif[0].buf = NULL;
}
- ctx->connPool[ctx->connToUse].bufUse = 0;
- ctx->connPool[ctx->connToUse].bufSize = 0;
+ ctx->connPoolNotif[0].bufUse = 0;
+ ctx->connPoolNotif[0].bufSize = 0;
- ret = edg_wll_gss_read_full(&ctx->connPool[ctx->connToUse].gss,
+ ret = edg_wll_gss_read_full(&ctx->connPoolNotif[0].gss,
fbuf,17, &ctx->p_tmp_timeout,&total, &gss_code);
if (ret < 0)
switch (ret) {
return edg_wll_SetError(ctx,EINVAL,"message length");
}
- ctx->connPool[ctx->connToUse].bufSize = len+1;
+ ctx->connPoolNotif[0].bufSize = len+1;
- ctx->connPool[ctx->connToUse].buf = (char *) malloc(
- ctx->connPool[ctx->connToUse].bufSize);
+ ctx->connPoolNotif[0].buf = (char *) malloc(
+ ctx->connPoolNotif[0].bufSize);
- if (!ctx->connPool[ctx->connToUse].buf) {
+ if (!ctx->connPoolNotif[0].buf) {
return edg_wll_SetError(ctx, ENOMEM, "recv_notif()");
}
- ret = edg_wll_gss_read_full(&ctx->connPool[ctx->connToUse].gss,
- ctx->connPool[ctx->connToUse].buf, len,
+ ret = edg_wll_gss_read_full(&ctx->connPoolNotif[0].gss,
+ ctx->connPoolNotif[0].buf, len,
&ctx->p_tmp_timeout,&total, &gss_code);
if (ret < 0) {
- free(ctx->connPool[ctx->connToUse].buf);
- ctx->connPool[ctx->connToUse].bufUse = 0;
- ctx->connPool[ctx->connToUse].bufSize = 0;
+ free(ctx->connPoolNotif[0].buf);
+ ctx->connPoolNotif[0].bufUse = 0;
+ ctx->connPoolNotif[0].bufSize = 0;
return edg_wll_SetError(ctx,
ret == EDG_WLL_GSS_ERROR_TIMEOUT ?
ETIMEDOUT : EDG_WLL_ERROR_GSS,
}
- ctx->connPool[ctx->connToUse].buf[len] = 0;
- ctx->connPool[ctx->connToUse].bufUse = len+1;
+ ctx->connPoolNotif[0].buf[len] = 0;
+ ctx->connPoolNotif[0].bufUse = len+1;
return edg_wll_Error(ctx,NULL,NULL);
p = put_int(p, err_code_min);
p = put_string(p, err_msg);
- ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss,
+ ret = edg_wll_gss_write_full(&ctx->connPoolNotif[0].gss,
buf,len,&ctx->p_tmp_timeout,&total, &gss_code);
if (ret < 0) {
edg_wll_SetError(ctx,
+
int edg_wll_NotifReceive(
edg_wll_Context ctx,
int fd,
const struct timeval *timeout,
edg_wll_JobStat *state_out,
edg_wll_NotifId *id_out)
+
+/* pullup from INFN, support multiple messages from interlogger */
+#if 0
{
fd_set fds;
struct sockaddr_in a;
edg_wll_GssStatus gss_code;
+ edg_wll_ResetError(ctx);
-/* start timer */
+ /* start timer */
gettimeofday(&start_time,0);
if (fd == -1) {
return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");;
}
-/* check time */
+ /****************************************************************/
+ /* end of notif-interlogger message exchange */
+ /****************************************************************/
+
+ /* check time */
gettimeofday(&check_time,0);
if (decrement_timeout(&tv, start_time, check_time)) {
edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
goto err;
}
-
start_time = check_time;
event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
return edg_wll_Error(ctx,NULL,NULL);
}
+#endif
+/* NotifReceive */
+{
+ fd_set fds;
+ struct sockaddr_in a;
+ int recv_sock, alen;
+ edg_wll_Event *event = NULL;
+ struct timeval start_time,check_time,tv;
+ char *p = NULL, *ucs = NULL,
+ *event_char = NULL, *jobstat_char = NULL;
+ edg_wll_GssStatus gss_code;
+
+
+ edg_wll_ResetError(ctx);
+
+ /* start timer */
+ gettimeofday(&start_time,0);
+
+ if (fd == -1) {
+ if (ctx->notifSock == -1) {
+ edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
+ goto err;
+ }
+ else {
+ fd = ctx->notifSock;
+ }
+ }
+
+ FD_ZERO(&fds);
+ FD_SET(fd,&fds);
+ tv.tv_sec = timeout->tv_sec;
+ tv.tv_usec = timeout->tv_usec;
+
+
+select:
+ /* XXX - index 0 is used because of absence of connection management */
+ /* to use it, support in client/connection.c needed */
+ /* it is better to separate it from ctx->connPool, which is used */
+ /* for outgouing messages to server */
+ /* In future it should be in context, so one could use: */
+ /* ctx->connPoolNotif[ctx->connPoolNotifToUse] */
+ /* notif_send() & notif_receive() should then migrate to */
+ /* client/connection.c and use connPool management f-cions */
+
+ if (ctx->connPoolNotif[0].gss.context == GSS_C_NO_CONTEXT)
+ {
+ int ret;
+ switch(select(fd+1, &fds, NULL, NULL, &tv)) {
+ case -1:
+ edg_wll_SetError(ctx, errno, "select() failed");
+ goto err;
+ case 0:
+ edg_wll_SetError(ctx, ETIMEDOUT, "select() timeouted");
+ goto err;
+ default:
+ break;
+ }
+
+ /* check time */
+ gettimeofday(&check_time,0);
+ if (decrement_timeout(&tv, start_time, check_time)) {
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+ goto err;
+ }
+ start_time = check_time;
+
+ alen=sizeof(a);
+ recv_sock = accept(fd,(struct sockaddr *)&a,&alen);
+ if (recv_sock <0) {
+ edg_wll_SetError(ctx, errno, "accept() failed");
+ goto err;
+ }
+
+ ret = edg_wll_gss_accept(ctx->connPoolNotif[0].gsiCred, recv_sock,
+ &tv, &ctx->connPoolNotif[0].gss,&gss_code);
+ if (ret) {
+ edg_wll_SetError(ctx, errno, "GSS authentication failed.");
+ goto err;
+ }
+
+ /* check time */
+ gettimeofday(&check_time,0);
+ if (decrement_timeout(&tv, start_time, check_time)) {
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+ goto err;
+ }
+ start_time = check_time;
+ }
+
+
+ ctx->p_tmp_timeout = tv;
+
+ /****************************************************************/
+ /* Communication with notif-interlogger */
+ /****************************************************************/
+
+ if (recv_notif(ctx)) {
+ if (ctx->errCode == ENOTCONN) {
+ /* other side (interlogger-notif) probably closed connection */
+ edg_wll_ResetError(ctx);
+
+ edg_wll_gss_close(&ctx->connPoolNotif[0].gss,NULL);
+ // buffer is freed in recv_notif()
+
+ goto select;
+ }
+ else {
+ goto err; /* error set in recv_notif() */
+ }
+ }
+
+ if (send_reply(ctx)) {
+ goto err; /* error set in send_reply() */
+ }
+
+ p = ctx->connPoolNotif[0].buf;
+ p = get_string(p, &ucs);
+ if (p == NULL) return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading UCS");
+ free(ucs);
+
+ p = get_string(p, &event_char);
+ if (p == NULL) {
+ free(ucs);
+ return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");;
+ }
+
+ /****************************************************************/
+ /* end of notif-interlogger message exchange */
+ /****************************************************************/
+
+ /* check time */
+ gettimeofday(&check_time,0);
+ if (decrement_timeout(&tv, start_time, check_time)) {
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+ goto err;
+ }
+ start_time = check_time;
+
+ event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
+ if (edg_wll_ParseNotifEvent(ctx, event_char, &event)) {
+ goto err;
+ }
+
+ jobstat_char = edg_wll_UnescapeXML((const char *) event->notification.jobstat);
+ if (jobstat_char == NULL) {
+ edg_wll_SetError(ctx, EINVAL, "edg_wll_UnescapeXML()");
+ goto err;
+ }
+
+ /* fill in return values
+ */
+ if ( edg_wll_ParseJobStat(ctx, jobstat_char,
+ strlen(jobstat_char), state_out)) {
+ goto err;
+ }
+
+ *id_out = event->notification.notifId;
+ event->notification.notifId = NULL;
+
+
+err:
+ if (event) {
+ edg_wll_FreeEvent(event);
+ // XXX - konzultovat s honikem; podle meho by to free
+ // mel delat uz edg_wll_FreeEvent
+ //free(event);
+ }
+
+ free(ctx->connPoolNotif[0].buf);
+ ctx->connPoolNotif[0].buf = NULL;
+ ctx->connPoolNotif[0].bufUse = 0;
+ ctx->connPoolNotif[0].bufSize = 0;
+
+ free(event_char);
+ free(jobstat_char);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
int edg_wll_NotifGetFd(
int err;
if (ctx->notifSock >= 0) {
- if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) {
- edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL);
+ if (ctx->connPoolNotif[0].gss.context != GSS_C_NO_CONTEXT) {
+ edg_wll_gss_close(&ctx->connPoolNotif[0].gss, NULL);
}
err = close(ctx->notifSock);
ctx->notifSock = -1;