From 9eb4593fb952ce8977c1d5f0edfd625833292757 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jan=20Posp=C3=AD=C5=A1il?= Date: Sun, 1 Oct 2006 20:18:48 +0000 Subject: [PATCH] huge code revision, rewritten almost everything - separate functions edg_wll_log_connect_proxy/direct edg_wll_log_write_proxy/direct edg_wll_log_read_proxy/direct - megajob challenge - dual registration: RegisterJobProxy now tries to do pseudo parallel registration to both bkserver and lbproxy - compiles fine (LB_PERF), needs thorough testing --- org.glite.lb.client/src/prod_proto.c | 178 ++++++-- org.glite.lb.client/src/prod_proto.h | 8 +- org.glite.lb.client/src/producer.c | 827 +++++++++++++---------------------- 3 files changed, 437 insertions(+), 576 deletions(-) diff --git a/org.glite.lb.client/src/prod_proto.c b/org.glite.lb.client/src/prod_proto.c index a652916..478cd35 100644 --- a/org.glite.lb.client/src/prod_proto.c +++ b/org.glite.lb.client/src/prod_proto.c @@ -1,4 +1,7 @@ #ident "$Header$" +/** + * \file prod_proto.c + */ #include "prod_proto.h" @@ -274,20 +277,18 @@ edg_wll_log_proto_client_end: * *---------------------------------------------------------------------- */ -int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline) + +int edg_wll_log_write_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline) { - int len; - char *buffer,*answer = NULL; - static char et[256]; - int err; - int code; - int lbproto_code; - int count; + int len,count = 0; + char *buffer; - errno = err = code = count = 0; - lbproto_code = 0; edg_wll_ResetError(context); + /* encode message */ +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_write_proxy: encoding message\n"); +#endif { il_octet_string_t ll; @@ -296,51 +297,94 @@ int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnect len = encode_il_msg(&buffer, &ll); } if(len < 0) { - edg_wll_SetError(context,ENOMEM,"edg_wll_log_proto_client_proxy(): error encoding message"); - goto edg_wll_log_proto_client_proxy_end; + edg_wll_SetError(context,ENOMEM,"edg_wll_log_write_proxy(): error encoding message"); + return -1; } /* send message */ #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"log_proto_client_proxy: sending message...\n"); + fprintf(stderr,"edg_wll_log_write_proxy: sending message\n"); #endif - if (( count = edg_wll_plain_write_full(conn, buffer, len, &context->p_tmp_timeout)) < 0) { - edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_proxy(): error sending message to socket"); - goto edg_wll_log_proto_client_proxy_end; + if ((count = edg_wll_plain_write_full(conn, buffer, len, &context->p_tmp_timeout)) < 0) { + edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_write_proxy: error sending message to socket"); + return -1; } + if (buffer) free(buffer); + +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_write_proxy: done\n"); +#endif + return count; +} + +int edg_wll_log_read_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn) +{ + char *answer = NULL; + static char et[256]; + int err; + int code; + int lbproto_code; + int count; + + errno = err = code = count = 0; + lbproto_code = 0; + + edg_wll_ResetError(context); + /* get answer */ #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"log_proto_client_proxy: reading answer from server...\n"); + fprintf(stderr,"edg_wll_log_read_proxy: reading answer from server\n"); #endif if ((err = get_reply_plain(context, conn, &answer, &lbproto_code, &code)) != 0 ) { - edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_proxy(): error reading answer from L&B Proxy server"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_read_proxy: error reading answer from L&B Proxy server"); } else { #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"log_proto_client_proxy: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer); + fprintf(stderr,"edg_wll_log_read_proxy: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer); #endif switch (lbproto_code) { case LB_OK: break; case LB_NOMEM: - edg_wll_SetError(context, ENOMEM, "edg_wll_log_proto_client_proxy(): proxy out of memory"); + edg_wll_SetError(context, ENOMEM, "edg_wll_log_read_proxy: proxy out of memory"); break; case LB_PROTO: - edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_proto_client_proxy(): received protocol error response"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_read_proxy: received protocol error response"); break; case LB_DBERR: - snprintf(et, sizeof(et), "error details from L&B Proxy server: %s", answer); + snprintf(et, sizeof(et), "edg_wll_log_read_proxy: error details from L&B Proxy server: %s", answer); edg_wll_SetError(context, code, et); break; default: - edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_proto_client_proxy(): received unknown protocol response"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_read_proxy: received unknown protocol response"); break; } } +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_read_proxy: done\n"); +#endif + return edg_wll_Error(context,NULL,NULL); +} + +int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline) +{ + int err = 0; + + edg_wll_ResetError(context); + + /* send message */ + if ((err = edg_wll_log_write_proxy(context, conn, logline)) == -1) { + edg_wll_UpdateError(context,EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_proxy(): edg_wll_log_write_proxy error"); + goto edg_wll_log_proto_client_proxy_end; + } + + /* get answer */ + if ((err = edg_wll_log_read_proxy(context, conn)) != 0) { + edg_wll_UpdateError(context,err,"edg_wll_log_proto_client_proxy(): edg_wll_log_read_proxy error"); + } + edg_wll_log_proto_client_proxy_end: - if (buffer) free(buffer); - if (answer) free(answer); return edg_wll_Error(context,NULL,NULL); } /* @@ -356,20 +400,18 @@ edg_wll_log_proto_client_proxy_end: * *---------------------------------------------------------------------- */ -int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline) -{ - int len; - char *buffer,*answer = NULL; - static char et[256]; - int err; - int code, lbproto_code; - int count; +int edg_wll_log_write_direct(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline) +{ + int len,count = 0,err; + char *buffer; edg_wll_GssStatus gss_code; - errno = err = code = count = 0; edg_wll_ResetError(context); /* encode message */ +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_write_direct: encoding message\n"); +#endif { il_octet_string_t ll; @@ -377,26 +419,46 @@ int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnecti len = encode_il_msg(&buffer, &ll); } if(len < 0) { - edg_wll_SetError(context, ENOMEM, "edg_wll_log_proto_client_direct(): error encoding message"); - goto edg_wll_log_proto_client_direct_end; + edg_wll_SetError(context, ENOMEM, "edg_wll_log_write_direct: error encoding message"); + return -1; } + /* send message */ #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"log_proto_client_direct: sending message...\n"); + fprintf(stderr,"edg_wll_log_write_direct: sending message\n"); #endif count = 0; if (( err = edg_wll_gss_write_full(con, buffer, len, &context->p_tmp_timeout, &count, &gss_code)) < 0) { edg_wll_log_proto_handle_gss_failures(context,err,&gss_code,"edg_wll_gss_write_full()"); - edg_wll_UpdateError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_direct(): error sending message"); - goto edg_wll_log_proto_client_direct_end; + edg_wll_UpdateError(context, EDG_WLL_IL_PROTO,"edg_wll_log_write_direct: error sending message"); + return -1; } + if (buffer) free(buffer); + +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_write_direct: done\n"); +#endif + return count; + +} + +int edg_wll_log_read_direct(edg_wll_Context context, edg_wll_GssConnection *con) +{ + char *answer = NULL; + static char et[256]; + int err; + int code, lbproto_code; + int count; + + errno = err = code = count = 0; + /* get answer */ #ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"log_proto_client_direct: reading answer from server...\n"); + fprintf(stderr,"edg_wll_log_read_direct: reading answer from server...\n"); #endif if ((err = get_reply_gss(context, con, &answer, &lbproto_code, &code)) != 0 ) { - edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_direct(): error reading answer from L&B direct server"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_edg_wll_log_read_direct: error reading answer from L&B direct server"); } else { #ifdef EDG_WLL_LOG_STUB fprintf(stderr,"log_proto_client_direct: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer); @@ -404,25 +466,47 @@ int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnecti switch (lbproto_code) { case LB_OK: break; case LB_NOMEM: - edg_wll_SetError(context, ENOMEM, "log_proto_client_direct(): server out of memory"); + edg_wll_SetError(context, ENOMEM, "edg_wll_log_read_direct: server out of memory"); break; case LB_PROTO: - edg_wll_SetError(context, EDG_WLL_IL_PROTO, "log_proto_client_direct(): received protocol error response"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_read_direct: received protocol error response"); break; case LB_DBERR: - snprintf(et, sizeof(et), "error details from L&B server: %s", answer); + snprintf(et, sizeof(et), "edg_wll_log_read_direct: error details from L&B server: %s", answer); edg_wll_SetError(context, code, et); break; default: - edg_wll_SetError(context, EDG_WLL_IL_PROTO, "log_proto_client_direct(): received unknown protocol response"); + edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_read_direct: received unknown protocol response"); break; } } +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_read_direct: done\n"); +#endif + return edg_wll_Error(context,NULL,NULL); + +} + +int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnection *conn, edg_wll_LogLine logline) +{ + int err = 0; + edg_wll_ResetError(context); + + /* send message */ + if ((err = edg_wll_log_write_direct(context, conn, logline)) == -1) { + edg_wll_UpdateError(context,EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_direct(): edg_ell_log_write_direct error"); + goto edg_wll_log_proto_client_direct_end; + } + + /* get answer */ + if ((err = edg_wll_log_read_direct(context, conn)) != 0) { + edg_wll_UpdateError(context,err,"edg_wll_log_proto_client_direct(): edg_ell_log_read_direct error"); + } + + edg_wll_log_proto_client_direct_end: - if (buffer) free(buffer); - if (answer) free(answer); return edg_wll_Error(context,NULL,NULL); } diff --git a/org.glite.lb.client/src/prod_proto.h b/org.glite.lb.client/src/prod_proto.h index c5d5887..5eec4f3 100644 --- a/org.glite.lb.client/src/prod_proto.h +++ b/org.glite.lb.client/src/prod_proto.h @@ -19,8 +19,14 @@ extern "C" { #include "glite/lb/context-int.h" int edg_wll_log_proto_client(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline); -int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline); + int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_write_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn, edg_wll_LogLine logline); +int edg_wll_log_read_proxy(edg_wll_Context context, edg_wll_PlainConnection *conn); + +int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline); +int edg_wll_log_write_direct(edg_wll_Context context, edg_wll_GssConnection *con, edg_wll_LogLine logline); +int edg_wll_log_read_direct(edg_wll_Context context, edg_wll_GssConnection *con); int edg_wll_log_proto_handle_gss_failures(edg_wll_Context context, int code, edg_wll_GssStatus *gss_code, const char *text); int edg_wll_log_proto_handle_plain_failures(edg_wll_Context context, int code, const char *text); diff --git a/org.glite.lb.client/src/producer.c b/org.glite.lb.client/src/producer.c index 349ac98..f448ced 100644 --- a/org.glite.lb.client/src/producer.c +++ b/org.glite.lb.client/src/producer.c @@ -32,12 +32,6 @@ int edg_wll_DoLogEventProxy(edg_wll_Context context, edg_wll_LogLine logline); int edg_wll_DoLogEventDirect(edg_wll_Context context, edg_wll_LogLine logline); #else -#ifdef LB_PERF -static int edg_wll_log_connect(edg_wll_Context context, edg_wll_GssConnection *con); -static int edg_wll_log_connect_proxy(edg_wll_Context context, edg_wll_PlainConnection *con); -static int edg_wll_log_connect_direct(edg_wll_Context context, edg_wll_GssConnection *con); -#endif - /* *---------------------------------------------------------------------- * handle_answers - handle answers from edg_wll_log_proto_client* @@ -83,9 +77,13 @@ int handle_answers(edg_wll_Context context, int code, const char *text) return edg_wll_Error(context, NULL, NULL); } - -#ifdef LB_PERF - +/** + *---------------------------------------------------------------------- + * opens a GSS connection to local-logger + * \param context INOUT context to work with, + * \param con OUT openned connection + *---------------------------------------------------------------------- + */ int edg_wll_log_connect(edg_wll_Context context, edg_wll_GssConnection *con) { int ret,answer; @@ -144,7 +142,6 @@ edg_wll_log_connect_end: * \brief helper logging function * \param context INOUT context to work with, * \param logline IN formated ULM string - * \note this is new (LB_PERF) edg_wll_DoLogEvent *---------------------------------------------------------------------- */ int edg_wll_DoLogEvent( @@ -165,85 +162,21 @@ int edg_wll_DoLogEvent( answer = edg_wll_log_proto_client(context,&con,logline); edg_wll_DoLogEvent_end: + if (con.sock) edg_wll_gss_close(&con,&context->p_tmp_timeout); return handle_answers(context,answer,"edg_wll_DoLogEvent()"); } -#else /* LB_PERF */ - /** *---------------------------------------------------------------------- - * Connects to local-logger and sends already formatted ULM string - * \brief helper logging function + * opens a plain (UNIX socket) connection to LB Proxy * \param context INOUT context to work with, - * \param logline IN formated ULM string - * \note this is original edg_wll_DoLogEvent + * \param con OUT openned connection *---------------------------------------------------------------------- */ -int edg_wll_DoLogEvent( - edg_wll_Context context, - edg_wll_LogLine logline) -{ - int ret,answer; - char *my_subject_name = NULL; - edg_wll_GssStatus gss_stat; - edg_wll_GssConnection con; - gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; - OM_uint32 min_stat; - - edg_wll_ResetError(context); - ret = answer = 0; - memset(&con, 0, sizeof(con)); - - /* open an authenticated connection to the local-logger: */ -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"Logging to host %s, port %d\n", - context->p_destination, context->p_dest_port); -#endif - ret = edg_wll_gss_acquire_cred_gsi( - context->p_proxy_filename ? context->p_proxy_filename : context->p_cert_filename, - context->p_proxy_filename ? context->p_proxy_filename : context->p_key_filename, - &cred, &my_subject_name, &gss_stat); - /* Give up if unable to prescribed credentials, otherwise go on anonymously */ - if (ret && context->p_proxy_filename) { - edg_wll_SetErrorGss(context, "failed to load GSI credentials", &gss_stat); - goto edg_wll_DoLogEvent_end; - } - - if (my_subject_name != NULL) { -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"Using certificate: %s\n",my_subject_name); -#endif - free(my_subject_name); - } - if ((answer = edg_wll_gss_connect(cred, - context->p_destination, context->p_dest_port, - &context->p_tmp_timeout, &con, &gss_stat)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(context,answer,&gss_stat,"edg_wll_gss_connect()"); - goto edg_wll_DoLogEvent_end; - } - - /* and send the message to the local-logger: */ - answer = edg_wll_log_proto_client(context,&con,logline); - -edg_wll_DoLogEvent_end: - if (con.context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(&con,&context->p_tmp_timeout); - if (cred != GSS_C_NO_CREDENTIAL) - gss_release_cred(&min_stat, &cred); - - return handle_answers(context,answer,"edg_wll_DoLogEvent()"); -} - -#endif /* LB_PERF */ - - - -#ifdef LB_PERF - int edg_wll_log_connect_proxy(edg_wll_Context context, edg_wll_PlainConnection *con) { - int answer = 0; + int answer = 0, retries; int flags; struct sockaddr_un saddr; @@ -269,16 +202,38 @@ int edg_wll_log_connect_proxy(edg_wll_Context context, edg_wll_PlainConnection * fprintf(stderr,"edg_wll_log_connect_proxy: openning connection to L&B Proxy at socket %s\n", context->p_lbproxy_store_sock? context->p_lbproxy_store_sock: socket_path); #endif - if (connect(con->sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if(errno != EISCONN) { - edg_wll_SetError(context,answer = errno,"connect()"); - close(con->sock); - goto edg_wll_log_connect_proxy_end; + retries = 0; + while ((answer = connect(con->sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 && + errno == EAGAIN && + context->p_tmp_timeout.tv_sec >= 0 && context->p_tmp_timeout.tv_usec >= 0 && + !(context->p_tmp_timeout.tv_sec == 0 && context->p_tmp_timeout.tv_usec == 0) + ) + { + struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem; + + nanosleep(&ns,&rem); + + context->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000; + context->p_tmp_timeout.tv_usec += rem.tv_nsec/1000; + + context->p_tmp_timeout.tv_sec -= ns.tv_sec; + context->p_tmp_timeout.tv_sec += rem.tv_sec; + + if (context->p_tmp_timeout.tv_usec < 0) { + context->p_tmp_timeout.tv_usec += 1000000; + context->p_tmp_timeout.tv_sec--; } + retries++; } +#ifdef EDG_WLL_LOG_STUB + if (retries) fprintf(stderr,"edg_wll_log_connect_proxy: %d connect retries\n",retries); +#endif edg_wll_log_connect_proxy_end: - edg_wll_plain_close(con); +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_connect_proxy: done\n"); +#endif + if (con) edg_wll_plain_close(con); return answer; } @@ -289,7 +244,6 @@ edg_wll_log_connect_proxy_end: * \brief helper logging function * \param context INOUT context to work with, * \param logline IN formated ULM string - * \note this is new (LB_PERF) edg_wll_DoLogEventProxy *---------------------------------------------------------------------- */ int edg_wll_DoLogEventProxy( @@ -297,167 +251,32 @@ int edg_wll_DoLogEventProxy( edg_wll_LogLine logline) { int answer = 0; - edg_wll_PlainConnection conn; - char *name_esc,*dguser; - edg_wll_LogLine out; - - name_esc = dguser = out = NULL; + edg_wll_PlainConnection con; edg_wll_ResetError(context); - memset(&conn, 0, sizeof(conn)); + memset(&con, 0, sizeof(con)); /* open a plain connection to L&B Proxy: */ - if ((answer = edg_wll_log_connect_proxy(context,&conn)) < 0) { + if ((answer = edg_wll_log_connect_proxy(context,&con)) < 0) { goto edg_wll_DoLogEventProxy_end; } - /* add DG.USER to the message: */ - name_esc = edg_wll_LogEscape(context->p_user_lbproxy); - if (asprintf(&dguser,"DG.USER=\"%s\" ",name_esc) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_LogEventMasterProxy(): asprintf() error"); - goto edg_wll_DoLogEventProxy_end; - } - if (asprintf(&out,"%s%s",dguser,logline) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_LogEventMasterProxy(): asprintf() error"); - goto edg_wll_DoLogEventProxy_end; - } - /* and send the message to the L&B Proxy: */ - answer = edg_wll_log_proto_client_proxy(context,&conn,out); + answer = edg_wll_log_proto_client_proxy(context,&con,logline); edg_wll_DoLogEventProxy_end: - edg_wll_plain_close(&conn); - - if (name_esc) free(name_esc); - if (dguser) free(dguser); - if (out) free(out); + if (con.sock) edg_wll_plain_close(&con); return handle_answers(context,answer,"edg_wll_DoLogEventProxy()"); } -#else /* LB_PERF */ - /** *---------------------------------------------------------------------- - * Connects to L&B Proxy and sends already formatted ULM string - * \brief helper logging function + * opens a GSS connection to bkserver * \param context INOUT context to work with, - * \param logline IN formated ULM string - * \note this is original edg_wll_DoLogEventProxy + * \param con OUT openned connection *---------------------------------------------------------------------- */ -int edg_wll_DoLogEventProxy( - edg_wll_Context context, - edg_wll_LogLine logline) -{ - int answer; - int flags,retries; - char *name_esc,*dguser; - struct sockaddr_un saddr; - edg_wll_PlainConnection conn; - edg_wll_LogLine out; - - answer = 0; - name_esc = dguser = out = NULL; - - edg_wll_ResetError(context); - - /* open a connection to the L&B Proxy: */ -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"Logging to L&B Proxy at socket %s\n", - context->p_lbproxy_store_sock? context->p_lbproxy_store_sock: socket_path); -#endif - memset(&conn, 0, sizeof(conn)); - conn.sock = socket(PF_UNIX, SOCK_STREAM, 0); - if (conn.sock < 0) { - edg_wll_SetError(context,answer = errno,"socket() error"); - goto edg_wll_DoLogEventProxy_end; - } - memset(&saddr, 0, sizeof(saddr)); - saddr.sun_family = AF_UNIX; - strcpy(saddr.sun_path, context->p_lbproxy_store_sock? - context->p_lbproxy_store_sock: socket_path); - if ((flags = fcntl(conn.sock, F_GETFL, 0)) < 0 || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) { - edg_wll_SetError(context,answer = errno,"fcntl()"); - close(conn.sock); - goto edg_wll_DoLogEventProxy_end; - } - -/* non-retry variant (pre bug #18994) - * XXX: what is the EISCONN case good for? conn.sock is created above. - * - if (connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if(errno != EISCONN) { - edg_wll_SetError(context,answer = errno,"connect()"); - close(conn.sock); - goto edg_wll_DoLogEventProxy_end; - } - } -*/ - - retries = 0; - while ((answer = connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 && - errno == EAGAIN && - context->p_tmp_timeout.tv_sec >= 0 && context->p_tmp_timeout.tv_usec >= 0 && - !(context->p_tmp_timeout.tv_sec == 0 && context->p_tmp_timeout.tv_usec == 0) - ) - { - struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem; - - nanosleep(&ns,&rem); - - context->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000; - context->p_tmp_timeout.tv_usec += rem.tv_nsec/1000; - - context->p_tmp_timeout.tv_sec -= ns.tv_sec; - context->p_tmp_timeout.tv_sec += rem.tv_sec; - - if (context->p_tmp_timeout.tv_usec < 0) { - context->p_tmp_timeout.tv_usec += 1000000; - context->p_tmp_timeout.tv_sec--; - } - retries++; - } - - if (answer) { - if (errno == EAGAIN) edg_wll_SetError(context,answer = ETIMEDOUT,"edg_wll_DoLogEventProxy connect()"); - else edg_wll_SetError(context,answer = errno,"connect()"); - close(conn.sock); - goto edg_wll_DoLogEventProxy_end; - } - -/* just debug if (retries) printf("edg_wll_DoLogEventProxy connect retries %d\n",retries); */ - - /* add DG.USER to the message: */ - name_esc = edg_wll_LogEscape(context->p_user_lbproxy); - if (asprintf(&dguser,"DG.USER=\"%s\" ",name_esc) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_LogEventMasterProxy(): asprintf() error"); - goto edg_wll_DoLogEventProxy_end; - } - if (asprintf(&out,"%s%s",dguser,logline) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_LogEventMasterProxy(): asprintf() error"); - goto edg_wll_DoLogEventProxy_end; - } - - /* and send the message to the L&B Proxy: */ - answer = edg_wll_log_proto_client_proxy(context,&conn,out); - -edg_wll_DoLogEventProxy_end: - edg_wll_plain_close(&conn); - - if (name_esc) free(name_esc); - if (dguser) free(dguser); - if (out) free(out); - - return handle_answers(context,answer,"edg_wll_DoLogEventProxy()"); -} - -#endif /* LB_PERF */ - - - -#ifdef LB_PERF - int edg_wll_log_connect_direct(edg_wll_Context context, edg_wll_GssConnection *con) { int ret,answer; @@ -491,7 +310,7 @@ int edg_wll_log_connect_direct(edg_wll_Context context, edg_wll_GssConnection *c #ifdef EDG_WLL_LOG_STUB if (my_subject_name) { - // XXX: shouldn't be probably icontext->p_user_lbproxy but some new parameter, eg. context->p_user + // XXX: shouldn't be probably context->p_user_lbproxy but some new parameter, eg. context->p_user edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, my_subject_name); fprintf(stderr,"edg_wll_log_connect_direct: using certificate: %s\n",my_subject_name); } else { @@ -509,6 +328,9 @@ int edg_wll_log_connect_direct(edg_wll_Context context, edg_wll_GssConnection *c } edg_wll_log_connect_direct_end: +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_log_connect_direct: done\n"); +#endif if (con->context != GSS_C_NO_CONTEXT) edg_wll_gss_close(con,&context->p_tmp_timeout); if (cred != GSS_C_NO_CREDENTIAL) @@ -525,7 +347,6 @@ edg_wll_log_connect_direct_end: * \brief helper logging function * \param context INOUT context to work with, * \param logline IN formated ULM string - * \note this is new (LB_PERF) edg_wll_DoLogEventDirect *---------------------------------------------------------------------- */ int edg_wll_DoLogEventDirect( @@ -534,9 +355,6 @@ int edg_wll_DoLogEventDirect( { int answer = 0; edg_wll_GssConnection con; - char *my_subject_name,*name_esc,*dguser; - edg_wll_LogLine out; - my_subject_name = name_esc = dguser = out = NULL; edg_wll_ResetError(context); memset(&con, 0, sizeof(con)); @@ -546,123 +364,15 @@ int edg_wll_DoLogEventDirect( goto edg_wll_DoLogEventDirect_end; } - /* add DG.USER to the message: */ - // XXX: again, it probably schouldn't be context->p_user_lbproxy - edg_wll_GetParam(context, EDG_WLL_PARAM_LBPROXY_USER, &my_subject_name); - name_esc = edg_wll_LogEscape(my_subject_name); - if (asprintf(&dguser,"DG.USER=\"%s\" ",name_esc) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_DoLogEventDirect(): asprintf() error"); - goto edg_wll_DoLogEventDirect_end; - } - if (asprintf(&out,"%s%s\n",dguser,logline) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_DoLogEventDirect(): asprintf() error"); - goto edg_wll_DoLogEventDirect_end; - } - /* and send the message to the bkserver: */ - answer = edg_wll_log_proto_client_direct(context,&con,out); + answer = edg_wll_log_proto_client_direct(context,&con,logline); edg_wll_DoLogEventDirect_end: - if (name_esc) free(name_esc); - if (dguser) free(dguser); - if (out) free(out); - if (my_subject_name) free(my_subject_name); + edg_wll_gss_close(&con,&context->p_tmp_timeout); return handle_answers(context,answer,"edg_wll_DoLogEventDirect()"); } -#else /* LB_PERF */ - -/** - *---------------------------------------------------------------------- - * Connects to bkserver and sends already formatted ULM string - * \brief helper logging function - * \param context INOUT context to work with, - * \param logline IN formated ULM string - * \note this is original edg_wll_DoLogEventDirect - *---------------------------------------------------------------------- - */ -int edg_wll_DoLogEventDirect( - edg_wll_Context context, - edg_wll_LogLine logline) -{ - int ret,answer; - char *my_subject_name,*name_esc,*dguser; - char *host; - int port; - edg_wll_LogLine out; - edg_wll_GssStatus gss_stat; - edg_wll_GssConnection con; - gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; - OM_uint32 min_stat; - - ret = answer = 0; - my_subject_name = name_esc = dguser = out = NULL; - memset(&con, 0, sizeof(con)); - - edg_wll_ResetError(context); - - /* get bkserver location: */ - edg_wlc_JobIdGetServerParts(context->p_jobid,&host,&port); - port +=1; - - /* open an authenticated connection to the bkserver: */ -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"Logging to bkserver host %s, port %d\n", host, port); -#endif - ret = edg_wll_gss_acquire_cred_gsi( - context->p_proxy_filename ? context->p_proxy_filename : context->p_cert_filename, - context->p_proxy_filename ? context->p_proxy_filename : context->p_key_filename, - &cred, &my_subject_name, &gss_stat); - /* Give up if unable to prescribed credentials, otherwise go on anonymously */ - if (ret && context->p_proxy_filename) { - edg_wll_SetErrorGss(context, "failed to load GSI credentials", &gss_stat); - goto edg_wll_DoLogEventDirect_end; - } - -#ifdef EDG_WLL_LOG_STUB - if (my_subject_name) { - fprintf(stderr,"Using certificate: %s\n",my_subject_name); - } else { - fprintf(stderr,"Going on anonymously\n"); - } -#endif - if ((answer = edg_wll_gss_connect(cred,host,port, - &context->p_tmp_timeout, &con, &gss_stat)) < 0) { - answer = edg_wll_log_proto_handle_gss_failures(context,answer,&gss_stat,"edg_wll_gss_connect()"); - goto edg_wll_DoLogEventDirect_end; - } - - /* add DG.USER to the message: */ - name_esc = edg_wll_LogEscape(my_subject_name); - if (asprintf(&dguser,"DG.USER=\"%s\" ",name_esc) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_DoLogEventDirect(): asprintf() error"); - goto edg_wll_DoLogEventDirect_end; - } - if (asprintf(&out,"%s%s\n",dguser,logline) == -1) { - edg_wll_SetError(context,answer = ENOMEM,"edg_wll_DoLogEventDirect(): asprintf() error"); - goto edg_wll_DoLogEventDirect_end; - } - - /* and send the message to the bkserver: */ - answer = edg_wll_log_proto_client_direct(context,&con,out); - -edg_wll_DoLogEventDirect_end: - if (con.context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(&con,&context->p_tmp_timeout); - if (cred != GSS_C_NO_CREDENTIAL) - gss_release_cred(&min_stat, &cred); - if (host) free(host); - if (name_esc) free(name_esc); - if (dguser) free(dguser); - if (out) free(out); - if(my_subject_name) free(my_subject_name); - - return handle_answers(context,answer,"edg_wll_DoLogEventDirect()"); -} - -#endif /* LB_PERF */ - #endif /* FAKE_VERSION */ #define LOGFLAG_ASYNC 0 /**< asynchronous logging */ @@ -671,9 +381,6 @@ edg_wll_DoLogEventDirect_end: #define LOGFLAG_PROXY 4 /**< logging to L&B Proxy */ #define LOGFLAG_DIRECT 8 /**< logging directly to bkserver */ - -#ifdef LB_PERF - /** *---------------------------------------------------------------------- * Formats a logging message @@ -690,13 +397,13 @@ static int edg_wll_FormatLogLine( edg_wll_Context context, int flags, edg_wll_EventCode event, - edg_wll_LogLine logline, + edg_wll_LogLine *logline, char *fmt, ...) { va_list fmt_args; int priority; int ret; - char *fix,*var; + char *fix,*var,*dguser; char *source,*eventName,*lvl,*fullid,*seq; struct timeval start_time; char date[ULM_DATE_STRING_LENGTH+1]; @@ -705,13 +412,18 @@ static int edg_wll_FormatLogLine( int i; i = errno = size = ret = 0; - seq = fix = var = out = source = eventName = lvl = fullid = NULL; + seq = fix = var = dguser = out = source = eventName = lvl = fullid = NULL; priority = flags & LOGFLAG_SYNC; edg_wll_ResetError(context); /* format the message: */ va_start(fmt_args,fmt); + if (trio_vasprintf(&var,fmt,fmt_args) == -1) { + edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_vasprintf() error"); + goto edg_wll_formatlogline_end; + } + va_end(fmt_args); gettimeofday(&start_time,0); if (edg_wll_ULMTimevalToDate(start_time.tv_sec,start_time.tv_usec,date) != 0) { @@ -738,11 +450,17 @@ static int edg_wll_FormatLogLine( edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); goto edg_wll_formatlogline_end; } - if (trio_vasprintf(&var,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_vasprintf() error"); - goto edg_wll_formatlogline_end; + /* TODO: add always, probably new context->p_user */ + if ( ( (flags & LOGFLAG_PROXY) || (flags & LOGFLAG_DIRECT) ) && + (context->p_user_lbproxy) ) { + if (trio_asprintf(&dguser,EDG_WLL_FORMAT_USER,context->p_user_lbproxy) == -1) { + edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); + goto edg_wll_formatlogline_end; + } + } else { + dguser = strdup(""); } - if (asprintf(&out,"%s%s\n",fix,var) == -1) { + if (asprintf(&out,"%s%s%s\n",fix,dguser,var) == -1) { edg_wll_SetError(context,ret = ENOMEM,"edg_wll_FormatLogLine(): asprintf() error"); goto edg_wll_formatlogline_end; } @@ -754,16 +472,18 @@ static int edg_wll_FormatLogLine( } #ifdef EDG_WLL_LOG_STUB -// fprintf(stderr,"edg_wll_LogEvent (%d chars): %s",size,out); + fprintf(stderr,"edg_wll_FormatLogLine (%d chars): %s",size,out); #endif + if (out) { + *logline = out; + } else { + *logline = NULL; + } - if (out) logline = out; - else logline = NULL; - edg_wll_formatlogline_end: - va_end(fmt_args); if (seq) free(seq); if (fix) free(fix); + if (dguser) free(dguser); if (var) free(var); if (source) free(source); if (lvl) free(lvl); @@ -807,139 +527,13 @@ static int edg_wll_LogEventMaster( /* format the message: */ va_start(fmt_args,fmt); - if (edg_wll_FormatLogLine(context,flags,event,out,fmt,fmt_args) != 0 ) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_FormatLogLine() error"); - goto edg_wll_logeventmaster_end; - } - -#ifdef EDG_WLL_LOG_STUB -// fprintf(stderr,"edg_wll_LogEvent (%d chars): %s",size,out); -#endif - - context->p_tmp_timeout.tv_sec = 0; - context->p_tmp_timeout.tv_usec = 0; - if (priority) { - context->p_tmp_timeout = context->p_sync_timeout; - } - else { - context->p_tmp_timeout = context->p_log_timeout; - } - - /* and send the message */ -#ifndef LB_PERF_DROP - if (flags & LOGFLAG_NORMAL) { - /* to the local-logger: */ - ret = edg_wll_DoLogEvent(context, out); - } else if (flags & LOGFLAG_PROXY) { - /* to the L&B Proxy: */ - ret = edg_wll_DoLogEventProxy(context, out); - } else if (flags & LOGFLAG_DIRECT) { - /* directly to the bkserver: */ - ret = edg_wll_DoLogEventDirect(context, out); - } else { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): wrong flag specified"); - } -#endif - -edg_wll_logeventmaster_end: - va_end(fmt_args); - if (out) free(out); - - if (!ret) if(edg_wll_IncSequenceCode(context)) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_IncSequenceCode failed"); - } - - if (ret) edg_wll_UpdateError(context,0,"Logging library ERROR: "); - - return edg_wll_Error(context,NULL,NULL); -} - -#else /* LB_PERF */ - -/** - *---------------------------------------------------------------------- - * Formats a logging message and sends it to local-logger - * \brief master logging event function - * \param context INOUT context to work with, - * \param flags IN as defined by LOGFLAG_* - * \param event IN type of the event, - * \param fmt IN printf()-like format string, - * \param ... IN event specific values/data according to fmt. - * \note this is original edg_wll_LogEventMaster - *---------------------------------------------------------------------- - */ - -static int edg_wll_LogEventMaster( - edg_wll_Context context, - int flags, - edg_wll_EventCode event, - char *fmt, ...) -{ - va_list fmt_args; - int priority; - int ret,answer; - char *fix,*var; - char *source,*eventName,*lvl, *fullid,*seq; - struct timeval start_time; - char date[ULM_DATE_STRING_LENGTH+1]; - edg_wll_LogLine out; - size_t size; - int i; - - i = errno = size = 0; - seq = fix = var = out = source = eventName = lvl = fullid = NULL; - priority = flags & LOGFLAG_SYNC; - - edg_wll_ResetError(context); - - /* default return value is "Try Again" */ - answer = ret = EAGAIN; - - /* format the message: */ - va_start(fmt_args,fmt); - - gettimeofday(&start_time,0); - if (edg_wll_ULMTimevalToDate(start_time.tv_sec,start_time.tv_usec,date) != 0) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_ULMTimevalToDate() error"); - goto edg_wll_logeventmaster_end; - } - source = edg_wll_SourceToString(context->p_source); - lvl = edg_wll_LevelToString(context->p_level); - eventName = edg_wll_EventToString(event); - if (!eventName) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): event name not specified"); - goto edg_wll_logeventmaster_end; - } - if (!(fullid = edg_wlc_JobIdUnparse(context->p_jobid))) { - edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wlc_JobIdUnparse() error"); - goto edg_wll_logeventmaster_end; - } - seq = edg_wll_GetSequenceCode(context); - - if (trio_asprintf(&fix,EDG_WLL_FORMAT_COMMON, - date,context->p_host,lvl,priority, - source,context->p_instance ? context->p_instance : "", - eventName,fullid,seq) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEventMaster(): trio_asprintf() error"); - goto edg_wll_logeventmaster_end; - } - if (trio_vasprintf(&var,fmt,fmt_args) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEventMaster(): trio_vasprintf() error"); - goto edg_wll_logeventmaster_end; - } - if (asprintf(&out,"%s%s\n",fix,var) == -1) { - edg_wll_SetError(context,ret = ENOMEM,"edg_wll_LogEventMaster(): asprintf() error"); + if (edg_wll_FormatLogLine(context,flags,event,&out,fmt,fmt_args) != 0 ) { + edg_wll_UpdateError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_FormatLogLine() error"); goto edg_wll_logeventmaster_end; } - size = strlen(out); - - if (priority && (size > EDG_WLL_LOG_SYNC_MAXMSGSIZE)) { - edg_wll_SetError(context,ret = ENOSPC,"edg_wll_LogEventMaster(): Message size too large for synchronous transfer"); - goto edg_wll_logeventmaster_end; - } #ifdef EDG_WLL_LOG_STUB -// fprintf(stderr,"edg_wll_LogEvent (%d chars): %s",size,out); +// fprintf(stderr,"edg_wll_LogEventMaster (%d chars): %s",strlen(out),out); #endif context->p_tmp_timeout.tv_sec = 0; @@ -969,14 +563,7 @@ static int edg_wll_LogEventMaster( edg_wll_logeventmaster_end: va_end(fmt_args); - if (seq) free(seq); - if (fix) free(fix); - if (var) free(var); if (out) free(out); - if (source) free(source); - if (lvl) free(lvl); - if (eventName) free(eventName); - if (fullid) free(fullid); if (!ret) if(edg_wll_IncSequenceCode(context)) { edg_wll_SetError(context,ret = EINVAL,"edg_wll_LogEventMaster(): edg_wll_IncSequenceCode failed"); @@ -987,7 +574,6 @@ edg_wll_logeventmaster_end: return edg_wll_Error(context,NULL,NULL); } -#endif /* LB_PERF */ /** *---------------------------------------------------------------------- @@ -1062,6 +648,7 @@ edg_wll_logeventsync_end: *---------------------------------------------------------------------- * Formats a logging message and sends it synchronously to L&B Proxy * \brief generic synchronous logging function + * \note simple wrapper around edg_wll_LogEventMaster() *---------------------------------------------------------------------- */ int edg_wll_LogEventProxy( @@ -1149,6 +736,7 @@ edg_wll_logflush_end: *----------------------------------------------------------------------- * Instructs interlogger to to deliver all pending events * \brief flush all events from interlogger + * \note simple wrapper around edg_wll_LogEventMaster() *----------------------------------------------------------------------- */ int edg_wll_LogFlushAll( @@ -1232,6 +820,10 @@ int edg_wll_SetLoggingJobProxy( { int err; char *code_loc = NULL; + char *my_subject_name = NULL; + edg_wll_GssStatus gss_stat; + gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; + OM_uint32 min_stat; edg_wll_ResetError(context); @@ -1244,7 +836,21 @@ int edg_wll_SetLoggingJobProxy( } /* add user credentials to context */ - edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, user); + if (user) { + edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, user); + } else { + /* acquire gss credentials */ + err = edg_wll_gss_acquire_cred_gsi( + context->p_proxy_filename ? context->p_proxy_filename : context->p_cert_filename, + context->p_proxy_filename ? context->p_proxy_filename : context->p_key_filename, + &cred, &my_subject_name, &gss_stat); + /* give up if unable to acquire prescribed credentials */ + if (err && context->p_proxy_filename) { + edg_wll_SetErrorGss(context, "failed to load GSI credentials", &gss_stat); + goto edg_wll_setloggingjobproxy_end; + } + edg_wll_SetParamString(context, EDG_WLL_PARAM_LBPROXY_USER, my_subject_name); + } /* query LBProxyServer for sequence code if not user-suplied */ /* XXX: don't know if working properly */ @@ -1261,6 +867,9 @@ int edg_wll_SetLoggingJobProxy( edg_wll_setloggingjobproxy_end: if (code_loc) free(code_loc); + if (cred != GSS_C_NO_CREDENTIAL) + gss_release_cred(&min_stat, &cred); + if (my_subject_name) free(my_subject_name); return edg_wll_Error(context,NULL,NULL); } @@ -1282,14 +891,11 @@ static int edg_wll_RegisterJobMaster( const char * seed, edg_wlc_JobId ** subjobs) { - char *seq,*type_s,*intseed,*parent_s,*user_dn; + char *seq,*type_s,*intseed,*parent_s; int err = 0; - edg_wll_GssStatus gss_stat; - gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; - OM_uint32 min_stat; struct timeval sync_to; - seq = type_s = intseed = parent_s = user_dn = NULL; + seq = type_s = intseed = parent_s = NULL; edg_wll_ResetError(context); memcpy(&sync_to, &context->p_sync_timeout, sizeof sync_to); @@ -1302,7 +908,9 @@ static int edg_wll_RegisterJobMaster( edg_wll_SetError(context,EINVAL,"edg_wll_RegisterJobMaster(): no jobtype specified"); goto edg_wll_registerjobmaster_end; } - if ((type == EDG_WLL_REGJOB_DAG || type == EDG_WLL_REGJOB_PARTITIONED) + if ((type == EDG_WLL_REGJOB_DAG || + type == EDG_WLL_REGJOB_PARTITIONED || + type == EDG_WLL_REGJOB_COLLECTION) && num_subjobs > 0) { err = edg_wll_GenerateSubjobIds(context,job,num_subjobs,intseed,subjobs); /* increase log timeout on client (the same as on BK server) */ @@ -1323,24 +931,11 @@ static int edg_wll_RegisterJobMaster( (char *)jdl,ns,parent_s,type_s,num_subjobs,intseed); } } else if (flags & LOGFLAG_PROXY) { - /* first obtain the certificate */ - /* used in edg_wll_DoLogEventProxy() for field DG.USER */ - /* not mandatory now */ - - err = edg_wll_gss_acquire_cred_gsi( - context->p_proxy_filename ? context->p_proxy_filename : context->p_cert_filename, - context->p_proxy_filename ? context->p_proxy_filename : context->p_key_filename, - &cred, &user_dn, &gss_stat); - /* Give up if unable to obtain credentials */ - if (err && context->p_proxy_filename) { - edg_wll_SetErrorGss(context, "failed to load GSI credentials", &gss_stat); - goto edg_wll_registerjobmaster_end; - } /* SetLoggingJobProxy and and log to proxy */ edg_wll_SetSequenceCode(context, NULL, EDG_WLL_SEQ_NORMAL); if (seq) free(seq); seq = edg_wll_GetSequenceCode(context); - if (edg_wll_SetLoggingJobProxy(context,job,seq,user_dn,EDG_WLL_SEQ_NORMAL) == 0) { + if (edg_wll_SetLoggingJobProxy(context,job,seq,NULL,EDG_WLL_SEQ_NORMAL) == 0) { edg_wll_LogEventMaster(context,LOGFLAG_PROXY | LOGFLAG_SYNC, EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, (char *)jdl,ns,parent_s,type_s,num_subjobs,intseed); @@ -1358,8 +953,6 @@ static int edg_wll_RegisterJobMaster( edg_wll_registerjobmaster_end: memcpy(&context->p_sync_timeout, &sync_to, sizeof sync_to); - if (cred != GSS_C_NO_CREDENTIAL) - gss_release_cred(&min_stat, &cred); if (seq) free(seq); if (type_s) free(type_s); if (intseed) free(intseed); @@ -1367,6 +960,12 @@ edg_wll_registerjobmaster_end: return edg_wll_Error(context,NULL,NULL); } +/** + *----------------------------------------------------------------------- + * Register synchronously one job with L&B service + * \note simple wrapper around edg_wll_RegisterJobMaster() + *----------------------------------------------------------------------- + */ int edg_wll_RegisterJobSync( edg_wll_Context context, const edg_wlc_JobId job, @@ -1380,6 +979,12 @@ int edg_wll_RegisterJobSync( return edg_wll_RegisterJobMaster(context,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } +/** + *----------------------------------------------------------------------- + * Register (asynchronously) one job with L&B service + * \note simple wrapper around edg_wll_RegisterJobMaster() + *----------------------------------------------------------------------- + */ int edg_wll_RegisterJob( edg_wll_Context context, const edg_wlc_JobId job, @@ -1394,7 +999,14 @@ int edg_wll_RegisterJob( } #ifdef LB_PERF -/* this is new (LB_PER) edg_wll_RegisterJobProxy */ + +/** + *----------------------------------------------------------------------- + * Register one job with L&B Proxy service + * \note simple wrapper around edg_wll_RegisterJobMaster() + * this is new (LB_PERF) edg_wll_RegisterJobProxy + *----------------------------------------------------------------------- + */ int edg_wll_RegisterJobProxy( edg_wll_Context context, const edg_wlc_JobId job, @@ -1406,20 +1018,142 @@ int edg_wll_RegisterJobProxy( edg_wlc_JobId ** subjobs) { #define MY_SEED "edg_wll_RegisterJobProxy()" - /* first register with bkserver */ - int ret = edg_wll_RegisterJobMaster(context,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); + char *seq,*type_s; + edg_wll_LogLine logline = NULL; + int ret = 0,n,count,fd; + struct timeval sync_to; + edg_wll_GssConnection con_bkserver; + edg_wll_PlainConnection con_lbproxy; + fd_set fdset; + + seq = type_s = NULL; + + edg_wll_ResetError(context); + memcpy(&sync_to, &context->p_sync_timeout, sizeof sync_to); + memset(&con_bkserver, 0, sizeof(con_bkserver)); + memset(&con_lbproxy, 0, sizeof(con_lbproxy)); + FD_ZERO(&fdset); + + type_s = edg_wll_RegJobJobtypeToString(type); + if (!type_s) { + edg_wll_SetError(context,EINVAL,"edg_wll_RegisterJobProxy(): no jobtype specified"); + goto edg_wll_registerjobproxy_end; + } + if ((type == EDG_WLL_REGJOB_DAG || + type == EDG_WLL_REGJOB_PARTITIONED || + type == EDG_WLL_REGJOB_COLLECTION) + && num_subjobs > 0) { + ret = edg_wll_GenerateSubjobIds(context,job,num_subjobs,seed ? seed : MY_SEED,subjobs); + /* increase log timeout on client (the same as on BK server) */ + context->p_sync_timeout.tv_sec += num_subjobs; + if (context->p_sync_timeout.tv_sec > 86400) context->p_sync_timeout.tv_sec = 86400; + } if (ret) { - edg_wll_UpdateError(context,0,"edg_wll_RegisterJobProxy(): unable to register with bkserver"); - return edg_wll_Error(context,NULL,NULL); + edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_GenerateSubjobIds() error"); + goto edg_wll_registerjobproxy_end; } - /* and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(context,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed ? seed : MY_SEED,subjobs); + + /* SetLoggingJobProxy */ + edg_wll_SetSequenceCode(context, NULL, EDG_WLL_SEQ_NORMAL); + seq = edg_wll_GetSequenceCode(context); + if (edg_wll_SetLoggingJobProxy(context,job,seq,NULL,EDG_WLL_SEQ_NORMAL) != 0) { + edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_SetLoggingJobProxy() error"); + goto edg_wll_registerjobproxy_end; + } + + /* format the RegJob event message */ + if (edg_wll_FormatLogLine(context,LOGFLAG_SYNC,EDG_WLL_EVENT_REGJOB,&logline,EDG_WLL_FORMAT_REGJOB, + (char *)jdl,ns,"",type_s,num_subjobs,seed ? seed : MY_SEED) != 0 ) { + edg_wll_UpdateError(context,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error"); + goto edg_wll_registerjobproxy_end; + } + + /* and now do the pseudo-parallel registration: */ + + /* connect to bkserver */ + if ((ret = edg_wll_log_connect_direct(context,&con_bkserver)) < 0) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_connect_direct error"); + goto edg_wll_registerjobproxy_end; + } + /* connect to lbproxy */ + if ((ret = edg_wll_log_connect_proxy(context,&con_lbproxy)) < 0) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_connect_proxy error"); + goto edg_wll_registerjobproxy_end; + } + /* send to bkserver */ + if ((ret = edg_wll_log_write_direct(context,&con_bkserver,logline)) == -1) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_write_direct error"); + goto edg_wll_registerjobproxy_end; + } + /* send to lbproxy */ + if ((ret = edg_wll_log_write_proxy(context,&con_lbproxy,logline)) == -1) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_write_proxy error"); + goto edg_wll_registerjobproxy_end; + } + /* select and read the answers */ + FD_SET(con_bkserver.sock,&fdset); + n = con_bkserver.sock; + FD_SET(con_lbproxy.sock,&fdset); + if (con_lbproxy.sock > n) n = con_lbproxy.sock; + n += 1; + count = 2; + while (count > 0) { + fd = select(n,&fdset,NULL,NULL,&context->p_tmp_timeout); + switch (fd) { + case 0: /* timeout */ + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): select() timeouted"); + count = 0; + break; + case -1: /* error */ + switch(errno) { + case EINTR: + continue; + default: + edg_wll_UpdateError(context,errno,"edg_wll_RegisterJobProxy(): select() error"); + } + default: + break; + } + if (FD_ISSET(con_bkserver.sock,&fdset)) { + /* read answer from bkserver */ + if ((ret = edg_wll_log_read_direct(context,&con_bkserver)) == -1) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_read_direct error"); + goto edg_wll_registerjobproxy_end; + } + count -= 1; + } + if (FD_ISSET(con_lbproxy.sock,&fdset)) { + /* read answer from lbproxy */ + if ((ret = edg_wll_log_read_proxy(context,&con_lbproxy)) == -1) { + edg_wll_UpdateError(context,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_read_proxy error"); + goto edg_wll_registerjobproxy_end; + } + count -= 1; + } + } + +edg_wll_registerjobproxy_end: + if (con_bkserver.sock) edg_wll_gss_close(&con_bkserver,&context->p_tmp_timeout); + if (con_lbproxy.sock) edg_wll_plain_close(&con_lbproxy); + + memcpy(&context->p_sync_timeout, &sync_to, sizeof sync_to); + if (type_s) free(type_s); + if (seq) free(seq); + if (logline) free(logline); + + return edg_wll_Error(context,NULL,NULL); #undef MY_SEED } #else /* LB_PERF */ -/* this is original edg_wll_RegisterJobProxy */ +/** + *----------------------------------------------------------------------- + * Register one job with L&B Proxy service + * \note simple wrapper around edg_wll_RegisterJobMaster() + * this is original edg_wll_RegisterJobProxy + *----------------------------------------------------------------------- + */ int edg_wll_RegisterJobProxy( edg_wll_Context context, const edg_wlc_JobId job, @@ -1446,8 +1180,14 @@ int edg_wll_RegisterJobProxy( #ifdef LB_PERF -// function for loggin only to LBProxy -// useful for performace measuring + +/** + *----------------------------------------------------------------------- + * Register one job with L&B Proxy service ONLY + * \note simple wrapper around edg_wll_RegisterJobMaster() + * useful for performace measuring + *----------------------------------------------------------------------- + */ int edg_wll_RegisterJobProxyOnly( edg_wll_Context context, const edg_wlc_JobId job, @@ -1463,8 +1203,15 @@ int edg_wll_RegisterJobProxyOnly( #undef MY_SEED } -#endif +#endif /* LB_PERF */ +/** + *----------------------------------------------------------------------- + * Register one subjob with L&B service + * \note simple wrapper around edg_wll_RegisterJobMaster() + *----------------------------------------------------------------------- + */ +static int edg_wll_RegisterSubjob( edg_wll_Context context, const edg_wlc_JobId job, @@ -1479,6 +1226,13 @@ int edg_wll_RegisterSubjob( return edg_wll_RegisterJobMaster(context,LOGFLAG_NORMAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); } +/** + *----------------------------------------------------------------------- + * Register one subjob with L&B Proxy service + * \note simple wrapper around edg_wll_RegisterJobMaster() + *----------------------------------------------------------------------- + */ +static int edg_wll_RegisterSubjobProxy( edg_wll_Context context, const edg_wlc_JobId job, @@ -1493,6 +1247,12 @@ int edg_wll_RegisterSubjobProxy( return edg_wll_RegisterJobMaster(context,LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); } +/** + *----------------------------------------------------------------------- + * Register batch of subjobs with L&B service + * \note simple wrapper around edg_wll_RegisterSubjob() + *----------------------------------------------------------------------- + */ int edg_wll_RegisterSubjobs( edg_wll_Context ctx, const edg_wlc_JobId parent, @@ -1532,6 +1292,12 @@ edg_wll_registersubjobs_end: return edg_wll_Error(ctx, NULL, NULL); } +/** + *----------------------------------------------------------------------- + * Register batch of subjobs with L&B Proxy service + * \note simple wrapper around edg_wll_RegisterSubjobProxy() + *----------------------------------------------------------------------- + */ int edg_wll_RegisterSubjobsProxy( edg_wll_Context ctx, const edg_wlc_JobId parent, @@ -1571,6 +1337,11 @@ edg_wll_registersubjobsproxy_end: return edg_wll_Error(ctx, NULL, NULL); } +/** + *----------------------------------------------------------------------- + * Change ACL for given job + *----------------------------------------------------------------------- + */ int edg_wll_ChangeACL( edg_wll_Context ctx, const edg_wlc_JobId jobid, -- 1.8.2.3