From e9e03f12b6af429f864b985e44e864d2b5651c0b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Mon, 2 Feb 2009 16:30:50 +0000 Subject: [PATCH] implementation of notification pool - seems to work for basic scenario but needs extensive testing --- org.glite.lb.client/interface/connection.h | 2 + org.glite.lb.client/interface/notification.h | 13 + org.glite.lb.client/src/connection.c | 212 ++++++++++- org.glite.lb.client/src/notification.c | 519 +++++++++++---------------- org.glite.lb.client/src/notify.c | 4 +- org.glite.lb.common/interface/connpool.h | 15 +- org.glite.lb.common/interface/context-int.h | 2 +- org.glite.lb.common/src/connpool.c | 19 +- org.glite.lb.common/src/context.c | 23 +- 9 files changed, 479 insertions(+), 330 deletions(-) diff --git a/org.glite.lb.client/interface/connection.h b/org.glite.lb.client/interface/connection.h index f8eca35..5bd330d 100644 --- a/org.glite.lb.client/interface/connection.h +++ b/org.glite.lb.client/interface/connection.h @@ -5,6 +5,7 @@ int edg_wll_close(edg_wll_Context ctx,int *); int edg_wll_open(edg_wll_Context ctx,int *); +int edg_wll_accept(edg_wll_Context ctx, int fd); int edg_wll_http_send_recv(edg_wll_Context, char *, const char * const *, char *, char **, char ***, char **); int edg_wll_close_proxy(edg_wll_Context ctx); @@ -17,6 +18,7 @@ int ConnectionIndex(edg_wll_Context ctx, const char *name, int port); int AddConnection(edg_wll_Context ctx, char *name, int port); int ReleaseConnection(edg_wll_Context ctx, char *name, int port); int CloseConnection(edg_wll_Context ctx, int conn_index); +int CloseConnectionNotif(edg_wll_Context ctx); #define PROXY_CONNECT_RETRY 10 /* ms */ diff --git a/org.glite.lb.client/interface/notification.h b/org.glite.lb.client/interface/notification.h index 5d8a021..add5370 100644 --- a/org.glite.lb.client/interface/notification.h +++ b/org.glite.lb.client/interface/notification.h @@ -18,6 +18,10 @@ extern "C" { #endif +/** allocate memory for notification buffer using block of size XY +*/ +#define NOTIF_POOL_BUFFER_BLOCK_SIZE 16384 + /** * \defgroup notifications Notifications handling * \brief Notifications handling. @@ -167,6 +171,15 @@ int edg_wll_NotifCloseFd( edg_wll_Context context ); +/** Close all connections in notifPool and free it. + * \param[in,out] context context to work with + */ + +int edg_wll_NotifClosePool( + edg_wll_Context context +); + + /* *@} end of group */ diff --git a/org.glite.lb.client/src/connection.c b/org.glite.lb.client/src/connection.c index ad5c168..f8ca315 100644 --- a/org.glite.lb.client/src/connection.c +++ b/org.glite.lb.client/src/connection.c @@ -49,6 +49,35 @@ int CloseConnection(edg_wll_Context ctx, int conn_index) } +int CloseConnectionNotif(edg_wll_Context ctx) +{ + /* close connection and free its structures */ + int cIndex,ret = 0; + + + cIndex = ctx->connNotif->connToUse; + + assert(ctx->connNotif->connOpened); + + if (ctx->connNotif->connPool[cIndex].gss.sock >= 0) + edg_wll_gss_close(&ctx->connNotif->connPool[cIndex].gss, &ctx->p_tmp_timeout); // always returns 0 + if (ctx->connNotif->connPool[cIndex].gsiCred != NULL) + if ( (ret = edg_wll_gss_release_cred(&ctx->connNotif->connPool[cIndex].gsiCred, NULL)) ) + edg_wll_SetError(ctx,ret,"error in edg_wll_gss_release_cred()"); + free(ctx->connNotif->connPool[cIndex].peerName); + free(ctx->connNotif->connPool[cIndex].buf); + free(ctx->connNotif->connPool[cIndex].bufOut); + free(ctx->connNotif->connPool[cIndex].certfile); + + memset(ctx->connNotif->connPool + cIndex, 0, sizeof(edg_wll_ConnPool)); + ctx->connNotif->connPool[cIndex].gss.sock = -1; + + ctx->connNotif->connOpened--; + + return edg_wll_Error(ctx,NULL,NULL); +} + + int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) { @@ -98,7 +127,6 @@ int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) } - int AddConnection(edg_wll_Context ctx, char *name, int port) { int i,index = -1; @@ -125,6 +153,25 @@ int AddConnection(edg_wll_Context ctx, char *name, int port) } +int SetFreeConnectionIndexNotif(edg_wll_Context ctx) +{ + int i; + + + ctx->connNotif->connToUse = -1; + + for (i = 0; i < ctx->connNotif->poolSize; i++) + if (ctx->connNotif->connPool[i].gss.sock == -1) { + ctx->connNotif->connToUse = i; + ctx->connNotif->connOpened++; + assert(!ctx->connNotif->connPool[i].buf); + break; + } + + return ctx->connNotif->connToUse; +} + + int ReleaseConnection(edg_wll_Context ctx, char *name, int port) { @@ -165,6 +212,34 @@ int ReleaseConnection(edg_wll_Context ctx, char *name, int port) } +int ReleaseConnectionNotif(edg_wll_Context ctx) +{ + int i, index = 0; + long min; + + + edg_wll_ResetError(ctx); + if (ctx->connNotif->connOpened == 0) return 0; /* nothing to release */ + + min = ctx->connNotif->connPool[0].lastUsed.tv_sec; + + /* free the oldest (unlocked) connection */ + for (i=0; iconnNotif->poolSize; i++) { + assert(ctx->connNotif->connPool[i].gss.sock > -1); // Full pool expected - accept non-NULL values only + + if (ctx->connections->connPool[i].lastUsed.tv_sec < min) { + min = ctx->connections->connPool[i].lastUsed.tv_sec; + index = i; + } + } + + ctx->connNotif->connToUse = index; + CloseConnectionNotif(ctx); + + return edg_wll_Error(ctx,NULL,NULL); +} + + int edg_wll_close(edg_wll_Context ctx, int* connToUse) { @@ -579,3 +654,138 @@ int edg_wll_http_send_recv_proxy( return edg_wll_Error(ctx,NULL,NULL); } + + +int edg_wll_accept(edg_wll_Context ctx, int fd) +{ + int recv_sock; + edg_wll_GssStatus gss_stat; + time_t lifetime = 0; + struct stat statinfo; + int acquire_cred = 0; + struct sockaddr_in a; + socklen_t alen; + edg_wll_GssStatus gss_code; + + + edg_wll_ResetError(ctx); + assert(fd > 0); + + alen=sizeof(a); + recv_sock = accept(fd,(struct sockaddr *)&a,&alen); + if (recv_sock <0) { + edg_wll_SetError(ctx, errno, "accept() failed"); + goto err; + } + + if (ctx->connNotif->connOpened == ctx->connNotif->poolSize) + if (ReleaseConnectionNotif(ctx)) goto err; + + if (SetFreeConnectionIndexNotif(ctx) < 0) { + edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded"); + goto err; + } + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Connection with fd %d accepted. %d in the pool\n",>srvName,ctx->srvPort,ctx->connNotif->connToUse); + #endif + + + // In case of using a specifically given cert file, stat it and check for the need to reauthenticate + if (ctx->p_proxy_filename || ctx->p_cert_filename) { + if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile) { // Has the file been stated before? + stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, &statinfo); + if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile->st_mtime != statinfo.st_mtime) + acquire_cred = 1; // File has been modified. Need to acquire new creds. + } + else acquire_cred = 1; + } + + // Check if credentials exist. If so, check validity + if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred) { + lifetime = ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred->lifetime; + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf ("Credential exists, lifetime: %d\n", lifetime); + #endif + if (!lifetime) acquire_cred = 1; // Credentials exist and lifetime is OK. No need to authenticate. + } + else { + acquire_cred = 1; // No credentials exist so far, acquire. + } + + + if (acquire_cred) { + edg_wll_GssCred newcred = NULL; + if (edg_wll_gss_acquire_cred_gsi( + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, + &newcred, &gss_stat)) { + edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); + goto err; + } else { + if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred != NULL) + edg_wll_gss_release_cred(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred,&gss_stat); + ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred = newcred; + newcred = NULL; + + // Credentials Acquired successfully. Storing file identification. + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Cert file: %s\n", ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename); + #endif + + if (ctx->p_proxy_filename || ctx->p_cert_filename) { + if (!ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile) // Allocate space for certfile stats + ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile = + (struct stat*)calloc(1, sizeof(struct stat)); + stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile); + } + } + } + + assert(ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.context == NULL); + + switch ( edg_wll_gss_accept(ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred, recv_sock, + &ctx->p_tmp_timeout, &ctx->connNotif->connPool[ctx->connNotif->connToUse].gss,&gss_code)) { + + case EDG_WLL_GSS_OK: + break; + case EDG_WLL_GSS_ERROR_ERRNO: + edg_wll_SetError(ctx,errno,"failed to receive notification"); + goto err; + case EDG_WLL_GSS_ERROR_GSS: + edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code); + goto err; + case EDG_WLL_GSS_ERROR_HERRNO: + { const char *msg1; + char *msg2; + msg1 = hstrerror(errno); + asprintf(&msg2, "edg_wll_gss_connect(): %s", msg1); + edg_wll_SetError(ctx,EDG_WLL_ERROR_DNS, msg2); + free(msg2); + } + break; + case EDG_WLL_GSS_ERROR_EOF: + edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection"); + goto err; + case EDG_WLL_GSS_ERROR_TIMEOUT: + edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification"); + goto err; + default: + edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification"); + goto err; + } + + return edg_wll_Error(ctx,NULL,NULL); + + +err: + /* some error occured; close created connection + * and free all fields in connPool[ctx->connNotif->connToUse] */ + if (ctx->connNotif->connToUse >= 0) { + CloseConnectionNotif(ctx); + } + + + return edg_wll_Error(ctx,NULL,NULL); +} + diff --git a/org.glite.lb.client/src/notification.c b/org.glite.lb.client/src/notification.c index 32fac6f..469c398 100644 --- a/org.glite.lb.client/src/notification.c +++ b/org.glite.lb.client/src/notification.c @@ -10,6 +10,7 @@ #include #include #include +#include #include "glite/security/glite_gss.h" #include "glite/lbu/escape.h" @@ -446,90 +447,127 @@ err: } +/* read all data available on active notif connection */ +static int read_data(edg_wll_Context ctx) +{ + edg_wll_GssStatus gss_code; + int ret; + edg_wll_ConnPool *notif = &(ctx->connNotif->connPool[ctx->connNotif->connToUse]); + + + do { + if (notif->bufUse == notif->bufSize) { + notif->bufSize += NOTIF_POOL_BUFFER_BLOCK_SIZE; + notif->buf = realloc(notif->buf, notif->bufSize); + if (!notif->buf) + return edg_wll_SetError(ctx, ENOMEM, "read_data()"); + } + + ret = edg_wll_gss_read(&(notif->gss), notif->buf + notif->bufUse, + notif->bufSize - notif->bufUse, &ctx->p_tmp_timeout, &gss_code); + + if (ret < 0) { + switch(ret) { + case EDG_WLL_GSS_ERROR_TIMEOUT: + edg_wll_SetError(ctx, ETIMEDOUT, "read message"); + break; + case EDG_WLL_GSS_ERROR_EOF: + edg_wll_SetError(ctx, ENOTCONN, NULL); + break; + default: + edg_wll_SetError(ctx, EDG_WLL_ERROR_GSS, "read message"); + break; + } + goto err; + } + notif->bufUse += ret; + + } while (notif->bufSize == notif->bufUse); + +err: + return edg_wll_Error(ctx, NULL, NULL); +} + -static int gss_reader(void *user_data, char *buffer, int max_len) +/* copies bytes_to_copy chars from pool buffer to given _allocated_ working buffer + */ +static int buffer_reader(void *user_data, char *buffer, const int bytes_to_copy) { - edg_wll_GssStatus gss_code; - edg_wll_Context tmp_ctx = (edg_wll_Context)user_data; - int ret; - size_t len; - - ret = edg_wll_gss_read_full(&tmp_ctx->connPoolNotif[0].gss, - buffer, max_len, - &tmp_ctx->p_tmp_timeout, - &len, &gss_code); - if(ret < 0) - switch(ret) { - case EDG_WLL_GSS_ERROR_TIMEOUT: - edg_wll_SetError(tmp_ctx, ETIMEDOUT, "read message"); - break; - case EDG_WLL_GSS_ERROR_EOF: - edg_wll_SetError(tmp_ctx, ENOTCONN, NULL); - break; - default: - edg_wll_SetError(tmp_ctx, EDG_WLL_ERROR_GSS, "read message"); - break; - } - - return(ret); + edg_wll_Context tmp_ctx = (edg_wll_Context)user_data; + + + if (tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufUse < bytes_to_copy) + return(-1); + + memcpy(buffer,tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].buf + + tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr, bytes_to_copy); + tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr += bytes_to_copy; + + return(bytes_to_copy); } -static int recv_notif(edg_wll_Context ctx) +static int recv_notif(edg_wll_Context ctx, char **message) { int len; - if (ctx->connPoolNotif[0].buf) { - free(ctx->connPoolNotif[0].buf); - ctx->connPoolNotif[0].buf = NULL; - } - ctx->connPoolNotif[0].bufUse = 0; - ctx->connPoolNotif[0].bufSize = 0; - - len = read_il_data(ctx, &ctx->connPoolNotif[0].buf, gss_reader); + + len = read_il_data(ctx, message, buffer_reader); if(len < 0) return(len); - ctx->connPoolNotif[0].bufSize = len+1; - ctx->connPoolNotif[0].bufUse = len+1; - - return edg_wll_Error(ctx,NULL,NULL); } - -static int send_reply(const edg_wll_Context ctx) +static int prepare_reply(const edg_wll_Context ctx) { - int ret, len, err_code, err_code_min = 0; - char *buf, *err_msg = NULL; - size_t total; - edg_wll_GssStatus gss_code; + int len, err_code, err_code_min = 0; + char *err_msg = NULL; err_code = edg_wll_Error(ctx,NULL,&err_msg); if (!err_msg) err_msg=strdup("OK"); - len = encode_il_reply(&buf, err_code, err_code_min, err_msg); + len = encode_il_reply(&(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut), + err_code, err_code_min, err_msg); + if(len < 0) { edg_wll_SetError(ctx,E2BIG,"create_reply()"); goto err; } - ret = edg_wll_gss_write_full(&ctx->connPoolNotif[0].gss, - buf,len,&ctx->p_tmp_timeout,&total, &gss_code); + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = len; +err: + free(err_msg); + return edg_wll_Error(ctx,NULL,NULL); +} + + +// send pool output buffer nonblockingly +// XXX: missing partial-sent logic (gss_write does not return written bytes count) +static int send_reply(const edg_wll_Context ctx) +{ + int ret; + edg_wll_GssStatus gss_code; + + + ret = edg_wll_gss_write(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gss, + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut, + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut, &ctx->p_tmp_timeout, &gss_code); + if (ret < 0) { + // edg_wll_gss_write does not return how many bytes were written, so + // any error is fatal for us and we can close connection edg_wll_SetError(ctx, ret == EDG_WLL_GSS_ERROR_TIMEOUT ? ETIMEDOUT : EDG_WLL_ERROR_GSS, - "write reply"); + "send_reply()"); goto err; } err: - if(buf) free(buf); - free(err_msg); return edg_wll_Error(ctx,NULL,NULL); } @@ -543,321 +581,168 @@ int edg_wll_NotifReceive( edg_wll_JobStat *state_out, edg_wll_NotifId *id_out) -/* pullup from INFN, support multiple messages from interlogger */ -#if 0 +/* NotifReceive */ { - fd_set fds; - struct sockaddr_in a; - int recv_sock, alen; + int i, j, fd_num = ctx->connNotif->connOpened + 1; + struct _fd_map { + struct pollfd pollfds[fd_num]; + int index[fd_num]; + } fd_map; edg_wll_Event *event = NULL; - struct timeval start_time,check_time,tv; - char *p = NULL, *ucs = NULL, - *event_char = NULL, *jobstat_char = NULL; - int ret; - edg_wll_GssStatus gss_code; + struct timeval start_time,check_time; + char *event_char = NULL, *jobstat_char = NULL, *message = NULL; edg_wll_ResetError(ctx); + ctx->p_tmp_timeout = *timeout; /* start timer */ gettimeofday(&start_time,0); - + if (fd == -1) { if (ctx->notifSock == -1) { - edg_wll_SetError(ctx, EINVAL, "No client socket opened."); - goto err; + return edg_wll_SetError(ctx, EINVAL, "No client socket opened."); } else { fd = ctx->notifSock; } } - FD_ZERO(&fds); - FD_SET(fd,&fds); + fd_map.pollfds[0].fd = fd; + fd_map.pollfds[0].events = POLLIN; + fd_map.index[0] = -1; - tv.tv_sec = timeout->tv_sec; - tv.tv_usec = timeout->tv_usec; + j = 1; + for (i=0; i < ctx->connNotif->poolSize; i++) { + if (ctx->connNotif->connPool[i].gss.sock != -1) { + fd_map.pollfds[j].fd = ctx->connNotif->connPool[i].gss.sock; + fd_map.pollfds[j].events = POLLIN; + fd_map.index[j] = i; + j++; + } + } - switch(select(fd+1, &fds, NULL, NULL, &tv)) { + /* XXX notif_send() & notif_receive() should then migrate to */ + /* client/connection.c and use connPool management f-cions */ + + /* XXX: long-lived contexts may have problems, TODO implement credential reload */ + + switch(poll(fd_map.pollfds, fd_num, ctx->p_tmp_timeout.tv_sec*1000+ctx->p_tmp_timeout.tv_usec/1000)) { case -1: - edg_wll_SetError(ctx, errno, "select() failed"); + edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed"); goto err; case 0: - edg_wll_SetError(ctx, ETIMEDOUT, "select() timeouted"); + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out"); goto err; default: + for (i=0; i < fd_num; i++) { + if ((fd_map.pollfds[i].revents & POLLIN)) { + break; + } + } + if (fd_num == i) { + /* no events on any socket */ + edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor"); + goto err; + } break; } -/* check time */ + /* check time */ gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { + if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) { edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); goto err; } - - start_time = check_time; - - alen=sizeof(a); - recv_sock = accept(fd,&a,&alen); - if (recv_sock <0) { - edg_wll_SetError(ctx, errno, "accept() failed"); - goto err; - } - ret = edg_wll_gss_accept(ctx->connPool[ctx->connToUse].gsiCred, recv_sock, - &tv, &ctx->connPool[ctx->connToUse].gss, &gss_code); + // there is some incomming connection(s) + // XXX: what has higher priority? new connection, or data on active connection ? - if (ret) { - edg_wll_SetError(ctx, errno, "GSS authentication failed."); - goto err; + if (fd_map.pollfds[0].revents & POLLIN) { /* new connection */ + start_time = check_time; + + if (edg_wll_accept(ctx,fd)) goto err; + + /* check time */ + gettimeofday(&check_time,0); + if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) { + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); + goto err; + } } - -/* check time */ - gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { - edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); - goto err; + else { /* data on some of active (NotifPool) connections */ + // XXX: if data arrives for more connections, which one serve first? + // poor-man solution - take first one on which data arrived + for (i=1; i < fd_num; i++) { + if (fd_map.pollfds[i].revents & POLLIN) { + ctx->connNotif->connToUse = fd_map.index[i]; + break; + } + } + assert(i < fd_num); } - start_time = check_time; + /****************************************************************/ + /* Communication with notif-interlogger */ + /****************************************************************/ + start_time = check_time; - ctx->p_tmp_timeout = tv; - - if (recv_notif(ctx)) { - /* error set in recv_notif() */ - goto err; - } - - if (send_reply(ctx)) { - /* error set in send_reply() */ + if (read_data(ctx)) { + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0; goto err; - } - - p = ctx->connPool[ctx->connToUse].buf; - p = get_string(p, &ucs); - if (p == NULL) return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading UCS"); - free(ucs); - - p = get_string(p, &event_char); - if (p == NULL) { - free(ucs); - return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");; } - - /****************************************************************/ - /* end of notif-interlogger message exchange */ - /****************************************************************/ - + /* check time */ gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { + if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) { edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); goto err; } start_time = check_time; - - event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION); - if (edg_wll_ParseNotifEvent(ctx, event_char, &event)) { - goto err; - } - jobstat_char = glite_lbu_UnescapeXML((const char *) event->notification.jobstat); - if (jobstat_char == NULL) { - edg_wll_SetError(ctx, EINVAL, "glite_lbu_UnescapeXML()"); - goto err; - } - - /* fill in return values - */ - if ( edg_wll_ParseJobStat(ctx, jobstat_char, - strlen(jobstat_char), state_out)) { + if (recv_notif(ctx, &message) < 0) { + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0; goto err; } - - if ( id_out ) { - *id_out = event->notification.notifId; - event->notification.notifId = NULL; - } - - -err: - if (event) { - edg_wll_FreeEvent(event); - // XXX - konzultovat s honikem; podle meho by to free - // mel delat uz edg_wll_FreeEvent - //free(event); - } - - free(ctx->connPool[ctx->connToUse].buf); - ctx->connPool[ctx->connToUse].buf = NULL; - ctx->connPool[ctx->connToUse].bufUse = 0; - ctx->connPool[ctx->connToUse].bufSize = 0; - - free(event_char); - free(jobstat_char); - // XXX - // konzultovat s Danem - /* Dan: ??? */ - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); - - return edg_wll_Error(ctx,NULL,NULL); -} -#endif -/* NotifReceive */ -{ - struct pollfd pollfds[1]; - struct sockaddr_in a; - int recv_sock; - socklen_t alen; - edg_wll_Event *event = NULL; - struct timeval start_time,check_time,tv; - char *event_char = NULL, *jobstat_char = NULL; - edg_wll_GssStatus gss_code; - + /* receive message complete, free input buffer */ + free(ctx->connNotif->connPool[ctx->connNotif->connToUse].buf); + ctx->connNotif->connPool[ctx->connNotif->connToUse].buf = NULL; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUse = 0; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSize = 0; - edg_wll_ResetError(ctx); - /* start timer */ - gettimeofday(&start_time,0); - - if (fd == -1) { - if (ctx->notifSock == -1) { - edg_wll_SetError(ctx, EINVAL, "No client socket opened."); - goto err; - } - else { - fd = ctx->notifSock; - } + if (prepare_reply(ctx) || send_reply(ctx)) { + // fatal error + // XXX: output buffer not used between edg_wll_NotifReceive() calls + CloseConnectionNotif(ctx); + goto err; } - - pollfds[0].fd = fd; - pollfds[0].events = POLLIN; - tv.tv_sec = timeout->tv_sec; - tv.tv_usec = timeout->tv_usec; - - -select: - /* XXX - index 0 is used because of absence of connection management */ - /* to use it, support in client/connection.c needed */ - /* it is better to separate it from ctx->connPool, which is used */ - /* for outgouing messages to server */ - /* In future it should be in context, so one could use: */ - /* ctx->connPoolNotif[ctx->connPoolNotifToUse] */ - /* notif_send() & notif_receive() should then migrate to */ - /* client/connection.c and use connPool management f-cions */ - /* XXX: long-lived contexts may have problems, TODO implement credential reload */ - - if (!ctx->connPoolNotif[0].gsiCred) { - if (edg_wll_gss_acquire_cred_gsi( - ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, - ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, - &ctx->connPoolNotif[0].gsiCred,&gss_code)) - { - edg_wll_SetErrorGss(ctx,"failed aquiring credentials",&gss_code); - goto err; - } + /* check time */ + gettimeofday(&check_time,0); + if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) { + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); + goto err; } - - if (ctx->connPoolNotif[0].gss.context == NULL) - { - int ret; - switch(poll(pollfds, 1, tv.tv_sec*1000+tv.tv_usec/1000)) { - case -1: - edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed"); - goto err; - case 0: - edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out"); - goto err; - default: - if (!(pollfds[0].revents & POLLIN)) { - edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor"); - goto err; - } - break; - } - - /* check time */ - gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { - edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); - goto err; - } - start_time = check_time; - - alen=sizeof(a); - recv_sock = accept(fd,(struct sockaddr *)&a,&alen); - if (recv_sock <0) { - edg_wll_SetError(ctx, errno, "accept() failed"); - goto err; - } - - ret = edg_wll_gss_accept(ctx->connPoolNotif[0].gsiCred, recv_sock, - &tv, &ctx->connPoolNotif[0].gss,&gss_code); - - switch (ret) { - case EDG_WLL_GSS_OK: - break; - case EDG_WLL_GSS_ERROR_ERRNO: - edg_wll_SetError(ctx,errno,"failed to receive notification"); - goto err; - case EDG_WLL_GSS_ERROR_GSS: - edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code); - goto err; - case EDG_WLL_GSS_ERROR_EOF: - edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection"); - goto err; - case EDG_WLL_GSS_ERROR_TIMEOUT: - edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification"); - goto err; - default: - edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification"); - goto err; - } - - /* check time */ - gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { - edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); - goto err; - } - start_time = check_time; - } + start_time = check_time; - - ctx->p_tmp_timeout = tv; - - /****************************************************************/ - /* Communication with notif-interlogger */ - /****************************************************************/ - - if (recv_notif(ctx)) { - if (ctx->errCode == ENOTCONN) { - /* other side (interlogger-notif) probably closed connection */ - edg_wll_ResetError(ctx); - - edg_wll_gss_close(&ctx->connPoolNotif[0].gss,NULL); - // buffer is freed in recv_notif() - - goto select; - } - else { - goto err; /* error set in recv_notif() */ - } - } + /* response sent, free output buffer */ + free(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut); + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut = NULL; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUseOut = 0; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = 0; + ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtrOut = 0; - if (send_reply(ctx)) { - goto err; /* error set in send_reply() */ - } - { il_octet_string_t ev; - if(decode_il_msg(&ev, ctx->connPoolNotif[0].buf) < 0) + if ( decode_il_msg(&ev, message) < 0 ) { + CloseConnectionNotif(ctx); return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "decoding event string"); + } event_char = ev.data; } /****************************************************************/ @@ -866,7 +751,7 @@ select: /* check time */ gettimeofday(&check_time,0); - if (decrement_timeout(&tv, start_time, check_time)) { + if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) { edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()"); goto err; } @@ -889,7 +774,7 @@ select: strlen(jobstat_char), state_out)) { goto err; } - + if (id_out) { *id_out = event->notification.notifId; event->notification.notifId = NULL; @@ -903,11 +788,6 @@ err: //free(event); } - free(ctx->connPoolNotif[0].buf); - ctx->connPoolNotif[0].buf = NULL; - ctx->connPoolNotif[0].bufUse = 0; - ctx->connPoolNotif[0].bufSize = 0; - free(event_char); free(jobstat_char); @@ -933,9 +813,6 @@ int edg_wll_NotifCloseFd( int err; if (ctx->notifSock >= 0) { - if (ctx->connPoolNotif[0].gss.context != NULL) { - edg_wll_gss_close(&ctx->connPoolNotif[0].gss, NULL); - } err = close(ctx->notifSock); ctx->notifSock = -1; @@ -945,3 +822,17 @@ int edg_wll_NotifCloseFd( return edg_wll_Error(ctx,NULL,NULL); } + +int edg_wll_NotifClosePool( + edg_wll_Context ctx) +{ + if (ctx->connNotif->connOpened) { + for (ctx->connNotif->connToUse=0; ctx->connNotif->connToUse < ctx->connNotif->poolSize; + ctx->connNotif->connToUse++) { + if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.sock != -1) CloseConnectionNotif(ctx); + + } + } + + return edg_wll_Error(ctx,NULL,NULL); +} diff --git a/org.glite.lb.client/src/notify.c b/org.glite.lb.client/src/notify.c index d75b1ee..6e607b3 100644 --- a/org.glite.lb.client/src/notify.c +++ b/org.glite.lb.client/src/notify.c @@ -203,7 +203,7 @@ int main(int argc,char **argv) edg_wll_NotifId nid = NULL; int c; char *field_arg = "owner",*err; - time_t client_tout = time(NULL) + 60; + time_t client_tout = time(NULL) + 600; int refresh = 0; struct timeval tout; time_t opt_valid = 0,do_refresh = client_tout + 999999999,now; @@ -269,6 +269,7 @@ int main(int argc,char **argv) tout.tv_usec = 0; edg_wll_FreeStatus(&stat); + stat.state = EDG_WLL_JOB_UNDEF; if ( (err = edg_wll_NotifReceive(ctx, sock, &tout, &stat, &recv_nid)) ) { edg_wll_NotifIdFree(recv_nid); @@ -324,6 +325,7 @@ receive_err: if (stat.state != EDG_WLL_JOB_UNDEF) edg_wll_FreeStatus(&stat); if (nid) edg_wll_NotifIdFree(nid); edg_wll_NotifCloseFd(ctx); + edg_wll_NotifClosePool(ctx); if (edg_wll_Error(ctx,&errt,&errd)) fprintf(stderr, "%s: %s (%s)\n", me, errt, errd); diff --git a/org.glite.lb.common/interface/connpool.h b/org.glite.lb.common/interface/connpool.h index 3393349..8b76f40 100644 --- a/org.glite.lb.common/interface/connpool.h +++ b/org.glite.lb.common/interface/connpool.h @@ -26,9 +26,10 @@ extern "C" { #ifndef EDG_WLL_CONNPOOL_DECLARED #define EDG_WLL_CONNPOOL_DECLARED 1 -#define GLITE_LB_COMMON_CONNPOOL_SIZE 50 +#define GLITE_LB_COMMON_CONNPOOL_SIZE 50 +#define GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE 50 -glite_lb_padded_struct(_edg_wll_ConnPool,15, +glite_lb_padded_struct(_edg_wll_ConnPool,25, /* address and port where we are connected to */ char *peerName; unsigned int peerPort; @@ -37,6 +38,7 @@ glite_lb_padded_struct(_edg_wll_ConnPool,15, edg_wll_GssCred gsiCred; edg_wll_GssConnection gss; char *buf; + int bufPtr; int bufUse; int bufSize; @@ -47,6 +49,11 @@ glite_lb_padded_struct(_edg_wll_ConnPool,15, struct stat *certfile; +/* output buffer */ + char *bufOut; + int bufPtrOut; + int bufUseOut; + int bufSizeOut; ); typedef struct _edg_wll_ConnPool edg_wll_ConnPool; #endif @@ -64,6 +71,7 @@ typedef struct _edg_wll_Connections { /* pool of connections from client */ int poolSize; /* number of connections in the pool */ int connOpened; /* number of opened connections */ + int connToUse; /* active connection we are working with */ /* Connection pool locks & accessories.*/ #ifdef GLITE_LB_THREADED @@ -110,6 +118,9 @@ void edg_wll_poolFree(); in case memory has been already allocated, just return a pointer */ edg_wll_Connections* edg_wll_initConnections(); +/** Initializes connNotif structure */ +void edg_wll_initConnNotif(edg_wll_Connections *); + #ifdef __cplusplus } diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index b717155..d2ad4dc 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -65,7 +65,7 @@ glite_lb_padded_struct(_edg_wll_Context,150, void *dbctx; int dbcaps; edg_wll_Connections *connections; - edg_wll_ConnPool *connPoolNotif; /* hold _one_ connection from notif-interlogger */ + edg_wll_Connections *connNotif; /* holds connections from notif-interlogger */ edg_wll_ConnProxy *connProxy; /* holds one plain connection */ edg_wll_QueryRec **job_index; diff --git a/org.glite.lb.common/src/connpool.c b/org.glite.lb.common/src/connpool.c index e6f4089..0f287a9 100644 --- a/org.glite.lb.common/src/connpool.c +++ b/org.glite.lb.common/src/connpool.c @@ -2,14 +2,19 @@ #ifdef GLITE_LB_THREADED edg_wll_Connections connectionsHandle = - { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL}; + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , 0, PTHREAD_MUTEX_INITIALIZER , NULL , NULL}; +edg_wll_Connections connNotifInitializer = + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE , 0 , 0 , PTHREAD_MUTEX_INITIALIZER , NULL, NULL}; #endif #ifndef GLITE_LB_THREADED edg_wll_Connections connectionsHandle = - { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , NULL}; + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , 0, NULL}; +edg_wll_Connections connNotifInitializer = + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE , 0 , 0 , NULL}; #endif + /** Lock (try) the pool */ int edg_wll_poolTryLock() { int RetVal; @@ -277,3 +282,13 @@ edg_wll_Connections* edg_wll_initConnections() { } +void edg_wll_initConnNotif(edg_wll_Connections *conn) +{ + int i; + + *conn = connNotifInitializer; + conn->connPool = (edg_wll_ConnPool *) calloc(conn->poolSize, sizeof(edg_wll_ConnPool)); + for (i=0; ipoolSize; i++) { + conn->connPool[i].gss.sock = -1; + } +} diff --git a/org.glite.lb.common/src/context.c b/org.glite.lb.common/src/context.c index 50af048..fe09148 100644 --- a/org.glite.lb.common/src/context.c +++ b/org.glite.lb.common/src/context.c @@ -38,8 +38,8 @@ int edg_wll_InitContext(edg_wll_Context *ctx) out->p_tmp_timeout.tv_usec = out->p_log_timeout.tv_usec; out->connections = edg_wll_initConnections(); -// out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool)); - out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool)); + out->connNotif = (edg_wll_Connections *) calloc(1, sizeof(edg_wll_Connections)); + edg_wll_initConnNotif(out->connNotif); out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy)); out->connProxy->conn.sock = -1; // out->connToUse = -1; @@ -63,6 +63,7 @@ int edg_wll_InitContext(edg_wll_Context *ctx) void edg_wll_FreeContext(edg_wll_Context ctx) { struct timeval close_timeout = {0, 50000}; + int i; if (!ctx) return; #ifdef CTXTRACE @@ -114,13 +115,17 @@ void edg_wll_FreeContext(edg_wll_Context ctx) } free(ctx->connections->connPool);*/ } - if (ctx->connPoolNotif) { - if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName); - edg_wll_gss_close(&ctx->connPoolNotif[0].gss,&close_timeout); - if (ctx->connPoolNotif[0].gsiCred) - edg_wll_gss_release_cred(&ctx->connPoolNotif[0].gsiCred, NULL); - if (ctx->connPoolNotif[0].buf) free(ctx->connPoolNotif[0].buf); - free(ctx->connPoolNotif); + if (ctx->connNotif) { + for (i=0; iconnNotif->poolSize; i++) { + if (ctx->connNotif->connPool[i].peerName) free(ctx->connNotif->connPool[i].peerName); + edg_wll_gss_close(&ctx->connNotif->connPool[i].gss,&close_timeout); + if (ctx->connNotif->connPool[i].gsiCred) + edg_wll_gss_release_cred(&ctx->connNotif->connPool[i].gsiCred, NULL); + if (ctx->connNotif->connPool[i].buf) free(ctx->connNotif->connPool[i].buf); + if (ctx->connNotif->connPool[i].bufOut) free(ctx->connNotif->connPool[i].bufOut); + } + free(ctx->connNotif->connPool); + free(ctx->connNotif); } if ( ctx->connProxy ) { if ( ctx->connProxy->buf ) free(ctx->connProxy->buf); -- 1.8.2.3