}
+int CloseConnectionNotif(edg_wll_Context ctx)
+{
+ /* close connection and free its structures */
+ int cIndex,ret = 0;
+
+
+ cIndex = ctx->connNotif->connToUse;
+
+ assert(ctx->connNotif->connOpened);
+
+ if (ctx->connNotif->connPool[cIndex].gss.sock >= 0)
+ edg_wll_gss_close(&ctx->connNotif->connPool[cIndex].gss, &ctx->p_tmp_timeout); // always returns 0
+ if (ctx->connNotif->connPool[cIndex].gsiCred != NULL)
+ if ( (ret = edg_wll_gss_release_cred(&ctx->connNotif->connPool[cIndex].gsiCred, NULL)) )
+ edg_wll_SetError(ctx,ret,"error in edg_wll_gss_release_cred()");
+ free(ctx->connNotif->connPool[cIndex].peerName);
+ free(ctx->connNotif->connPool[cIndex].buf);
+ free(ctx->connNotif->connPool[cIndex].bufOut);
+ free(ctx->connNotif->connPool[cIndex].certfile);
+
+ memset(ctx->connNotif->connPool + cIndex, 0, sizeof(edg_wll_ConnPool));
+ ctx->connNotif->connPool[cIndex].gss.sock = -1;
+
+ ctx->connNotif->connOpened--;
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
{
}
-
int AddConnection(edg_wll_Context ctx, char *name, int port)
{
int i,index = -1;
}
+int SetFreeConnectionIndexNotif(edg_wll_Context ctx)
+{
+ int i;
+
+
+ ctx->connNotif->connToUse = -1;
+
+ for (i = 0; i < ctx->connNotif->poolSize; i++)
+ if (ctx->connNotif->connPool[i].gss.sock == -1) {
+ ctx->connNotif->connToUse = i;
+ ctx->connNotif->connOpened++;
+ assert(!ctx->connNotif->connPool[i].buf);
+ break;
+ }
+
+ return ctx->connNotif->connToUse;
+}
+
+
int ReleaseConnection(edg_wll_Context ctx, char *name, int port)
{
}
+int ReleaseConnectionNotif(edg_wll_Context ctx)
+{
+ int i, index = 0;
+ long min;
+
+
+ edg_wll_ResetError(ctx);
+ if (ctx->connNotif->connOpened == 0) return 0; /* nothing to release */
+
+ min = ctx->connNotif->connPool[0].lastUsed.tv_sec;
+
+ /* free the oldest (unlocked) connection */
+ for (i=0; i<ctx->connNotif->poolSize; i++) {
+ assert(ctx->connNotif->connPool[i].gss.sock > -1); // Full pool expected - accept non-NULL values only
+
+ if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
+ min = ctx->connections->connPool[i].lastUsed.tv_sec;
+ index = i;
+ }
+ }
+
+ ctx->connNotif->connToUse = index;
+ CloseConnectionNotif(ctx);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
int edg_wll_close(edg_wll_Context ctx, int* connToUse)
{
return edg_wll_Error(ctx,NULL,NULL);
}
+
+
+int edg_wll_accept(edg_wll_Context ctx, int fd)
+{
+ int recv_sock;
+ edg_wll_GssStatus gss_stat;
+ time_t lifetime = 0;
+ struct stat statinfo;
+ int acquire_cred = 0;
+ struct sockaddr_in a;
+ socklen_t alen;
+ edg_wll_GssStatus gss_code;
+
+
+ edg_wll_ResetError(ctx);
+ assert(fd > 0);
+
+ alen=sizeof(a);
+ recv_sock = accept(fd,(struct sockaddr *)&a,&alen);
+ if (recv_sock <0) {
+ edg_wll_SetError(ctx, errno, "accept() failed");
+ goto err;
+ }
+
+ if (ctx->connNotif->connOpened == ctx->connNotif->poolSize)
+ if (ReleaseConnectionNotif(ctx)) goto err;
+
+ if (SetFreeConnectionIndexNotif(ctx) < 0) {
+ edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded");
+ goto err;
+ }
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Connection with fd %d accepted. %d in the pool\n",>srvName,ctx->srvPort,ctx->connNotif->connToUse);
+ #endif
+
+
+ // In case of using a specifically given cert file, stat it and check for the need to reauthenticate
+ if (ctx->p_proxy_filename || ctx->p_cert_filename) {
+ if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile) { // Has the file been stated before?
+ stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, &statinfo);
+ if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile->st_mtime != statinfo.st_mtime)
+ acquire_cred = 1; // File has been modified. Need to acquire new creds.
+ }
+ else acquire_cred = 1;
+ }
+
+ // Check if credentials exist. If so, check validity
+ if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred) {
+ lifetime = ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred->lifetime;
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf ("Credential exists, lifetime: %d\n", lifetime);
+ #endif
+ if (!lifetime) acquire_cred = 1; // Credentials exist and lifetime is OK. No need to authenticate.
+ }
+ else {
+ acquire_cred = 1; // No credentials exist so far, acquire.
+ }
+
+
+ if (acquire_cred) {
+ edg_wll_GssCred newcred = NULL;
+ if (edg_wll_gss_acquire_cred_gsi(
+ ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
+ ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
+ &newcred, &gss_stat)) {
+ edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat);
+ goto err;
+ } else {
+ if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred != NULL)
+ edg_wll_gss_release_cred(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred,&gss_stat);
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred = newcred;
+ newcred = NULL;
+
+ // Credentials Acquired successfully. Storing file identification.
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Cert file: %s\n", ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename);
+ #endif
+
+ if (ctx->p_proxy_filename || ctx->p_cert_filename) {
+ if (!ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile) // Allocate space for certfile stats
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile =
+ (struct stat*)calloc(1, sizeof(struct stat));
+ stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile);
+ }
+ }
+ }
+
+ assert(ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.context == NULL);
+
+ switch ( edg_wll_gss_accept(ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred, recv_sock,
+ &ctx->p_tmp_timeout, &ctx->connNotif->connPool[ctx->connNotif->connToUse].gss,&gss_code)) {
+
+ case EDG_WLL_GSS_OK:
+ break;
+ case EDG_WLL_GSS_ERROR_ERRNO:
+ edg_wll_SetError(ctx,errno,"failed to receive notification");
+ goto err;
+ case EDG_WLL_GSS_ERROR_GSS:
+ edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code);
+ goto err;
+ case EDG_WLL_GSS_ERROR_HERRNO:
+ { const char *msg1;
+ char *msg2;
+ msg1 = hstrerror(errno);
+ asprintf(&msg2, "edg_wll_gss_connect(): %s", msg1);
+ edg_wll_SetError(ctx,EDG_WLL_ERROR_DNS, msg2);
+ free(msg2);
+ }
+ break;
+ case EDG_WLL_GSS_ERROR_EOF:
+ edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection");
+ goto err;
+ case EDG_WLL_GSS_ERROR_TIMEOUT:
+ edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification");
+ goto err;
+ default:
+ edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification");
+ goto err;
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+
+
+err:
+ /* some error occured; close created connection
+ * and free all fields in connPool[ctx->connNotif->connToUse] */
+ if (ctx->connNotif->connToUse >= 0) {
+ CloseConnectionNotif(ctx);
+ }
+
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
#include <netdb.h>
#include <poll.h>
#include <stdio.h>
+#include <assert.h>
#include "glite/security/glite_gss.h"
#include "glite/lbu/escape.h"
}
+/* read all data available on active notif connection */
+static int read_data(edg_wll_Context ctx)
+{
+ edg_wll_GssStatus gss_code;
+ int ret;
+ edg_wll_ConnPool *notif = &(ctx->connNotif->connPool[ctx->connNotif->connToUse]);
+
+
+ do {
+ if (notif->bufUse == notif->bufSize) {
+ notif->bufSize += NOTIF_POOL_BUFFER_BLOCK_SIZE;
+ notif->buf = realloc(notif->buf, notif->bufSize);
+ if (!notif->buf)
+ return edg_wll_SetError(ctx, ENOMEM, "read_data()");
+ }
+
+ ret = edg_wll_gss_read(&(notif->gss), notif->buf + notif->bufUse,
+ notif->bufSize - notif->bufUse, &ctx->p_tmp_timeout, &gss_code);
+
+ if (ret < 0) {
+ switch(ret) {
+ case EDG_WLL_GSS_ERROR_TIMEOUT:
+ edg_wll_SetError(ctx, ETIMEDOUT, "read message");
+ break;
+ case EDG_WLL_GSS_ERROR_EOF:
+ edg_wll_SetError(ctx, ENOTCONN, NULL);
+ break;
+ default:
+ edg_wll_SetError(ctx, EDG_WLL_ERROR_GSS, "read message");
+ break;
+ }
+ goto err;
+ }
+ notif->bufUse += ret;
+
+ } while (notif->bufSize == notif->bufUse);
+
+err:
+ return edg_wll_Error(ctx, NULL, NULL);
+}
+
-static int gss_reader(void *user_data, char *buffer, int max_len)
+/* copies bytes_to_copy chars from pool buffer to given _allocated_ working buffer
+ */
+static int buffer_reader(void *user_data, char *buffer, const int bytes_to_copy)
{
- edg_wll_GssStatus gss_code;
- edg_wll_Context tmp_ctx = (edg_wll_Context)user_data;
- int ret;
- size_t len;
-
- ret = edg_wll_gss_read_full(&tmp_ctx->connPoolNotif[0].gss,
- buffer, max_len,
- &tmp_ctx->p_tmp_timeout,
- &len, &gss_code);
- if(ret < 0)
- switch(ret) {
- case EDG_WLL_GSS_ERROR_TIMEOUT:
- edg_wll_SetError(tmp_ctx, ETIMEDOUT, "read message");
- break;
- case EDG_WLL_GSS_ERROR_EOF:
- edg_wll_SetError(tmp_ctx, ENOTCONN, NULL);
- break;
- default:
- edg_wll_SetError(tmp_ctx, EDG_WLL_ERROR_GSS, "read message");
- break;
- }
-
- return(ret);
+ edg_wll_Context tmp_ctx = (edg_wll_Context)user_data;
+
+
+ if (tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufUse < bytes_to_copy)
+ return(-1);
+
+ memcpy(buffer,tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].buf +
+ tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr, bytes_to_copy);
+ tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr += bytes_to_copy;
+
+ return(bytes_to_copy);
}
-static int recv_notif(edg_wll_Context ctx)
+static int recv_notif(edg_wll_Context ctx, char **message)
{
int len;
- if (ctx->connPoolNotif[0].buf) {
- free(ctx->connPoolNotif[0].buf);
- ctx->connPoolNotif[0].buf = NULL;
- }
- ctx->connPoolNotif[0].bufUse = 0;
- ctx->connPoolNotif[0].bufSize = 0;
-
- len = read_il_data(ctx, &ctx->connPoolNotif[0].buf, gss_reader);
+
+ len = read_il_data(ctx, message, buffer_reader);
if(len < 0)
return(len);
- ctx->connPoolNotif[0].bufSize = len+1;
- ctx->connPoolNotif[0].bufUse = len+1;
-
-
return edg_wll_Error(ctx,NULL,NULL);
}
-
-static int send_reply(const edg_wll_Context ctx)
+static int prepare_reply(const edg_wll_Context ctx)
{
- int ret, len, err_code, err_code_min = 0;
- char *buf, *err_msg = NULL;
- size_t total;
- edg_wll_GssStatus gss_code;
+ int len, err_code, err_code_min = 0;
+ char *err_msg = NULL;
err_code = edg_wll_Error(ctx,NULL,&err_msg);
if (!err_msg) err_msg=strdup("OK");
- len = encode_il_reply(&buf, err_code, err_code_min, err_msg);
+ len = encode_il_reply(&(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut),
+ err_code, err_code_min, err_msg);
+
if(len < 0) {
edg_wll_SetError(ctx,E2BIG,"create_reply()");
goto err;
}
- ret = edg_wll_gss_write_full(&ctx->connPoolNotif[0].gss,
- buf,len,&ctx->p_tmp_timeout,&total, &gss_code);
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = len;
+err:
+ free(err_msg);
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+// send pool output buffer nonblockingly
+// XXX: missing partial-sent logic (gss_write does not return written bytes count)
+static int send_reply(const edg_wll_Context ctx)
+{
+ int ret;
+ edg_wll_GssStatus gss_code;
+
+
+ ret = edg_wll_gss_write(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gss,
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut,
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut, &ctx->p_tmp_timeout, &gss_code);
+
if (ret < 0) {
+ // edg_wll_gss_write does not return how many bytes were written, so
+ // any error is fatal for us and we can close connection
edg_wll_SetError(ctx,
ret == EDG_WLL_GSS_ERROR_TIMEOUT ?
ETIMEDOUT : EDG_WLL_ERROR_GSS,
- "write reply");
+ "send_reply()");
goto err;
}
err:
- if(buf) free(buf);
- free(err_msg);
return edg_wll_Error(ctx,NULL,NULL);
}
edg_wll_JobStat *state_out,
edg_wll_NotifId *id_out)
-/* pullup from INFN, support multiple messages from interlogger */
-#if 0
+/* NotifReceive */
{
- fd_set fds;
- struct sockaddr_in a;
- int recv_sock, alen;
+ int i, j, fd_num = ctx->connNotif->connOpened + 1;
+ struct _fd_map {
+ struct pollfd pollfds[fd_num];
+ int index[fd_num];
+ } fd_map;
edg_wll_Event *event = NULL;
- struct timeval start_time,check_time,tv;
- char *p = NULL, *ucs = NULL,
- *event_char = NULL, *jobstat_char = NULL;
- int ret;
- edg_wll_GssStatus gss_code;
+ struct timeval start_time,check_time;
+ char *event_char = NULL, *jobstat_char = NULL, *message = NULL;
edg_wll_ResetError(ctx);
+ ctx->p_tmp_timeout = *timeout;
/* start timer */
gettimeofday(&start_time,0);
-
+
if (fd == -1) {
if (ctx->notifSock == -1) {
- edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
- goto err;
+ return edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
}
else {
fd = ctx->notifSock;
}
}
- FD_ZERO(&fds);
- FD_SET(fd,&fds);
+ fd_map.pollfds[0].fd = fd;
+ fd_map.pollfds[0].events = POLLIN;
+ fd_map.index[0] = -1;
- tv.tv_sec = timeout->tv_sec;
- tv.tv_usec = timeout->tv_usec;
+ j = 1;
+ for (i=0; i < ctx->connNotif->poolSize; i++) {
+ if (ctx->connNotif->connPool[i].gss.sock != -1) {
+ fd_map.pollfds[j].fd = ctx->connNotif->connPool[i].gss.sock;
+ fd_map.pollfds[j].events = POLLIN;
+ fd_map.index[j] = i;
+ j++;
+ }
+ }
- switch(select(fd+1, &fds, NULL, NULL, &tv)) {
+ /* XXX notif_send() & notif_receive() should then migrate to */
+ /* client/connection.c and use connPool management f-cions */
+
+ /* XXX: long-lived contexts may have problems, TODO implement credential reload */
+
+ switch(poll(fd_map.pollfds, fd_num, ctx->p_tmp_timeout.tv_sec*1000+ctx->p_tmp_timeout.tv_usec/1000)) {
case -1:
- edg_wll_SetError(ctx, errno, "select() failed");
+ edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed");
goto err;
case 0:
- edg_wll_SetError(ctx, ETIMEDOUT, "select() timeouted");
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out");
goto err;
default:
+ for (i=0; i < fd_num; i++) {
+ if ((fd_map.pollfds[i].revents & POLLIN)) {
+ break;
+ }
+ }
+ if (fd_num == i) {
+ /* no events on any socket */
+ edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor");
+ goto err;
+ }
break;
}
-/* check time */
+ /* check time */
gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
+ if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
goto err;
}
-
- start_time = check_time;
-
- alen=sizeof(a);
- recv_sock = accept(fd,&a,&alen);
- if (recv_sock <0) {
- edg_wll_SetError(ctx, errno, "accept() failed");
- goto err;
- }
- ret = edg_wll_gss_accept(ctx->connPool[ctx->connToUse].gsiCred, recv_sock,
- &tv, &ctx->connPool[ctx->connToUse].gss, &gss_code);
+ // there is some incomming connection(s)
+ // XXX: what has higher priority? new connection, or data on active connection ?
- if (ret) {
- edg_wll_SetError(ctx, errno, "GSS authentication failed.");
- goto err;
+ if (fd_map.pollfds[0].revents & POLLIN) { /* new connection */
+ start_time = check_time;
+
+ if (edg_wll_accept(ctx,fd)) goto err;
+
+ /* check time */
+ gettimeofday(&check_time,0);
+ if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+ goto err;
+ }
}
-
-/* check time */
- gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
- edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
- goto err;
+ else { /* data on some of active (NotifPool) connections */
+ // XXX: if data arrives for more connections, which one serve first?
+ // poor-man solution - take first one on which data arrived
+ for (i=1; i < fd_num; i++) {
+ if (fd_map.pollfds[i].revents & POLLIN) {
+ ctx->connNotif->connToUse = fd_map.index[i];
+ break;
+ }
+ }
+ assert(i < fd_num);
}
- start_time = check_time;
+ /****************************************************************/
+ /* Communication with notif-interlogger */
+ /****************************************************************/
+ start_time = check_time;
- ctx->p_tmp_timeout = tv;
-
- if (recv_notif(ctx)) {
- /* error set in recv_notif() */
- goto err;
- }
-
- if (send_reply(ctx)) {
- /* error set in send_reply() */
+ if (read_data(ctx)) {
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
goto err;
- }
-
- p = ctx->connPool[ctx->connToUse].buf;
- p = get_string(p, &ucs);
- if (p == NULL) return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading UCS");
- free(ucs);
-
- p = get_string(p, &event_char);
- if (p == NULL) {
- free(ucs);
- return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");;
}
-
- /****************************************************************/
- /* end of notif-interlogger message exchange */
- /****************************************************************/
-
+
/* check time */
gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
+ if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
goto err;
}
start_time = check_time;
-
- event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
- if (edg_wll_ParseNotifEvent(ctx, event_char, &event)) {
- goto err;
- }
- jobstat_char = glite_lbu_UnescapeXML((const char *) event->notification.jobstat);
- if (jobstat_char == NULL) {
- edg_wll_SetError(ctx, EINVAL, "glite_lbu_UnescapeXML()");
- goto err;
- }
-
- /* fill in return values
- */
- if ( edg_wll_ParseJobStat(ctx, jobstat_char,
- strlen(jobstat_char), state_out)) {
+ if (recv_notif(ctx, &message) < 0) {
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
goto err;
}
-
- if ( id_out ) {
- *id_out = event->notification.notifId;
- event->notification.notifId = NULL;
- }
-
-
-err:
- if (event) {
- edg_wll_FreeEvent(event);
- // XXX - konzultovat s honikem; podle meho by to free
- // mel delat uz edg_wll_FreeEvent
- //free(event);
- }
-
- free(ctx->connPool[ctx->connToUse].buf);
- ctx->connPool[ctx->connToUse].buf = NULL;
- ctx->connPool[ctx->connToUse].bufUse = 0;
- ctx->connPool[ctx->connToUse].bufSize = 0;
-
- free(event_char);
- free(jobstat_char);
- // XXX
- // konzultovat s Danem
- /* Dan: ??? */
- edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL);
-
- return edg_wll_Error(ctx,NULL,NULL);
-}
-#endif
-/* NotifReceive */
-{
- struct pollfd pollfds[1];
- struct sockaddr_in a;
- int recv_sock;
- socklen_t alen;
- edg_wll_Event *event = NULL;
- struct timeval start_time,check_time,tv;
- char *event_char = NULL, *jobstat_char = NULL;
- edg_wll_GssStatus gss_code;
-
+ /* receive message complete, free input buffer */
+ free(ctx->connNotif->connPool[ctx->connNotif->connToUse].buf);
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].buf = NULL;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUse = 0;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSize = 0;
- edg_wll_ResetError(ctx);
- /* start timer */
- gettimeofday(&start_time,0);
-
- if (fd == -1) {
- if (ctx->notifSock == -1) {
- edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
- goto err;
- }
- else {
- fd = ctx->notifSock;
- }
+ if (prepare_reply(ctx) || send_reply(ctx)) {
+ // fatal error
+ // XXX: output buffer not used between edg_wll_NotifReceive() calls
+ CloseConnectionNotif(ctx);
+ goto err;
}
-
- pollfds[0].fd = fd;
- pollfds[0].events = POLLIN;
- tv.tv_sec = timeout->tv_sec;
- tv.tv_usec = timeout->tv_usec;
-
-
-select:
- /* XXX - index 0 is used because of absence of connection management */
- /* to use it, support in client/connection.c needed */
- /* it is better to separate it from ctx->connPool, which is used */
- /* for outgouing messages to server */
- /* In future it should be in context, so one could use: */
- /* ctx->connPoolNotif[ctx->connPoolNotifToUse] */
- /* notif_send() & notif_receive() should then migrate to */
- /* client/connection.c and use connPool management f-cions */
- /* XXX: long-lived contexts may have problems, TODO implement credential reload */
-
- if (!ctx->connPoolNotif[0].gsiCred) {
- if (edg_wll_gss_acquire_cred_gsi(
- ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
- ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
- &ctx->connPoolNotif[0].gsiCred,&gss_code))
- {
- edg_wll_SetErrorGss(ctx,"failed aquiring credentials",&gss_code);
- goto err;
- }
+ /* check time */
+ gettimeofday(&check_time,0);
+ if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
+ edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+ goto err;
}
-
- if (ctx->connPoolNotif[0].gss.context == NULL)
- {
- int ret;
- switch(poll(pollfds, 1, tv.tv_sec*1000+tv.tv_usec/1000)) {
- case -1:
- edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed");
- goto err;
- case 0:
- edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out");
- goto err;
- default:
- if (!(pollfds[0].revents & POLLIN)) {
- edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor");
- goto err;
- }
- break;
- }
-
- /* check time */
- gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
- edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
- goto err;
- }
- start_time = check_time;
-
- alen=sizeof(a);
- recv_sock = accept(fd,(struct sockaddr *)&a,&alen);
- if (recv_sock <0) {
- edg_wll_SetError(ctx, errno, "accept() failed");
- goto err;
- }
-
- ret = edg_wll_gss_accept(ctx->connPoolNotif[0].gsiCred, recv_sock,
- &tv, &ctx->connPoolNotif[0].gss,&gss_code);
-
- switch (ret) {
- case EDG_WLL_GSS_OK:
- break;
- case EDG_WLL_GSS_ERROR_ERRNO:
- edg_wll_SetError(ctx,errno,"failed to receive notification");
- goto err;
- case EDG_WLL_GSS_ERROR_GSS:
- edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code);
- goto err;
- case EDG_WLL_GSS_ERROR_EOF:
- edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection");
- goto err;
- case EDG_WLL_GSS_ERROR_TIMEOUT:
- edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification");
- goto err;
- default:
- edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification");
- goto err;
- }
-
- /* check time */
- gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
- edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
- goto err;
- }
- start_time = check_time;
- }
+ start_time = check_time;
-
- ctx->p_tmp_timeout = tv;
-
- /****************************************************************/
- /* Communication with notif-interlogger */
- /****************************************************************/
-
- if (recv_notif(ctx)) {
- if (ctx->errCode == ENOTCONN) {
- /* other side (interlogger-notif) probably closed connection */
- edg_wll_ResetError(ctx);
-
- edg_wll_gss_close(&ctx->connPoolNotif[0].gss,NULL);
- // buffer is freed in recv_notif()
-
- goto select;
- }
- else {
- goto err; /* error set in recv_notif() */
- }
- }
+ /* response sent, free output buffer */
+ free(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut);
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut = NULL;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUseOut = 0;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = 0;
+ ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtrOut = 0;
- if (send_reply(ctx)) {
- goto err; /* error set in send_reply() */
- }
-
{
il_octet_string_t ev;
- if(decode_il_msg(&ev, ctx->connPoolNotif[0].buf) < 0)
+ if ( decode_il_msg(&ev, message) < 0 ) {
+ CloseConnectionNotif(ctx);
return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "decoding event string");
+ }
event_char = ev.data;
}
/****************************************************************/
/* check time */
gettimeofday(&check_time,0);
- if (decrement_timeout(&tv, start_time, check_time)) {
+ if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
goto err;
}
strlen(jobstat_char), state_out)) {
goto err;
}
-
+
if (id_out) {
*id_out = event->notification.notifId;
event->notification.notifId = NULL;
//free(event);
}
- free(ctx->connPoolNotif[0].buf);
- ctx->connPoolNotif[0].buf = NULL;
- ctx->connPoolNotif[0].bufUse = 0;
- ctx->connPoolNotif[0].bufSize = 0;
-
free(event_char);
free(jobstat_char);
int err;
if (ctx->notifSock >= 0) {
- if (ctx->connPoolNotif[0].gss.context != NULL) {
- edg_wll_gss_close(&ctx->connPoolNotif[0].gss, NULL);
- }
err = close(ctx->notifSock);
ctx->notifSock = -1;
return edg_wll_Error(ctx,NULL,NULL);
}
+
+int edg_wll_NotifClosePool(
+ edg_wll_Context ctx)
+{
+ if (ctx->connNotif->connOpened) {
+ for (ctx->connNotif->connToUse=0; ctx->connNotif->connToUse < ctx->connNotif->poolSize;
+ ctx->connNotif->connToUse++) {
+ if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.sock != -1) CloseConnectionNotif(ctx);
+
+ }
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}