From 908f24af0e3693a166e52e6c317d54c9555cb09d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jan=20Posp=C3=AD=C5=A1il?= Date: Fri, 6 Oct 2006 12:12:31 +0000 Subject: [PATCH] hopefully last major update of the client library (at least for the moment) - dualreg now should work (ifdefed under LB_PERF) - added useful time measurements in the debug output - huge clanup --- org.glite.lb.client/src/prod_proto.c | 247 +++++++--------- org.glite.lb.client/src/prod_proto.h | 99 +++---- org.glite.lb.client/src/producer.c | 551 ++++++++++++++++++++--------------- 3 files changed, 456 insertions(+), 441 deletions(-) diff --git a/org.glite.lb.client/src/prod_proto.c b/org.glite.lb.client/src/prod_proto.c index 39655b4..d0167fa 100644 --- a/org.glite.lb.client/src/prod_proto.c +++ b/org.glite.lb.client/src/prod_proto.c @@ -23,7 +23,8 @@ static const char* socket_path="/tmp/lb_proxy_store.sock"; * Handle GSS failures on the client side *---------------------------------------------------------------------- */ -int edg_wll_log_proto_handle_gss_failures(edg_wll_Context ctx, int code, edg_wll_GssStatus *gss_code, const char *text) +static +int handle_gss_failures(edg_wll_Context ctx, int code, edg_wll_GssStatus *gss_code, const char *text) { static char err[256]; int ret = 0; @@ -73,6 +74,7 @@ int edg_wll_log_proto_handle_gss_failures(edg_wll_Context ctx, int code, edg_wll * Handle UNIX socket failures on the client side *---------------------------------------------------------------------- */ +static int edg_wll_log_proto_handle_plain_failures(edg_wll_Context ctx, int code, const char *text) { return 0; @@ -92,8 +94,7 @@ struct reader_data { }; static -int -plain_reader(void *user_data, char *buffer, int max_len) +int plain_reader(void *user_data, char *buffer, int max_len) { struct reader_data *data = (struct reader_data *)user_data; int len; @@ -106,8 +107,7 @@ plain_reader(void *user_data, char *buffer, int max_len) } static -int -get_reply_plain(edg_wll_Context ctx, edg_wll_PlainConnection *conn, char **buf, int *code_maj, int *code_min) +int get_reply_plain(edg_wll_Context ctx, edg_wll_PlainConnection *conn, char **buf, int *code_maj, int *code_min) { char *msg=NULL; int len; @@ -133,8 +133,7 @@ get_reply_plain_end: static -int -gss_reader(void *user_data, char *buffer, int max_len) +int gss_reader(void *user_data, char *buffer, int max_len) { struct reader_data *data = (struct reader_data *)user_data; int ret, len; @@ -143,7 +142,7 @@ gss_reader(void *user_data, char *buffer, int max_len) ret = edg_wll_gss_read_full(data->conn, buffer, max_len, &data->ctx->p_tmp_timeout, &len, &gss_code); if(ret < 0) { - edg_wll_log_proto_handle_gss_failures(data->ctx, ret, &gss_code, "edg_wll_gss_read_full"); + handle_gss_failures(data->ctx, ret, &gss_code, "edg_wll_gss_read_full"); edg_wll_UpdateError(data->ctx, EDG_WLL_IL_PROTO, "gss_reader(): error reading message"); } @@ -152,8 +151,7 @@ gss_reader(void *user_data, char *buffer, int max_len) static -int -get_reply_gss(edg_wll_Context ctx, edg_wll_GssConnection *conn, char **buf, int *code_maj, int *code_min) +int get_reply_gss(edg_wll_Context ctx, edg_wll_GssConnection *conn, char **buf, int *code_maj, int *code_min) { char *msg = NULL; int code; @@ -179,33 +177,6 @@ get_reply_gss_end: /** *---------------------------------------------------------------------- - * client part of the logging protocol, used when sending messages to local logger - *---------------------------------------------------------------------- - */ -int edg_wll_log_proto_client(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_LogLine logline) -{ - int err = 0; - - edg_wll_ResetError(ctx); - - /* send message */ - if ((err = edg_wll_log_write(ctx, conn, logline)) == -1) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_log_proto_client: edg_wll_log_write error"); - goto edg_wll_log_proto_client_end; - } - - /* get answer */ - if ((err = edg_wll_log_read(ctx, conn)) != 0) { - edg_wll_UpdateError(ctx,err,"edg_wll_log_proto_client: edg_wll_log_read error"); - } - -edg_wll_log_proto_client_end: - - return edg_wll_Error(ctx,NULL,NULL); -} - -/** - *---------------------------------------------------------------------- * connect to locallogger *---------------------------------------------------------------------- */ @@ -220,7 +191,8 @@ int edg_wll_log_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn) edg_wll_ResetError(ctx); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_connect: setting up connection to local-logger\n"); + fprintf(stderr,"edg_wll_log_connect: setting connection to local-logger (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif /* acquire gss credentials */ ret = edg_wll_gss_acquire_cred_gsi( @@ -240,22 +212,21 @@ int edg_wll_log_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn) } #endif #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_connect: opening connection to local-logger host '%s', port '%d'\n", + fprintf(stderr,"edg_wll_log_connect: opening connection to local-logger (host '%s', port '%d')\n", ctx->p_destination, ctx->p_dest_port); #endif if ((answer = edg_wll_gss_connect(cred, ctx->p_destination, ctx->p_dest_port, &ctx->p_tmp_timeout, conn, &gss_stat)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,answer,&gss_stat,"edg_wll_gss_connect()"); + answer = handle_gss_failures(ctx,answer,&gss_stat,"edg_wll_gss_connect()"); goto edg_wll_log_connect_end; } edg_wll_log_connect_end: #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_connect: done\n"); + fprintf(stderr,"edg_wll_log_connect: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - if (conn->context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(conn,&ctx->p_tmp_timeout); if (cred != GSS_C_NO_CREDENTIAL) gss_release_cred(&min_stat, &cred); if (my_subject_name) free(my_subject_name); @@ -265,6 +236,16 @@ edg_wll_log_connect_end: /** *---------------------------------------------------------------------- + * close connection to locallogger + *---------------------------------------------------------------------- + */ +int edg_wll_log_close(edg_wll_Context ctx, edg_wll_GssConnection *conn) +{ + return edg_wll_gss_close(conn,&ctx->p_tmp_timeout); +} + +/** + *---------------------------------------------------------------------- * write/send to locallogger *---------------------------------------------------------------------- */ @@ -294,8 +275,8 @@ int edg_wll_log_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_ sprintf(header,"%s",EDG_WLL_LOG_SOCKET_HEADER); header[EDG_WLL_LOG_SOCKET_HEADER_LENGTH]='\0'; if ((err = edg_wll_gss_write_full(conn, header, EDG_WLL_LOG_SOCKET_HEADER_LENGTH, &ctx->p_tmp_timeout, &count, &gss_code)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full(}"); - edg_wll_UpdateError(ctx,answer,"edg_wll_log_write: error sending header"); + answer = handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); + edg_wll_UpdateError(ctx,answer,"edg_wll_log_write(): error sending header"); return -1; } sent += count; @@ -305,8 +286,8 @@ int edg_wll_log_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_ #endif count = 0; if ((err = edg_wll_gss_write_full(conn, size_end, 4, &ctx->p_tmp_timeout, &count, &gss_code)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); - edg_wll_UpdateError(ctx,answer,"edg_wll_log_write: error sending message size"); + answer = handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); + edg_wll_UpdateError(ctx,answer,"edg_wll_log_write(): error sending message size"); return -1; } sent += count; @@ -316,14 +297,15 @@ int edg_wll_log_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_ #endif count = 0; if (( err = edg_wll_gss_write_full(conn, logline, size, &ctx->p_tmp_timeout, &count, &gss_code)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); - edg_wll_UpdateError(ctx,answer,"edg_wll_log_write: error sending message"); + answer = handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); + edg_wll_UpdateError(ctx,answer,"edg_wll_log_write(): error sending message"); return -1; } sent += count; #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_write: done\n"); + fprintf(stderr,"edg_wll_log_write: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif return sent; } @@ -350,8 +332,9 @@ int edg_wll_log_read(edg_wll_Context ctx, edg_wll_GssConnection *conn) #endif count = 0; if ((err = edg_wll_gss_read_full(conn, answer_end, 4, &ctx->p_tmp_timeout, &count, &gss_code)) < 0 ) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_read_full()"); - edg_wll_SetError(ctx,answer,"edg_wll_log_read: error getting answer"); + answer = handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_read_full()"); + edg_wll_UpdateError(ctx,answer,"edg_wll_log_read(): error reading answer from local-logger"); + return -1; } else { answer = answer_end[3]; answer <<=8; answer |= answer_end[2]; answer <<=8; @@ -360,41 +343,16 @@ int edg_wll_log_read(edg_wll_Context ctx, edg_wll_GssConnection *conn) #ifdef EDG_WLL_LOG_STUB fprintf(stderr,"edg_wll_log_read: read answer \"%d\"\n",answer); #endif - edg_wll_SetError(ctx,answer,"answer read from locallogger"); + edg_wll_SetError(ctx,answer,"edg_wll_log_read(): answer read from locallogger"); } #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_read: done\n"); + fprintf(stderr,"edg_wll_log_read: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - return edg_wll_Error(ctx,NULL,NULL); + return count; } -/** - *---------------------------------------------------------------------- - * client part of the logging protocol, used when sending messages to lbproxy - *---------------------------------------------------------------------- - */ -int edg_wll_log_proxy_proto_client(edg_wll_Context ctx, edg_wll_PlainConnection *conn, edg_wll_LogLine logline) -{ - int err = 0; - - edg_wll_ResetError(ctx); - - /* send message */ - if ((err = edg_wll_log_proxy_write(ctx, conn, logline)) == -1) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_log_proxy_proto_client: edg_wll_log_proxy_write error"); - goto edg_wll_log_proxy_proto_client_end; - } - - /* get answer */ - if ((err = edg_wll_log_proxy_read(ctx, conn)) != 0) { - edg_wll_UpdateError(ctx,err,"edg_wll_log_proxy_proto_client: edg_wll_log_proxy_read error"); - } - -edg_wll_log_proxy_proto_client_end: - - return edg_wll_Error(ctx,NULL,NULL); -} /** *---------------------------------------------------------------------- @@ -410,11 +368,12 @@ int edg_wll_log_proxy_connect(edg_wll_Context ctx, edg_wll_PlainConnection *conn edg_wll_ResetError(ctx); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_connect: setting up connection to L&B Proxy\n"); + fprintf(stderr,"edg_wll_log_proxy_connect: setting connection to lbroxy (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif conn->sock = socket(PF_UNIX, SOCK_STREAM, 0); if (conn->sock < 0) { - edg_wll_SetError(ctx,answer = errno,"socket() error"); + edg_wll_SetError(ctx,answer = errno,"edg_wll_log_proxy_connect(): socket() error"); goto edg_wll_log_proxy_connect_end; } memset(&saddr, 0, sizeof(saddr)); @@ -422,12 +381,12 @@ int edg_wll_log_proxy_connect(edg_wll_Context ctx, edg_wll_PlainConnection *conn strcpy(saddr.sun_path, ctx->p_lbproxy_store_sock? ctx->p_lbproxy_store_sock: socket_path); if ((flags = fcntl(conn->sock, F_GETFL, 0)) < 0 || fcntl(conn->sock, F_SETFL, flags | O_NONBLOCK) < 0) { - edg_wll_SetError(ctx,answer = errno,"fcntl()"); + edg_wll_SetError(ctx,answer = errno,"edg_wll_log_proxy_connect(): fcntl() error"); close(conn->sock); goto edg_wll_log_proxy_connect_end; } #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_connect: openning connection to L&B Proxy at socket %s\n", + fprintf(stderr,"edg_wll_log_proxy_connect: opening connection to lbproxy (socket '%s')\n", ctx->p_lbproxy_store_sock? ctx->p_lbproxy_store_sock: socket_path); #endif retries = 0; @@ -454,20 +413,29 @@ int edg_wll_log_proxy_connect(edg_wll_Context ctx, edg_wll_PlainConnection *conn retries++; } #ifdef EDG_WLL_LOG_STUB - if (retries) fprintf(stderr,"edg_wll_log_proxy_connect: %d connect retries\n",retries); + if (retries) fprintf(stderr,"edg_wll_log_proxy_connect: there were %d connect retries\n",retries); #endif edg_wll_log_proxy_connect_end: #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_connect: done\n"); + fprintf(stderr,"edg_wll_log_proxy_connect: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - if (conn) edg_wll_plain_close(conn); - return answer; } /** *---------------------------------------------------------------------- + * close connection to lbproxy + *---------------------------------------------------------------------- + */ +int edg_wll_log_proxy_close(edg_wll_Context ctx, edg_wll_PlainConnection *conn) +{ + return edg_wll_plain_close(conn); +} + +/** + *---------------------------------------------------------------------- * write/send to lbproxy *---------------------------------------------------------------------- */ @@ -489,7 +457,7 @@ int edg_wll_log_proxy_write(edg_wll_Context ctx, edg_wll_PlainConnection *conn, len = encode_il_msg(&buffer, &ll); } if(len < 0) { - edg_wll_SetError(ctx,ENOMEM,"edg_wll_log_proxy_write: error encoding message"); + edg_wll_SetError(ctx,ENOMEM,"edg_wll_log_proxy_write(): error encoding message"); return -1; } @@ -497,14 +465,15 @@ int edg_wll_log_proxy_write(edg_wll_Context ctx, edg_wll_PlainConnection *conn, fprintf(stderr,"edg_wll_log_proxy_write: sending message\n"); #endif if ((count = edg_wll_plain_write_full(conn, buffer, len, &ctx->p_tmp_timeout)) < 0) { - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_proxy_write: error sending message to socket"); + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_proxy_write(): error sending message to socket"); return -1; } if (buffer) free(buffer); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_write: done\n"); + fprintf(stderr,"edg_wll_log_proxy_write: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif return count; } @@ -529,10 +498,11 @@ int edg_wll_log_proxy_read(edg_wll_Context ctx, edg_wll_PlainConnection *conn) edg_wll_ResetError(ctx); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_read: reading answer from server\n"); + fprintf(stderr,"edg_wll_log_proxy_read: reading answer from lbproxy\n"); #endif if ((err = get_reply_plain(ctx, conn, &answer, &lbproto_code, &code)) != 0 ) { - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_proxy_read: error reading answer from L&B Proxy server"); + edg_wll_UpdateError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_proxy_read(): error reading answer from lbproxy"); + return -1; } else { #ifdef EDG_WLL_LOG_STUB fprintf(stderr,"edg_wll_log_proxy_read: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer); @@ -540,53 +510,28 @@ int edg_wll_log_proxy_read(edg_wll_Context ctx, edg_wll_PlainConnection *conn) switch (lbproto_code) { case LB_OK: break; case LB_NOMEM: - edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_proxy_read: proxy out of memory"); + edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_proxy_read(): proxy out of memory"); break; case LB_PROTO: - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_proxy_read: received protocol error response"); + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_proxy_read(): received protocol error response"); break; case LB_DBERR: - snprintf(et, sizeof(et), "edg_wll_log_proxy_read: error details from L&B Proxy server: %s", answer); + snprintf(et, sizeof(et), "edg_wll_log_proxy_read(): error details from L&B Proxy server: %s", answer); edg_wll_SetError(ctx, code, et); break; default: - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_proxy_read: received unknown protocol response"); + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_proxy_read(): received unknown protocol response"); break; } } #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_proxy_read: done\n"); + fprintf(stderr,"edg_wll_log_proxy_read: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - return edg_wll_Error(ctx,NULL,NULL); + return 0; } -/** - *---------------------------------------------------------------------- - * client part of the logging protocol, used when sending messages directly to bkserver - *---------------------------------------------------------------------- - */ -int edg_wll_log_direct_proto_client(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_LogLine logline) -{ - int err = 0; - - edg_wll_ResetError(ctx); - - /* send message */ - if ((err = edg_wll_log_direct_write(ctx, conn, logline)) == -1) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_log_direct_proto_client: edg_wll_log_direct_write error"); - goto edg_wll_log_direct_proto_client_end; - } - - /* get answer */ - if ((err = edg_wll_log_direct_read(ctx, conn)) != 0) { - edg_wll_UpdateError(ctx,err,"edg_wll_log_direct_proto_client: edg_wll_log_direct_read error"); - } - -edg_wll_log_direct_proto_client_end: - - return edg_wll_Error(ctx,NULL,NULL); -} /** *---------------------------------------------------------------------- @@ -608,7 +553,8 @@ int edg_wll_log_direct_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn) edg_wll_ResetError(ctx); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_direct_connect: setting up gss connection\n"); + fprintf(stderr,"edg_wll_log_direct_connect: setting connection to bkserver (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif /* get bkserver location: */ edg_wlc_JobIdGetServerParts(ctx->p_jobid,&host,&port); @@ -631,20 +577,19 @@ int edg_wll_log_direct_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn) } else { fprintf(stderr,"edg_wll_log_direct_connect: going on anonymously\n"); } - fprintf(stderr,"edg_wll_log_direct_connect: opening connection to bkserver host '%s', port '%d'\n", host, port); + fprintf(stderr,"edg_wll_log_direct_connect: opening connection to bkserver (host '%s', port '%d')\n", host, port); #endif if ((answer = edg_wll_gss_connect(cred,host,port, &ctx->p_tmp_timeout, conn, &gss_stat)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(ctx,answer,&gss_stat,"edg_wll_gss_connect()"); + answer = handle_gss_failures(ctx,answer,&gss_stat,"edg_wll_gss_connect()"); goto edg_wll_log_direct_connect_end; } edg_wll_log_direct_connect_end: #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_direct_connect: done\n"); + fprintf(stderr,"edg_wll_log_direct_connect: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - if (conn->context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(conn,&ctx->p_tmp_timeout); if (cred != GSS_C_NO_CREDENTIAL) gss_release_cred(&min_stat, &cred); if (my_subject_name) free(my_subject_name); @@ -655,6 +600,16 @@ edg_wll_log_direct_connect_end: /** *---------------------------------------------------------------------- + * close connection to bkserver + *---------------------------------------------------------------------- + */ +int edg_wll_log_direct_close(edg_wll_Context ctx, edg_wll_GssConnection *conn) +{ + return edg_wll_gss_close(conn,&ctx->p_tmp_timeout); +} + +/** + *---------------------------------------------------------------------- * write/send to bkserver *---------------------------------------------------------------------- */ @@ -676,21 +631,22 @@ int edg_wll_log_direct_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, e len = encode_il_msg(&buffer, &ll); } if(len < 0) { - edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_direct_write: error encoding message"); + edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_direct_write(): error encoding message"); return -1; } #ifdef EDG_WLL_LOG_STUB fprintf(stderr,"edg_wll_log_direct_write: sending message\n"); #endif count = 0; - if (( err = edg_wll_gss_write_full(conn, buffer, len, &ctx->p_tmp_timeout, &count, &gss_code)) < 0) { - edg_wll_log_proto_handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); - edg_wll_UpdateError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_direct_write: error sending message"); + if (( err = edg_wll_gss_write_full(conn, buffer, len, &ctx->p_tmp_timeout, &count, &gss_code)) < 0) { + handle_gss_failures(ctx,err,&gss_code,"edg_wll_gss_write_full()"); + edg_wll_UpdateError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_direct_write(): error sending message"); return -1; } if (buffer) free(buffer); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_direct_write: done\n"); + fprintf(stderr,"edg_wll_log_direct_write: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif return count; } @@ -713,10 +669,12 @@ int edg_wll_log_direct_read(edg_wll_Context ctx, edg_wll_GssConnection *con) edg_wll_ResetError(ctx); #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_direct_read: reading answer from server...\n"); + fprintf(stderr,"edg_wll_log_direct_read: reading answer from bkserver\n"); #endif if ((err = get_reply_gss(ctx, con, &answer, &lbproto_code, &code)) != 0 ) { - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO,"edg_wll_edg_wll_log_direct_read: error reading answer from L&B direct server"); + edg_wll_UpdateError(ctx, EDG_WLL_IL_PROTO,"edg_wll_log_direct_read(): error reading answer from bkserver"); + if (answer) free(answer); + return -1; } else { #ifdef EDG_WLL_LOG_STUB fprintf(stderr,"edg_wll_log_direct_read: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer); @@ -724,22 +682,23 @@ int edg_wll_log_direct_read(edg_wll_Context ctx, edg_wll_GssConnection *con) switch (lbproto_code) { case LB_OK: break; case LB_NOMEM: - edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_direct_read: server out of memory"); + edg_wll_SetError(ctx, ENOMEM, "edg_wll_log_direct_read(): server out of memory"); break; case LB_PROTO: - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_direct_read: received protocol error response"); + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_direct_read(): received protocol error response"); break; case LB_DBERR: snprintf(et, sizeof(et), "edg_wll_log_direct_read: error details from L&B server: %s", answer); edg_wll_SetError(ctx, code, et); break; default: - edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_direct_read: received unknown protocol response"); + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_log_direct_read(): received unknown protocol response"); break; } } #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_log_direct_read: done\n"); + fprintf(stderr,"edg_wll_log_direct_read: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); #endif - return edg_wll_Error(ctx,NULL,NULL); + return 0; } diff --git a/org.glite.lb.client/src/prod_proto.h b/org.glite.lb.client/src/prod_proto.h index 642e58c..442e58e 100644 --- a/org.glite.lb.client/src/prod_proto.h +++ b/org.glite.lb.client/src/prod_proto.h @@ -13,126 +13,105 @@ extern "C" { #include "glite/lb/context-int.h" /** - * client part of the logging protocol, used when sending messages to locallogger - * \param[in,out] context context to work with - * \param[in] conn connection to use - * \param[in] logline message to send + * connect to local-logger + * \param[in,out] ctx context to work with + * \param[out] conn opened connection * \return errno */ -int edg_wll_log_proto_client(edg_wll_Context context, edg_wll_GssConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn); /** - * connect to locallogger - * \param[in,out] context context to work with + * close connection to local-logger + * \param[in,out] ctx context to work with * \param[out] conn opened connection * \return errno */ -int edg_wll_log_connect(edg_wll_Context context, edg_wll_GssConnection *conn); +int edg_wll_log_close(edg_wll_Context ctx, edg_wll_GssConnection *conn); /** - * write/send to locallogger - * \param[in,out] context context to work with + * write/send to local-logger + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \param[in] logline message to send * \return the number of bytes written (zero indicates nothing was written) or -1 on error */ -int edg_wll_log_write(edg_wll_Context context, edg_wll_GssConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_LogLine logline); /** - * read/receive from locallogger - * \param[in,out] context context to work with + * read/receive from local-logger + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \return the number of bytes read (zero indicates nothing was read) or -1 on error */ -int edg_wll_log_read(edg_wll_Context context, edg_wll_GssConnection *conn); +int edg_wll_log_read(edg_wll_Context ctx, edg_wll_GssConnection *conn); + /** - * client part of the logging protocol, used when sending messages to lbproxy - * \param[in,out] context context to work with - * \param[in] conn connection to use - * \param[in] logline message to send + * connect to lbproxy + * \param[in,out] ctx context to work with + * \param[out] conn opened connection * \return errno */ -int edg_wll_log_proxy_proto_client(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_proxy_connect(edg_wll_Context ctx, edg_wll_PlainConnection *conn); /** - * connect to lbproxy - * \param[in,out] context context to work with + * close connection to lbproxy + * \param[in,out] ctx context to work with * \param[out] conn opened connection * \return errno */ -int edg_wll_log_proxy_connect(edg_wll_Context context, edg_wll_PlainConnection *conn); +int edg_wll_log_proxy_close(edg_wll_Context ctx, edg_wll_PlainConnection *conn); /** * write/send to lbproxy - * \param[in,out] context context to work with + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \param[in] logline message to send * \return the number of bytes written (zero indicates nothing was written) or -1 on error */ -int edg_wll_log_proxy_write(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_proxy_write(edg_wll_Context ctx, edg_wll_PlainConnection *conn, edg_wll_LogLine logline); /** * read/receive from lbproxy - * \param[in,out] context context to work with + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \return the number of bytes read (zero indicates nothing was read) or -1 on error */ -int edg_wll_log_proxy_read(edg_wll_Context context, edg_wll_PlainConnection *conn); +int edg_wll_log_proxy_read(edg_wll_Context ctx, edg_wll_PlainConnection *conn); -/** - * client part of the logging protocol, used when sending messages directly to bkserver - * \param[in,out] context context to work with - * \param[in] conn connection to use - * \param[in] logline message to send - * \return errno - */ -int edg_wll_log_direct_proto_client(edg_wll_Context context, edg_wll_GssConnection *conn, edg_wll_LogLine logline); /** * connect to bkserver - * \param[in,out] context context to work with + * \param[in,out] ctx context to work with * \param[out] conn opened connection * \return errno */ -int edg_wll_log_direct_connect(edg_wll_Context context, edg_wll_GssConnection *conn); +int edg_wll_log_direct_connect(edg_wll_Context ctx, edg_wll_GssConnection *conn); + +/** + * close connection to bkserver + * \param[in,out] ctx context to work with + * \param[out] conn opened connection + * \return errno + */ +int edg_wll_log_direct_close(edg_wll_Context ctx, edg_wll_GssConnection *conn); /** * write/send to bkserver - * \param[in,out] context context to work with + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \param[in] logline message to send * \return the number of bytes written (zero indicates nothing was written) or -1 on error */ -int edg_wll_log_direct_write(edg_wll_Context context, edg_wll_GssConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_direct_write(edg_wll_Context ctx, edg_wll_GssConnection *conn, edg_wll_LogLine logline); /** * read/receive from bkserver - * \param[in,out] context context to work with + * \param[in,out] ctx context to work with * \param[in] conn connection to use * \return the number of bytes read (zero indicates nothing was read) or -1 on error */ -int edg_wll_log_direct_read(edg_wll_Context context, edg_wll_GssConnection *conn); - -/** - * Handle GSS failures on the client side - * \param[in,out] context context to work with - * \param[in] code code returned by a previous call - * \param[in] gss_code GSS code returned by a previous call - * \param[in] test additional text to print - * \return errno - */ -int edg_wll_log_proto_handle_gss_failures(edg_wll_Context context, int code, edg_wll_GssStatus *gss_code, const char *text); - -/** - * Handle UNIX socket failures on the client side - * \param[in,out] context context to work with - * \param[in] code code returned by a previous call - * \param[in] test additional text to print - * \return errno - */ -int edg_wll_log_proto_handle_plain_failures(edg_wll_Context context, int code, const char *text); - +int edg_wll_log_direct_read(edg_wll_Context ctx, edg_wll_GssConnection *conn); #ifdef __cplusplus } diff --git a/org.glite.lb.client/src/producer.c b/org.glite.lb.client/src/producer.c index 2c5f5db..3d5dee4 100644 --- a/org.glite.lb.client/src/producer.c +++ b/org.glite.lb.client/src/producer.c @@ -19,18 +19,18 @@ #include "prod_proto.h" #ifdef FAKE_VERSION -int edg_wll_DoLogEvent(edg_wll_Context context, edg_wll_LogLine logline); -int edg_wll_DoLogEventProxy(edg_wll_Context context, edg_wll_LogLine logline); -int edg_wll_DoLogEventDirect(edg_wll_Context context, edg_wll_LogLine logline); +int edg_wll_DoLogEvent(edg_wll_Context ctx, edg_wll_LogLine logline); +int edg_wll_DoLogEventProxy(edg_wll_Context ctx, edg_wll_LogLine logline); +int edg_wll_DoLogEventDirect(edg_wll_Context ctx, edg_wll_LogLine logline); #else /** *---------------------------------------------------------------------- - * handle_answers - handle answers from edg_wll_log_*proto_client + * handle_errors - handle answers from logging functions *---------------------------------------------------------------------- */ static -int handle_answers(edg_wll_Context context, int code, const char *text) +int handle_errors(edg_wll_Context ctx, int code, const char *text) { static char err[256]; @@ -52,116 +52,153 @@ int handle_answers(edg_wll_Context context, int code, const char *text) case EDG_WLL_ERROR_PARSE_KEY_MISUSE: // case EDG_WLL_ERROR_PARSE_OK_WITH_EXTRA_FIELDS: snprintf(err, sizeof(err), "%s: Error code mapped to EINVAL", text); - edg_wll_UpdateError(context,EINVAL,err); + edg_wll_UpdateError(ctx,EINVAL,err); break; case EDG_WLL_IL_PROTO: case EDG_WLL_IL_SYS: case EDG_WLL_IL_EVENTS_WAITING: snprintf(err, sizeof(err), "%s: Error code mapped to EAGAIN", text); - edg_wll_UpdateError(context,EAGAIN,err); + edg_wll_UpdateError(ctx,EAGAIN,err); break; default: snprintf(err, sizeof(err), "%s: Error code mapped to EAGAIN", text); - edg_wll_UpdateError(context,EAGAIN,err); + edg_wll_UpdateError(ctx,EAGAIN,err); break; } - return edg_wll_Error(context, NULL, NULL); + return edg_wll_Error(ctx, NULL, NULL); } /** *---------------------------------------------------------------------- - * Connects to local-logger and sends already formatted ULM string - * \brief helper logging function - * \param[in,out] context context to work with, + * Open a GSS connection to local-logger, send already formatted ULM string + * and get answer back from local-logger + * \brief connect to local-logger, send message and get answer back + * \param[in,out] ctx context to work with, * \param[in] logline formated ULM string *---------------------------------------------------------------------- */ int edg_wll_DoLogEvent( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_LogLine logline) { - int answer = 0; - edg_wll_GssConnection con; - edg_wll_ResetError(context); - memset(&con, 0, sizeof(con)); + int ret = 0, answer = EAGAIN; + edg_wll_GssConnection conn; + + edg_wll_ResetError(ctx); + memset(&conn,0,sizeof(conn)); - /* open a gss connection to local-logger: */ - if ((answer = edg_wll_log_connect(context,&con)) < 0) { + /* connect to local-logger */ + if ((ret = edg_wll_log_connect(ctx,&conn)) < 0) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEvent(): edg_wll_log_connect error"); goto edg_wll_DoLogEvent_end; } - /* send the message to the local-logger: */ - answer = edg_wll_log_proto_client(context,&con,logline); + /* send message */ + if ((ret = edg_wll_log_write(ctx,&conn,logline)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEvent(): edg_wll_log_write error"); + goto edg_wll_DoLogEvent_end; + } + + /* get answer */ + if ((ret = edg_wll_log_read(ctx,&conn)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEvent(): edg_wll_log_read error"); + } else { + answer = edg_wll_Error(ctx, NULL, NULL); + } edg_wll_DoLogEvent_end: - if (con.sock) edg_wll_gss_close(&con,&context->p_tmp_timeout); + edg_wll_log_close(ctx,&conn); - return handle_answers(context,answer,"edg_wll_DoLogEvent()"); + return handle_errors(ctx,answer,"edg_wll_DoLogEvent()"); } /** *---------------------------------------------------------------------- - * Connects to L&B Proxy and sends already formatted ULM string - * \brief helper logging function - * \param[in,out] context context to work with, + * Open a plain (UNIX socket) connection to L&B Proxy, send already formatted ULM string + * and get answer back from L&B Proxy + * \brief connect to lbproxy, send message and get answer back + * \param[in,out] ctx context to work with, * \param[in] logline formated ULM string *---------------------------------------------------------------------- */ int edg_wll_DoLogEventProxy( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_LogLine logline) { - int answer = 0; - edg_wll_PlainConnection con; + int ret = 0, answer = EAGAIN; + edg_wll_PlainConnection conn; - edg_wll_ResetError(context); - memset(&con, 0, sizeof(con)); + edg_wll_ResetError(ctx); + memset(&conn,0,sizeof(conn)); - /* open a plain connection to L&B Proxy: */ - if ((answer = edg_wll_log_proxy_connect(context,&con)) < 0) { + /* connect to lbproxy */ + if ((ret = edg_wll_log_proxy_connect(ctx,&conn)) < 0) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventProxy(): edg_wll_log_proxy_write error"); goto edg_wll_DoLogEventProxy_end; } - /* and send the message to the L&B Proxy: */ - answer = edg_wll_log_proxy_proto_client(context,&con,logline); - + /* send message */ + if ((ret = edg_wll_log_proxy_write(ctx,&conn,logline)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventProxy(): edg_wll_log_proxy_write error"); + goto edg_wll_DoLogEventProxy_end; + } + + /* get answer */ + if ((ret = edg_wll_log_proxy_read(ctx,&conn)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventProxy(): edg_wll_log_proxy_read error"); + } else { + answer = edg_wll_Error(ctx, NULL, NULL); + } + edg_wll_DoLogEventProxy_end: - if (con.sock) edg_wll_plain_close(&con); + edg_wll_log_proxy_close(ctx,&conn); - return handle_answers(context,answer,"edg_wll_DoLogEventProxy()"); + return handle_errors(ctx,answer,"edg_wll_DoLogEventProxy()"); } /** *---------------------------------------------------------------------- - * Connects to bkserver and sends already formatted ULM string - * \brief helper logging function - * \param[in,out] context context to work with, + * Open a GSS connection to L&B server, send already formatted ULM string + * and get answer back from L&B server + * \brief connect to bkserver, send message and get answer back + * \param[in,out] ctx context to work with, * \param[in] logline formated ULM string *---------------------------------------------------------------------- */ int edg_wll_DoLogEventDirect( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_LogLine logline) { - int answer = 0; - edg_wll_GssConnection con; + int ret = 0, answer = EAGAIN; + edg_wll_GssConnection conn; - edg_wll_ResetError(context); - memset(&con, 0, sizeof(con)); + edg_wll_ResetError(ctx); + memset(&conn,0,sizeof(conn)); - /* open a gss connection to bkserver: */ - if ((answer = edg_wll_log_direct_connect(context,&con)) < 0) { + /* connect to bkserver */ + if ((ret = edg_wll_log_direct_connect(ctx,&conn)) < 0) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_connect error"); goto edg_wll_DoLogEventDirect_end; } - /* and send the message to the bkserver: */ - answer = edg_wll_log_direct_proto_client(context,&con,logline); + /* send message */ + if ((ret = edg_wll_log_direct_write(ctx,&conn,logline)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_write error"); + goto edg_wll_DoLogEventDirect_end; + } + + /* get answer */ + if ((ret = edg_wll_log_direct_read(ctx,&conn)) == -1) { + edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_read error"); + } else { + answer = edg_wll_Error(ctx, NULL, NULL); + } edg_wll_DoLogEventDirect_end: - edg_wll_gss_close(&con,&context->p_tmp_timeout); + edg_wll_log_direct_close(ctx,&conn); - return handle_answers(context,answer,"edg_wll_DoLogEventDirect()"); + return handle_errors(ctx,answer,"edg_wll_DoLogEventDirect()"); } #endif /* FAKE_VERSION */ @@ -176,7 +213,7 @@ edg_wll_DoLogEventDirect_end: *---------------------------------------------------------------------- * Formats a logging message * \brief formats a logging message - * \param[in,out] context context to work with, + * \param[in,out] ctx context to work with, * \param[in] flags as defined by LOGFLAG_* * \param[in] event type of the event, * \param[out] logline formated logging message @@ -185,7 +222,7 @@ edg_wll_DoLogEventDirect_end: *---------------------------------------------------------------------- */ static int edg_wll_FormatLogLine( - edg_wll_Context context, + edg_wll_Context ctx, int flags, edg_wll_EventCode event, edg_wll_LogLine *logline, @@ -206,58 +243,58 @@ static int edg_wll_FormatLogLine( seq = fix = var = dguser = out = source = eventName = lvl = fullid = NULL; priority = flags & LOGFLAG_SYNC; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); /* format the message: */ va_start(fmt_args,fmt); gettimeofday(&start_time,0); if (edg_wll_ULMTimevalToDate(start_time.tv_sec,start_time.tv_usec,date) != 0) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_FormatLogLine(): edg_wll_ULMTimevalToDate() error"); + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_FormatLogLine(): edg_wll_ULMTimevalToDate() error"); goto edg_wll_formatlogline_end; } - source = edg_wll_SourceToString(context->p_source); - lvl = edg_wll_LevelToString(context->p_level); + source = edg_wll_SourceToString(ctx->p_source); + lvl = edg_wll_LevelToString(ctx->p_level); eventName = edg_wll_EventToString(event); if (!eventName) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_FormatLogLine(): event name not specified"); + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_FormatLogLine(): event name not specified"); goto edg_wll_formatlogline_end; } - if (!(fullid = edg_wlc_JobIdUnparse(context->p_jobid))) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_FormatLogLine(): edg_wlc_JobIdUnparse() error"); + if (!(fullid = edg_wlc_JobIdUnparse(ctx->p_jobid))) { + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_FormatLogLine(): edg_wlc_JobIdUnparse() error"); goto edg_wll_formatlogline_end; } - seq = edg_wll_GetSequenceCode(context); + seq = edg_wll_GetSequenceCode(ctx); if (trio_asprintf(&fix,EDG_WLL_FORMAT_COMMON, - date,context->p_host,lvl,priority, - source,context->p_instance ? context->p_instance : "", + date,ctx->p_host,lvl,priority, + source,ctx->p_instance ? ctx->p_instance : "", eventName,fullid,seq) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); goto edg_wll_formatlogline_end; } - /* TODO: add always, probably new context->p_user */ + /* TODO: add always, probably new ctx->p_user */ if ( ( (flags & LOGFLAG_PROXY) || (flags & LOGFLAG_DIRECT) ) && - (context->p_user_lbproxy) ) { - if (trio_asprintf(&dguser,EDG_WLL_FORMAT_USER,context->p_user_lbproxy) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); + (ctx->p_user_lbproxy) ) { + if (trio_asprintf(&dguser,EDG_WLL_FORMAT_USER,ctx->p_user_lbproxy) == -1) { + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); goto edg_wll_formatlogline_end; } } else { dguser = strdup(""); } if (trio_vasprintf(&var,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_vasprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_vasprintf() error"); goto edg_wll_formatlogline_end; } if (asprintf(&out,"%s%s%s\n",fix,dguser,var) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): asprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): asprintf() error"); goto edg_wll_formatlogline_end; } size = strlen(out); if (priority && (size > EDG_WLL_LOG_SYNC_MAXMSGSIZE)) { - edg_wll_SetError(context,ret = ENOSPC,"edg_wll_FormatLogLine(): Message size too large for synchronous transfer"); + edg_wll_SetError(ctx,ret = ENOSPC,"edg_wll_FormatLogLine(): Message size too large for synchronous transfer"); goto edg_wll_formatlogline_end; } @@ -281,16 +318,14 @@ edg_wll_formatlogline_end: if (eventName) free(eventName); if (fullid) free(fullid); - if (ret) edg_wll_UpdateError(context,0,"Logging library ERROR: "); - - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** *---------------------------------------------------------------------- * Formats a logging message and sends it to local-logger * \brief master logging event function - * \param[in,out] context INOUT context to work with, + * \param[in,out] ctx context to work with, * \param[in] flags as defined by LOGFLAG_* * \param[in] event type of the event, * \param[in] fmt printf()-like format string, @@ -298,7 +333,7 @@ edg_wll_formatlogline_end: *---------------------------------------------------------------------- */ static int edg_wll_LogEventMaster( - edg_wll_Context context, + edg_wll_Context ctx, int flags, edg_wll_EventCode event, char *fmt, ...) @@ -310,7 +345,7 @@ static int edg_wll_LogEventMaster( priority = flags & LOGFLAG_SYNC; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); /* default return value is "Try Again" */ ret = EAGAIN; @@ -319,12 +354,12 @@ static int edg_wll_LogEventMaster( va_start(fmt_args,fmt); if (trio_vasprintf(&in,fmt,fmt_args) == -1) { - edg_wll_UpdateError(context,ret = ENOMEM,"edg_wll_LogEventMaster(): trio_vasprintf() error"); + edg_wll_UpdateError(ctx,ret = ENOMEM,"edg_wll_LogEventMaster(): trio_vasprintf() error"); goto edg_wll_logeventmaster_end; } - if (edg_wll_FormatLogLine(context,flags,event,&out,"%s",in) != 0 ) { - edg_wll_UpdateError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_FormatLogLine() error"); + if (edg_wll_FormatLogLine(ctx,flags,event,&out,"%s",in) != 0 ) { + edg_wll_UpdateError(ctx,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_FormatLogLine() error"); goto edg_wll_logeventmaster_end; } @@ -332,28 +367,28 @@ static int edg_wll_LogEventMaster( // fprintf(stderr,"edg_wll_LogEventMaster (%d chars): %s",strlen(out),out); #endif - context->p_tmp_timeout.tv_sec = 0; - context->p_tmp_timeout.tv_usec = 0; + ctx->p_tmp_timeout.tv_sec = 0; + ctx->p_tmp_timeout.tv_usec = 0; if (priority) { - context->p_tmp_timeout = context->p_sync_timeout; + ctx->p_tmp_timeout = ctx->p_sync_timeout; } else { - context->p_tmp_timeout = context->p_log_timeout; + ctx->p_tmp_timeout = ctx->p_log_timeout; } /* and send the message */ #ifndef LB_PERF_DROP if (flags & LOGFLAG_NORMAL) { /* to the local-logger: */ - ret = edg_wll_DoLogEvent(context, out); + ret = edg_wll_DoLogEvent(ctx, out); } else if (flags & LOGFLAG_PROXY) { /* to the L&B Proxy: */ - ret = edg_wll_DoLogEventProxy(context, out); + ret = edg_wll_DoLogEventProxy(ctx, out); } else if (flags & LOGFLAG_DIRECT) { /* directly to the bkserver: */ - ret = edg_wll_DoLogEventDirect(context, out); + ret = edg_wll_DoLogEventDirect(ctx, out); } else { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): wrong flag specified"); + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogEventMaster(): wrong flag specified"); } #endif @@ -362,13 +397,13 @@ edg_wll_logeventmaster_end: if (in) free(in); if (out) free(out); - if (!ret) if(edg_wll_IncSequenceCode(context)) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_IncSequenceCode failed"); + if (!ret) if(edg_wll_IncSequenceCode(ctx)) { + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_IncSequenceCode failed"); } - if (ret) edg_wll_UpdateError(context,0,"Logging library ERROR: "); + if (ret) edg_wll_UpdateError(ctx,0,"Logging library ERROR: "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } @@ -379,7 +414,7 @@ edg_wll_logeventmaster_end: *---------------------------------------------------------------------- */ int edg_wll_LogEvent( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_EventCode event, char *fmt, ...) { @@ -387,23 +422,23 @@ int edg_wll_LogEvent( char *list=NULL; va_list fmt_args; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); va_start(fmt_args,fmt); if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEvent(): trio_vasprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEvent(): trio_vasprintf() error"); goto edg_wll_logevent_end; } - ret=edg_wll_LogEventMaster(context,LOGFLAG_NORMAL | LOGFLAG_ASYNC,event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_ASYNC,event,"%s",list); edg_wll_logevent_end: va_end(fmt_args); if (list) free(list); - if (ret) edg_wll_UpdateError(context,0,"edg_wll_LogEvent(): "); + if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEvent(): "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -414,7 +449,7 @@ edg_wll_logevent_end: *---------------------------------------------------------------------- */ int edg_wll_LogEventSync( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_EventCode event, char *fmt, ...) { @@ -422,23 +457,23 @@ int edg_wll_LogEventSync( char *list=NULL; va_list fmt_args; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); va_start(fmt_args,fmt); if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEventSync(): trio_vasprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEventSync(): trio_vasprintf() error"); goto edg_wll_logeventsync_end; } - ret=edg_wll_LogEventMaster(context,LOGFLAG_NORMAL | LOGFLAG_SYNC,event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_SYNC,event,"%s",list); edg_wll_logeventsync_end: va_end(fmt_args); if (list) free(list); - if (ret) edg_wll_UpdateError(context,0,"edg_wll_LogEventSync(): "); + if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEventSync(): "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -449,7 +484,7 @@ edg_wll_logeventsync_end: *---------------------------------------------------------------------- */ int edg_wll_LogEventProxy( - edg_wll_Context context, + edg_wll_Context ctx, edg_wll_EventCode event, char *fmt, ...) { @@ -457,23 +492,23 @@ int edg_wll_LogEventProxy( char *list=NULL; va_list fmt_args; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); va_start(fmt_args,fmt); if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEventProxy(): trio_vasprintf() error"); + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEventProxy(): trio_vasprintf() error"); goto edg_wll_logevent_end; } - ret=edg_wll_LogEventMaster(context,LOGFLAG_PROXY | LOGFLAG_SYNC, event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC, event,"%s",list); edg_wll_logevent_end: va_end(fmt_args); if (list) free(list); - if (ret) edg_wll_UpdateError(context,0,"edg_wll_LogEventProxy(): "); + if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEventProxy(): "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -484,7 +519,7 @@ edg_wll_logevent_end: *----------------------------------------------------------------------- */ int edg_wll_LogFlush( - edg_wll_Context context, + edg_wll_Context ctx, struct timeval *timeout) { int ret = 0; @@ -495,38 +530,38 @@ int edg_wll_LogFlush( fullid = NULL; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); gettimeofday(&start_time, 0); if (edg_wll_ULMTimevalToDate(start_time.tv_sec, start_time.tv_usec, date) != 0) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_ULMTimevalToDate()"); + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogFlush(): edg_wll_ULMTimevalToDate() error"); goto edg_wll_logflush_end; } - if (!(fullid = edg_wlc_JobIdUnparse(context->p_jobid))) { - ret = edg_wll_SetError(context,EINVAL,"edg_wlc_JobIdUnparse()"); + if (!(fullid = edg_wlc_JobIdUnparse(ctx->p_jobid))) { + ret = edg_wll_SetError(ctx,EINVAL,"edg_wll_LogFlush(): edg_wlc_JobIdUnparse() error"); goto edg_wll_logflush_end; } if (trio_asprintf(&out, "DATE=%s HOST=\"%|Us\" PROG=internal LVL=system DG.PRIORITY=1 DG.TYPE=\"command\" DG.COMMAND=\"flush\" DG.TIMEOUT=\"%d\" DG.JOBID=\"%s\"\n", - date, context->p_host, (timeout ? timeout->tv_sec : context->p_sync_timeout.tv_sec), fullid) == -1) { - edg_wll_SetError(context,ret = EINVAL,"trio_asprintf"); + date, ctx->p_host, (timeout ? timeout->tv_sec : ctx->p_sync_timeout.tv_sec), fullid) == -1) { + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogFlush(): trio_asprintf() error"); goto edg_wll_logflush_end; } if (timeout) - context->p_tmp_timeout = *timeout; + ctx->p_tmp_timeout = *timeout; else - context->p_tmp_timeout = context->p_sync_timeout; + ctx->p_tmp_timeout = ctx->p_sync_timeout; - ret = edg_wll_DoLogEvent(context, out); + ret = edg_wll_DoLogEvent(ctx, out); edg_wll_logflush_end: if(out) free(out); if(fullid) free(fullid); - if (ret) edg_wll_UpdateError(context,0,"edg_wll_LogFlush(): "); + if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogFlush(): "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -537,7 +572,7 @@ edg_wll_logflush_end: *----------------------------------------------------------------------- */ int edg_wll_LogFlushAll( - edg_wll_Context context, + edg_wll_Context ctx, struct timeval *timeout) { int ret = 0; @@ -545,33 +580,33 @@ int edg_wll_LogFlushAll( char date[ULM_DATE_STRING_LENGTH+1]; struct timeval start_time; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); gettimeofday(&start_time, 0); if (edg_wll_ULMTimevalToDate(start_time.tv_sec, start_time.tv_usec, date) != 0) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_ULMTimevalToDate()"); + edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogFlushAll(): edg_wll_ULMTimevalToDate() error"); goto edg_wll_logflushall_end; } if (trio_asprintf(&out, "DATE=%s HOST=\"%|Us\" PROG=internal LVL=system DG.PRIORITY=1 DG.TYPE=\"command\" DG.COMMAND=\"flush\" DG.TIMEOUT=\"%d\"\n", - date, context->p_host, (timeout ? timeout->tv_sec : context->p_sync_timeout.tv_sec)) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"trio_asprintf"); + date, ctx->p_host, (timeout ? timeout->tv_sec : ctx->p_sync_timeout.tv_sec)) == -1) { + edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogFlushAll(): trio_asprintf() error"); goto edg_wll_logflushall_end; } if (timeout) - context->p_tmp_timeout = *timeout; + ctx->p_tmp_timeout = *timeout; else - context->p_tmp_timeout = context->p_sync_timeout; + ctx->p_tmp_timeout = ctx->p_sync_timeout; - ret = edg_wll_DoLogEvent(context, out); + ret = edg_wll_DoLogEvent(ctx, out); edg_wll_logflushall_end: if(out) free(out); - if (ret) edg_wll_UpdateError(context,0,"edg_wll_LogFlushAll(): "); + if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogFlushAll(): "); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -581,25 +616,49 @@ edg_wll_logflushall_end: *----------------------------------------------------------------------- */ int edg_wll_SetLoggingJob( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, const char *code, int flags) { int err; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); - if (!job) return edg_wll_SetError(context,EINVAL,"jobid is null"); + if (!job) return edg_wll_SetError(ctx,EINVAL,"edg_wll_SetLoggingJob(): jobid is null"); - edg_wlc_JobIdFree(context->p_jobid); - if ((err = edg_wlc_JobIdDup(job,&context->p_jobid))) { - edg_wll_SetError(context,err,"edg_wll_SetLoggingJob(): edg_wlc_JobIdDup() error"); - } else if (!edg_wll_SetSequenceCode(context,code,flags)) { - edg_wll_IncSequenceCode(context); + edg_wlc_JobIdFree(ctx->p_jobid); + if ((err = edg_wlc_JobIdDup(job,&ctx->p_jobid))) { + edg_wll_SetError(ctx,err,"edg_wll_SetLoggingJob(): edg_wlc_JobIdDup() error"); + } else if (!edg_wll_SetSequenceCode(ctx,code,flags)) { + edg_wll_IncSequenceCode(ctx); } - return edg_wll_Error(context,NULL,NULL); + /* add user credentials to context */ + { + char *my_subject_name = NULL; + edg_wll_GssStatus gss_stat; + gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; + OM_uint32 min_stat; + + /* acquire gss credentials */ + err = edg_wll_gss_acquire_cred_gsi( + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, + &cred, &my_subject_name, &gss_stat); + /* give up if unable to acquire prescribed credentials */ + if (err && ctx->p_proxy_filename) { + edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); + edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, EDG_WLL_LOG_USER_DEFAULT); + } else { + edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, my_subject_name); + } + if (cred != GSS_C_NO_CREDENTIAL) + gss_release_cred(&min_stat, &cred); + if (my_subject_name) free(my_subject_name); + } + + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -609,7 +668,7 @@ int edg_wll_SetLoggingJob( *----------------------------------------------------------------------- */ int edg_wll_SetLoggingJobProxy( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, const char *code, const char *user, @@ -617,58 +676,61 @@ int edg_wll_SetLoggingJobProxy( { int err; char *code_loc = NULL; - char *my_subject_name = NULL; - edg_wll_GssStatus gss_stat; - gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; - OM_uint32 min_stat; - edg_wll_ResetError(context); + edg_wll_ResetError(ctx); - if (!job) return edg_wll_SetError(context,EINVAL,"jobid is null"); + if (!job) return edg_wll_SetError(ctx,EINVAL,"edg_wll_SetLoggingJobProxy(): jobid is null"); - edg_wlc_JobIdFree(context->p_jobid); - if ((err = edg_wlc_JobIdDup(job,&context->p_jobid))) { - edg_wll_SetError(context,err,"edg_wll_SetLoggingJob(): edg_wlc_JobIdDup() error"); + edg_wlc_JobIdFree(ctx->p_jobid); + if ((err = edg_wlc_JobIdDup(job,&ctx->p_jobid))) { + edg_wll_SetError(ctx,err,"edg_wll_SetLoggingJobProxy(): edg_wlc_JobIdDup() error"); goto edg_wll_setloggingjobproxy_end; } /* add user credentials to context */ if (user) { - edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, user); + edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, user); } else { + char *my_subject_name = NULL; + edg_wll_GssStatus gss_stat; + gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; + OM_uint32 min_stat; + /* acquire gss credentials */ err = edg_wll_gss_acquire_cred_gsi( - context->p_proxy_filename ? context->p_proxy_filename : context->p_cert_filename, - context->p_proxy_filename ? context->p_proxy_filename : context->p_key_filename, + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, + ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, &cred, &my_subject_name, &gss_stat); /* give up if unable to acquire prescribed credentials */ - if (err && context->p_proxy_filename) { - edg_wll_SetErrorGss(context, "failed to load GSI credentials", &gss_stat); - goto edg_wll_setloggingjobproxy_end; + if (err && ctx->p_proxy_filename) { + edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); + edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, EDG_WLL_LOG_USER_DEFAULT); + } else { + edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, my_subject_name); } - edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, my_subject_name); + + if (cred != GSS_C_NO_CREDENTIAL) + gss_release_cred(&min_stat, &cred); + if (my_subject_name) free(my_subject_name); } /* query LBProxyServer for sequence code if not user-suplied */ /* XXX: don't know if working properly */ if (!code) { - if (edg_wll_QuerySequenceCodeProxy(context, job, &code_loc)) + if (edg_wll_QuerySequenceCodeProxy(ctx, job, &code_loc)) goto edg_wll_setloggingjobproxy_end; } else { code_loc = strdup(code); } - if (!edg_wll_SetSequenceCode(context,code_loc,flags)) { - edg_wll_IncSequenceCode(context); + if (!edg_wll_SetSequenceCode(ctx,code_loc,flags)) { + edg_wll_IncSequenceCode(ctx); } edg_wll_setloggingjobproxy_end: if (code_loc) free(code_loc); - if (cred != GSS_C_NO_CREDENTIAL) - gss_release_cred(&min_stat, &cred); - if (my_subject_name) free(my_subject_name); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -677,7 +739,7 @@ edg_wll_setloggingjobproxy_end: *----------------------------------------------------------------------- */ static int edg_wll_RegisterJobMaster( - edg_wll_Context context, + edg_wll_Context ctx, int flags, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, @@ -694,67 +756,67 @@ static int edg_wll_RegisterJobMaster( seq = type_s = intseed = parent_s = NULL; - edg_wll_ResetError(context); - memcpy(&sync_to, &context->p_sync_timeout, sizeof sync_to); + edg_wll_ResetError(ctx); + memcpy(&sync_to, &ctx->p_sync_timeout, sizeof sync_to); intseed = seed ? strdup(seed) : - str2md5base64(seq = edg_wll_GetSequenceCode(context)); + str2md5base64(seq = edg_wll_GetSequenceCode(ctx)); type_s = edg_wll_RegJobJobtypeToString(type); if (!type_s) { - edg_wll_SetError(context,EINVAL,"edg_wll_RegisterJobMaster(): no jobtype specified"); + edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): no jobtype specified"); goto edg_wll_registerjobmaster_end; } if ((type == EDG_WLL_REGJOB_DAG || type == EDG_WLL_REGJOB_PARTITIONED || type == EDG_WLL_REGJOB_COLLECTION) && num_subjobs > 0) { - err = edg_wll_GenerateSubjobIds(context,job,num_subjobs,intseed,subjobs); + err = edg_wll_GenerateSubjobIds(ctx,job,num_subjobs,intseed,subjobs); /* increase log timeout on client (the same as on BK server) */ - context->p_sync_timeout.tv_sec += num_subjobs; - if (context->p_sync_timeout.tv_sec > 86400) context->p_sync_timeout.tv_sec = 86400; + ctx->p_sync_timeout.tv_sec += num_subjobs; + if (ctx->p_sync_timeout.tv_sec > 86400) ctx->p_sync_timeout.tv_sec = 86400; } if (err) { - edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobMaster(): edg_wll_GenerateSubjobIds() error"); + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): edg_wll_GenerateSubjobIds() error"); goto edg_wll_registerjobmaster_end; } parent_s = parent ? edg_wlc_JobIdUnparse(parent) : strdup(""); if (flags & LOGFLAG_DIRECT) { /* SetLoggingJob and log directly the message */ - if (edg_wll_SetLoggingJob(context,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(context,LOGFLAG_DIRECT | LOGFLAG_SYNC, + if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { + edg_wll_LogEventMaster(ctx,LOGFLAG_DIRECT | LOGFLAG_SYNC, EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, (char *)jdl,ns,parent_s,type_s,num_subjobs,intseed); } } else if (flags & LOGFLAG_PROXY) { /* SetLoggingJobProxy and and log to proxy */ - edg_wll_SetSequenceCode(context, NULL, EDG_WLL_SEQ_NORMAL); + edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); if (seq) free(seq); - seq = edg_wll_GetSequenceCode(context); - if (edg_wll_SetLoggingJobProxy(context,job,seq,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(context,LOGFLAG_PROXY | LOGFLAG_SYNC, + seq = edg_wll_GetSequenceCode(ctx); + if (edg_wll_SetLoggingJobProxy(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL) == 0) { + edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC, EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, (char *)jdl,ns,parent_s,type_s,num_subjobs,intseed); } } else if (flags & LOGFLAG_NORMAL) { - /* SetLoggingJob and log normally the message through the locallogger */ - if (edg_wll_SetLoggingJob(context,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(context, LOGFLAG_NORMAL, + /* SetLoggingJob and log normally the message through the local-logger */ + if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { + edg_wll_LogEventMaster(ctx, LOGFLAG_NORMAL, EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, (char *)jdl,ns,parent_s,type_s,num_subjobs,intseed); } } else { - edg_wll_SetError(context,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); + edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); } edg_wll_registerjobmaster_end: - memcpy(&context->p_sync_timeout, &sync_to, sizeof sync_to); + memcpy(&ctx->p_sync_timeout, &sync_to, sizeof sync_to); if (seq) free(seq); if (type_s) free(type_s); if (intseed) free(intseed); if (parent_s) free(parent_s); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); } /** @@ -764,7 +826,7 @@ edg_wll_registerjobmaster_end: *----------------------------------------------------------------------- */ int edg_wll_RegisterJobSync( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -773,7 +835,7 @@ int edg_wll_RegisterJobSync( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(context,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } /** @@ -783,7 +845,7 @@ int edg_wll_RegisterJobSync( *----------------------------------------------------------------------- */ int edg_wll_RegisterJob( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -792,7 +854,7 @@ int edg_wll_RegisterJob( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(context,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } #ifdef LB_PERF @@ -805,7 +867,7 @@ int edg_wll_RegisterJob( *----------------------------------------------------------------------- */ int edg_wll_RegisterJobProxy( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -825,68 +887,75 @@ int edg_wll_RegisterJobProxy( seq = type_s = NULL; - edg_wll_ResetError(context); - memcpy(&sync_to, &context->p_sync_timeout, sizeof sync_to); + edg_wll_ResetError(ctx); + memcpy(&sync_to, &ctx->p_sync_timeout, sizeof sync_to); memset(&con_bkserver, 0, sizeof(con_bkserver)); memset(&con_lbproxy, 0, sizeof(con_lbproxy)); + FD_ZERO(&fdset); type_s = edg_wll_RegJobJobtypeToString(type); if (!type_s) { - edg_wll_SetError(context,EINVAL,"edg_wll_RegisterJobProxy(): no jobtype specified"); + edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): no jobtype specified"); goto edg_wll_registerjobproxy_end; } if ((type == EDG_WLL_REGJOB_DAG || type == EDG_WLL_REGJOB_PARTITIONED || type == EDG_WLL_REGJOB_COLLECTION) && num_subjobs > 0) { - ret = edg_wll_GenerateSubjobIds(context,job,num_subjobs,seed ? seed : MY_SEED,subjobs); + ret = edg_wll_GenerateSubjobIds(ctx,job,num_subjobs,seed ? seed : MY_SEED,subjobs); /* increase log timeout on client (the same as on BK server) */ - context->p_sync_timeout.tv_sec += num_subjobs; - if (context->p_sync_timeout.tv_sec > 86400) context->p_sync_timeout.tv_sec = 86400; + ctx->p_sync_timeout.tv_sec += num_subjobs; + if (ctx->p_sync_timeout.tv_sec > 86400) ctx->p_sync_timeout.tv_sec = 86400; } if (ret) { - edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_GenerateSubjobIds() error"); + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_GenerateSubjobIds() error"); goto edg_wll_registerjobproxy_end; } /* SetLoggingJobProxy */ - edg_wll_SetSequenceCode(context, NULL, EDG_WLL_SEQ_NORMAL); - seq = edg_wll_GetSequenceCode(context); - if (edg_wll_SetLoggingJobProxy(context,job,seq,NULL,EDG_WLL_SEQ_NORMAL) != 0) { - edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_SetLoggingJobProxy() error"); + edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); + seq = edg_wll_GetSequenceCode(ctx); + if (edg_wll_SetLoggingJobProxy(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL) != 0) { + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_SetLoggingJobProxy() error"); goto edg_wll_registerjobproxy_end; } /* format the RegJob event message */ - if (edg_wll_FormatLogLine(context,LOGFLAG_SYNC,EDG_WLL_EVENT_REGJOB,&logline,EDG_WLL_FORMAT_REGJOB, - (char *)jdl,ns,"",type_s,num_subjobs,seed ? seed : MY_SEED) != 0 ) { - edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error"); + if (edg_wll_FormatLogLine(ctx,LOGFLAG_SYNC | LOGFLAG_PROXY | LOGFLAG_PROXY, + EDG_WLL_EVENT_REGJOB,&logline, + EDG_WLL_FORMAT_REGJOB,(char *)jdl,ns,"",type_s,num_subjobs,seed ? seed : MY_SEED) != 0 ) { + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error"); goto edg_wll_registerjobproxy_end; } + /* do not forget to set the timeout!!! */ + ctx->p_tmp_timeout = ctx->p_sync_timeout; + /* and now do the pseudo-parallel registration: */ +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_RegisterJobProxy: start (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); +#endif /* connect to bkserver */ - if ((ret = edg_wll_log_direct_connect(context,&con_bkserver)) < 0) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_connect error"); + if ((ret = edg_wll_log_direct_connect(ctx,&con_bkserver)) < 0) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_connect error"); goto edg_wll_registerjobproxy_end; } /* connect to lbproxy */ - if ((ret = edg_wll_log_proxy_connect(context,&con_lbproxy)) < 0) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_connect error"); + if ((ret = edg_wll_log_proxy_connect(ctx,&con_lbproxy)) < 0) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_connect error"); goto edg_wll_registerjobproxy_end; } /* send to bkserver */ -/* - if ((ret = edg_wll_log_direct_write(context,&con_bkserver,logline)) == -1) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_write error"); + if ((ret = edg_wll_log_direct_write(ctx,&con_bkserver,logline)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_write error"); goto edg_wll_registerjobproxy_end; } -*/ /* send to lbproxy */ - if ((ret = edg_wll_log_proxy_write(context,&con_lbproxy,logline)) == -1) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_write error"); + if ((ret = edg_wll_log_proxy_write(ctx,&con_lbproxy,logline)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_write error"); goto edg_wll_registerjobproxy_end; } /* select and read the answers */ @@ -897,10 +966,14 @@ int edg_wll_RegisterJobProxy( FD_SET(con_lbproxy.sock,&fdset); if (con_lbproxy.sock > n) n = con_lbproxy.sock; n += 1; - fd = select(n,&fdset,NULL,NULL,&context->p_tmp_timeout); +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_RegisterJobProxy: calling select (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); +#endif + fd = select(n,&fdset,NULL,NULL,&ctx->p_tmp_timeout); switch (fd) { case 0: /* timeout */ - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): select() timeouted"); + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): select() timeouted"); count = 0; break; case -1: /* error */ @@ -908,23 +981,23 @@ int edg_wll_RegisterJobProxy( case EINTR: continue; default: - edg_wll_UpdateError(context,errno,"edg_wll_RegisterJobProxy(): select() error"); + edg_wll_UpdateError(ctx,errno,"edg_wll_RegisterJobProxy(): select() error"); } default: break; } if (FD_ISSET(con_bkserver.sock,&fdset)) { /* read answer from bkserver */ - if ((ret = edg_wll_log_direct_read(context,&con_bkserver)) == -1) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_read error"); + if ((ret = edg_wll_log_direct_read(ctx,&con_bkserver)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_read error"); goto edg_wll_registerjobproxy_end; } count -= 1; } if (FD_ISSET(con_lbproxy.sock,&fdset)) { /* read answer from lbproxy */ - if ((ret = edg_wll_log_proxy_read(context,&con_lbproxy)) == -1) { - edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_read error"); + if ((ret = edg_wll_log_proxy_read(ctx,&con_lbproxy)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_read error"); goto edg_wll_registerjobproxy_end; } count -= 1; @@ -932,15 +1005,19 @@ int edg_wll_RegisterJobProxy( } edg_wll_registerjobproxy_end: - if (con_bkserver.sock) edg_wll_gss_close(&con_bkserver,&context->p_tmp_timeout); +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_RegisterJobProxy: done (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); +#endif + if (con_bkserver.sock) edg_wll_gss_close(&con_bkserver,&ctx->p_tmp_timeout); if (con_lbproxy.sock) edg_wll_plain_close(&con_lbproxy); - memcpy(&context->p_sync_timeout, &sync_to, sizeof sync_to); + memcpy(&ctx->p_sync_timeout, &sync_to, sizeof sync_to); if (type_s) free(type_s); if (seq) free(seq); if (logline) free(logline); - return edg_wll_Error(context,NULL,NULL); + return edg_wll_Error(ctx,NULL,NULL); #undef MY_SEED } @@ -954,7 +1031,7 @@ edg_wll_registerjobproxy_end: *----------------------------------------------------------------------- */ int edg_wll_RegisterJobProxy( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -965,13 +1042,13 @@ int edg_wll_RegisterJobProxy( { #define MY_SEED "edg_wll_RegisterJobProxy()" /* first register with bkserver */ - int ret = edg_wll_RegisterJobMaster(context,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); + int ret = edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); if (ret) { - edg_wll_UpdateError(context,0,"edg_wll_RegisterJobProxy(): unable to register with bkserver"); - return edg_wll_Error(context,NULL,NULL); + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobProxy(): unable to register with bkserver"); + return edg_wll_Error(ctx,NULL,NULL); } /* and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(context,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); #undef MY_SEED } @@ -988,7 +1065,7 @@ int edg_wll_RegisterJobProxy( *----------------------------------------------------------------------- */ int edg_wll_RegisterJobProxyOnly( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -998,7 +1075,7 @@ int edg_wll_RegisterJobProxyOnly( edg_wlc_JobId ** subjobs) { #define MY_SEED "edg_wll_RegisterJobProxyOnly()" - return edg_wll_RegisterJobMaster(context,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); #undef MY_SEED } @@ -1012,7 +1089,7 @@ int edg_wll_RegisterJobProxyOnly( */ static int edg_wll_RegisterSubjob( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -1022,7 +1099,7 @@ int edg_wll_RegisterSubjob( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(context,LOGFLAG_NORMAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_NORMAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); } /** @@ -1033,7 +1110,7 @@ int edg_wll_RegisterSubjob( */ static int edg_wll_RegisterSubjobProxy( - edg_wll_Context context, + edg_wll_Context ctx, const edg_wlc_JobId job, enum edg_wll_RegJobJobtype type, const char * jdl, @@ -1043,7 +1120,7 @@ int edg_wll_RegisterSubjobProxy( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(context,LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); } /** -- 1.8.2.3