#include "glite/lb/consumer.h"
#include "lb_gss.h"
+#include "lb_plain_io.h"
#include "authz.h"
#ifdef __cplusplus
/* http(s) stream */
gss_cred_id_t gsiCred;
edg_wll_GssConnection gss;
+ edg_wll_Connection conn; /* for plain (non-gss) connections - i.e. lbproxy */
char *buf;
int bufUse,bufSize;
- int conn; /* for plain (non-gss) connections - i.e. lbproxy */
/* timestamp of usage of this entry in ctx.connPool */
struct timeval lastUsed;
enum edg_wll_Level p_level;
char *p_destination;
int p_dest_port;
+ char *p_lbproxy_store_sock;
+ char *p_lbproxy_serve_sock;
char *p_user_lbproxy;
struct timeval p_log_timeout,p_sync_timeout,p_query_timeout, p_notif_timeout, p_tmp_timeout;
char *p_query_server;
extern "C" {
#endif
-int edg_wll_plain_read_fullbuf(
- int conn,
+typedef struct _edg_wll_Connection {
+ int sock;
+ char *buffer;
+ size_t bufsz;
+ size_t bufuse;
+} edg_wll_Connection;
+
+
+int edg_wll_plain_accept(
+ int sock,
+ edg_wll_Connection *conn);
+
+int edg_wll_plain_read(
+ edg_wll_Connection *conn,
void *outbuf,
size_t outbufsz,
struct timeval *timeout);
int edg_wll_plain_read_full(
- int conn,
- void **out,
- size_t outsz,
+ edg_wll_Connection *conn,
+ void *outbuf,
+ size_t outbufsz,
struct timeval *timeout);
int edg_wll_plain_write_full(
- int conn,
+ edg_wll_Connection *conn,
const void *buf,
size_t bufsz,
struct timeval *timeout);
const char * /* IN: message body */
);
+extern edg_wll_ErrorCode edg_wll_http_recv_proxy(
+ edg_wll_Context, /* INOUT: context */
+ char **, /* OUT: first line */
+ char ***, /* OUT: null terminated array of headers */
+ char ** /* OUT: message body */
+);
+
+extern edg_wll_ErrorCode edg_wll_http_send_proxy(
+ edg_wll_Context, /* INOUT: context */
+ const char *, /* IN: first line */
+ char const * const *, /* IN: headers */
+ const char * /* IN: message body */
+);
+
#endif /* __EDG_WORKLOAD_LOGGING_COMMON_MINI_HTTP_H__ */
for (i=0; i<ctx->poolSize; i++) {
if (ctx->connPool[i].peerName) free(ctx->connPool[i].peerName);
+
+ close(ctx->connPool[i].conn.sock);
+ if (ctx->connPool[i].conn.buffer) free(ctx->connPool[i].conn.buffer);
+
edg_wll_gss_close(&ctx->connPool[i].gss,&close_timeout);
if (ctx->connPool[i].gsiCred)
gss_release_cred(&min_stat, &ctx->connPool[i].gsiCred);
}\
}
+#define bufshift(conn, shift) { \
+ memmove((conn)->buffer, (conn)->buffer+(shift), (conn)->bufuse-(shift)); \
+ (conn)->bufuse -= (shift); \
+}
-int edg_wll_plain_write_full(
- int conn,
- const void *buf,
- size_t bufsz,
- struct timeval *to)
+int edg_wll_plain_connect(
+ char const *hostname,
+ int port,
+ struct timeval *to,
+ edg_wll_Connection *conn)
{
- size_t written = 0;
- ssize_t ct = -1;
- fd_set fds;
- struct timeval timeout, before, after;
-
-
- if ( to ) {
- memcpy(&timeout, to, sizeof(timeout));
- gettimeofday(&before, NULL);
- }
-
- errno = 0;
- while ( written < bufsz ) {
- FD_ZERO(&fds);
- FD_SET(conn, &fds);
-
- switch ( select(conn+1, NULL, &fds, NULL, to ? &timeout : NULL) ) {
- case 0: errno = ETIMEDOUT; goto end; break;
- case -1: goto end; break;
- }
- if ( (ct = write(conn, ((char*)buf)+written, bufsz-written)) < 0 ) {
- if ( errno == EINTR ) { errno = 0; continue; }
- else goto end;
- }
- written += ct;
- }
-
-end:
- if ( to ) {
- gettimeofday(&after, NULL);
- tv_sub(after, before);
- tv_sub(*to, after);
- if (to->tv_sec < 0) to->tv_sec = to->tv_usec = 0;
- }
-
- return (errno)? -1: written;
+ return 0;
}
+int edg_wll_plain_accept(
+ int sock,
+ edg_wll_Connection *conn)
+{
+ struct sockaddr_in a;
+ int alen = sizeof(a);
+
+ /* Do not free the buffer here - just reuse the memmory
+ */
+ conn->bufuse = 0;
+ /*
+ if ( (conn->sock = accept(sock, (struct sockaddr *)&a, &alen)) )
+ return -1;
+ */
+ return 0;
+}
-int edg_wll_plain_read_full(
- int conn,
- void **out,
- size_t outsz,
- struct timeval *to)
+int edg_wll_plain_read(
+ edg_wll_Connection *conn,
+ void *outbuf,
+ size_t outbufsz,
+ struct timeval *to)
{
- ssize_t ct, sz, total = 0;
- char buf[4098],
- *tmp = NULL;
+ size_t ct, toread = 0;
fd_set fds;
struct timeval timeout, before, after;
+ if ( conn->bufsz == 0 ) {
+ if ( !(conn->buffer = malloc(BUFSIZ)) ) return -1;
+ conn->bufsz = BUFSIZ;
+ conn->bufuse = 0;
+ }
+
if ( to ) {
memcpy(&timeout, to, sizeof(timeout));
gettimeofday(&before, NULL);
}
errno = 0;
- sz = sizeof(buf);
+
+ if ( conn->bufuse > 0 ) goto cleanup;
+
+ toread = 0;
do {
FD_ZERO(&fds);
- FD_SET(conn, &fds);
- switch (select(conn+1, &fds, NULL, NULL, to ? &timeout : NULL)) {
+ FD_SET(conn->sock, &fds);
+ switch (select(conn->sock+1, &fds, NULL, NULL, to ? &timeout : NULL)) {
case 0: errno = ETIMEDOUT; goto cleanup; break;
case -1: goto cleanup; break;
}
- if ( sz > outsz-total ) sz = outsz-total;
- if ( (ct = read(conn, buf, sz)) < 0 ) {
+ if ( conn->bufuse == conn->bufsz ) {
+ char *tmp = realloc(conn->buffer, conn->bufsz+BUFSIZ);
+ if ( !tmp ) return -1;
+ conn->buffer = tmp;
+ conn->bufsz += BUFSIZ;
+ }
+ toread = conn->bufsz - conn->bufuse;
+ if ( (ct = read(conn->sock, conn->buffer+conn->bufuse, toread)) < 0 ) {
if ( errno == EINTR ) continue;
goto cleanup;
}
- if ( ct > 0 ) {
- char *t = realloc(tmp, total+ct);
-
- if ( !t ) goto cleanup;
- tmp = t;
- memcpy(tmp+total, buf, ct);
- total += ct;
+ if ( ct == 0 && conn->bufuse == 0 && errno == 0 ) {
+ errno = ENOTCONN;
+ goto cleanup;
}
- else if ( total == 0 && errno == 0 ) { errno = ENOTCONN; goto cleanup; }
- } while ( total < outsz );
+
+ conn->bufuse += ct;
+ } while ( ct == toread );
cleanup:
if ( to->tv_sec < 0 ) to->tv_sec = to->tv_usec = 0;
}
- if ( errno ) { free(tmp); return -1; }
+ if ( errno ) return -1;
- *out = tmp;
- return total;
+ if ( conn->bufuse > 0 ) {
+ size_t len = (conn->bufuse < outbufsz) ? conn->bufuse : outbufsz;
+ memcpy(outbuf, conn->buffer, len);
+ outbufsz = len;
+ bufshift(conn, len);
+ return len;
+ }
+
+ return 0;
}
-int edg_wll_plain_read_fullbuf(
- int conn,
+
+int edg_wll_plain_read_full(
+ edg_wll_Connection *conn,
void *outbuf,
size_t outbufsz,
struct timeval *to)
{
- ssize_t ct, sz, total = 0;
- char buf[4098];
+ size_t total = 0;
+
+
+ if ( conn->bufuse > 0 ) {
+ size_t len = (conn->bufuse < outbufsz) ? conn->bufuse : outbufsz;
+ memcpy(outbuf, conn->buffer, len);
+ outbufsz = len;
+ bufshift(conn, len);
+ total += len;
+ }
+
+ while ( total < outbufsz ) {
+ size_t ct = edg_wll_plain_read(conn, outbuf+total, outbufsz-total, to);
+ if ( ct < 0) return ct;
+ total += ct;
+ }
+
+ return total;
+}
+
+int edg_wll_plain_write_full(
+ edg_wll_Connection *conn,
+ const void *buf,
+ size_t bufsz,
+ struct timeval *to)
+{
+ size_t written = 0;
+ ssize_t ct = -1;
fd_set fds;
struct timeval timeout, before, after;
}
errno = 0;
- sz = sizeof(buf);
- do {
+ while ( written < bufsz ) {
FD_ZERO(&fds);
- FD_SET(conn, &fds);
- switch (select(conn+1, &fds, NULL, NULL, to ? &timeout : NULL)) {
- case 0: errno = ETIMEDOUT; goto cleanup; break;
- case -1: goto cleanup; break;
- }
+ FD_SET(conn->sock, &fds);
- if ( sz > outbufsz-total ) sz = outbufsz-total;
- if ( (ct = read(conn, buf, sz)) < 0 ) {
- if ( errno == EINTR ) continue;
- goto cleanup;
+ switch ( select(conn->sock+1, NULL, &fds, NULL, to? &timeout: NULL) ) {
+ case 0: errno = ETIMEDOUT; goto end; break;
+ case -1: goto end; break;
}
-
- if ( ct > 0 ) {
- memcpy(outbuf+total, buf, ct);
- total += ct;
+ if ( (ct=write(conn->sock, ((char*)buf)+written, bufsz-written)) < 0 ) {
+ if ( errno == EINTR ) { errno = 0; continue; }
+ else goto end;
}
- else if ( total == 0 && errno == 0 ) { errno = ENOTCONN; goto cleanup; }
- } while ( total < outbufsz );
+ written += ct;
+ }
-cleanup:
+end:
if ( to ) {
gettimeofday(&after, NULL);
tv_sub(after, before);
tv_sub(*to, after);
- if ( to->tv_sec < 0 ) to->tv_sec = to->tv_usec = 0;
+ if (to->tv_sec < 0) to->tv_sec = to->tv_usec = 0;
}
- return errno? -1: total;
+ return (errno)? -1: written;
}
return edg_wll_Error(ctx,NULL,NULL);
}
+edg_wll_ErrorCode edg_wll_http_recv_proxy(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut)
+{
+ char **hdr = NULL,*first = NULL,*body = NULL;
+ enum { FIRST, HEAD, BODY, DONE } pstat = FIRST;
+ int len, nhdr = 0,rdmore = 0,clen = 0,blen = 0;
+
+#define bshift(shift) {\
+ memmove(ctx->connPool[ctx->connToUse].buf,\
+ ctx->connPool[ctx->connToUse].buf+(shift),\
+ ctx->connPool[ctx->connToUse].bufUse-(shift));\
+ ctx->connPool[ctx->connToUse].bufUse -= (shift);\
+}
+ edg_wll_ResetError(ctx);
+
+ if ( !ctx->connPool[ctx->connToUse].buf ) {
+ ctx->connPool[ctx->connToUse].bufSize = BUFSIZ;
+ ctx->connPool[ctx->connToUse].buf = malloc(BUFSIZ);
+ }
+
+ do {
+ len = edg_wll_plain_read(&ctx->connPool[ctx->connToUse].conn,
+ ctx->connPool[ctx->connToUse].buf+ctx->connPool[ctx->connToUse].bufUse,
+ ctx->connPool[ctx->connToUse].bufSize-ctx->connPool[ctx->connToUse].bufUse,
+ &ctx->p_tmp_timeout);
+ if ( len < 0 ) goto error;
+
+ ctx->connPool[ctx->connToUse].bufUse += len;
+ rdmore = 0;
+
+ while (!rdmore && pstat != DONE) switch (pstat) {
+ char *cr;
+
+ case FIRST:
+ if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
+ ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+ {
+ *cr = 0;
+ first = strdup(ctx->connPool[ctx->connToUse].buf);
+ bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+ pstat = HEAD;
+ } else rdmore = 1;
+ break;
+ case HEAD:
+ if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
+ ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+ {
+ if (cr == ctx->connPool[ctx->connToUse].buf) {
+ bshift(2);
+ pstat = clen ? BODY : DONE;
+ if (clen) body = malloc(clen+1);
+ break;
+ }
+
+ *cr = 0;
+ hdr = realloc(hdr,(nhdr+2) * sizeof(*hdr));
+ hdr[nhdr] = strdup(ctx->connPool[ctx->connToUse].buf);
+ hdr[++nhdr] = NULL;
+
+ if (!strncasecmp(ctx->connPool[ctx->connToUse].buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1))
+ clen = atoi(ctx->connPool[ctx->connToUse].buf+sizeof(CONTENT_LENGTH)-1);
+
+ bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+ } else rdmore = 1;
+ break;
+ case BODY:
+ if (ctx->connPool[ctx->connToUse].bufUse) {
+ int m = min(ctx->connPool[ctx->connToUse].bufUse,clen-blen);
+ memcpy(body+blen,ctx->connPool[ctx->connToUse].buf,m);
+ blen += m;
+ bshift(m);
+ }
+ rdmore = 1;
+ if (blen == clen) {
+ pstat = DONE;
+ body[blen] = 0;
+ }
+ break;
+ default:
+ break;
+ }
+ } while (pstat != DONE);
+
+error:
+ if (edg_wll_Error(ctx,NULL,NULL)) {
+ if (hdr) {
+ char **h;
+ for (h = hdr; *h; h++) free(*h);
+ free(hdr);
+ }
+ free(first);
+ free(body);
+ } else {
+ if (firstOut) *firstOut = first; else free(first);
+ if (hdrOut) *hdrOut = hdr;
+ else if (hdr) {
+ char **h;
+ for (h = hdr; *h; h++) free(*h);
+ free(hdr);
+ }
+ if (bodyOut) *bodyOut = body; else free(body);
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
static int real_write(edg_wll_Context ctx, edg_wll_GssConnection *con,const char *data,int len)
{
int total = 0;
return edg_wll_Error(ctx,NULL,NULL);
}
+
+edg_wll_ErrorCode edg_wll_http_send_proxy(edg_wll_Context ctx, const char *first, const char * const *head, const char *body)
+{
+ const char* const *h;
+ int len = 0, blen;
+
+
+ edg_wll_ResetError(ctx);
+
+ if ( edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ first, strlen(first), &ctx->p_tmp_timeout) < 0
+ || edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ "\r\n", 2, &ctx->p_tmp_timeout) < 0 )
+ return edg_wll_SetError(ctx, errno, "edg_wll_http_send()");
+
+ if ( head ) for ( h = head; *h; h++ )
+ if ( edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ *h, strlen(*h), &ctx->p_tmp_timeout) < 0
+ || edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ "\r\n", 2, &ctx->p_tmp_timeout) < 0 )
+ return edg_wll_SetError(ctx, errno, "edg_wll_http_send()");
+
+ if ( body ) {
+ char buf[100];
+
+ len = strlen(body);
+ blen = sprintf(buf, CONTENT_LENGTH " %d\r\n",len);
+ if (edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ buf, blen, &ctx->p_tmp_timeout) < 0)
+ return edg_wll_SetError(ctx, errno, "edg_wll_http_send()");
+ }
+
+ if ( edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ "\r\n", 2, &ctx->p_tmp_timeout) < 0)
+ return edg_wll_SetError(ctx, errno, "edg_wll_http_send()");
+ if ( body && edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+ body, len, &ctx->p_tmp_timeout) < 0)
+ return edg_wll_SetError(ctx, errno, "edg_wll_http_send()");
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
if (!val) val = "no";
ctx->p_query_server_override = !strcasecmp(val,"yes");
break;
+ case EDG_WLL_PARAM_LBPROXY_STORE_SOCK:
+ free(ctx->p_lbproxy_store_sock);
+ ctx->p_lbproxy_store_sock = val ? strdup(val): NULL;
+ break;
+ case EDG_WLL_PARAM_LBPROXY_SERVE_SOCK:
+ free(ctx->p_lbproxy_serve_sock);
+ ctx->p_lbproxy_serve_sock = val ? strdup(val): NULL;
+ break;
default:
return edg_wll_SetError(ctx,EINVAL,"unknown parameter");
}
case EDG_WLL_PARAM_X509_PROXY:
case EDG_WLL_PARAM_X509_KEY:
case EDG_WLL_PARAM_X509_CERT:
+ case EDG_WLL_PARAM_LBPROXY_STORE_SOCK:
+ case EDG_WLL_PARAM_LBPROXY_SERVE_SOCK:
return edg_wll_SetParamString(ctx,param,va_arg(ap,char *));
case EDG_WLL_PARAM_LOG_TIMEOUT:
case EDG_WLL_PARAM_LOG_SYNC_TIMEOUT:
p_string = va_arg(ap, char **);
*p_string = estrdup(ctx->p_cert_filename);
break;
+ case EDG_WLL_PARAM_LBPROXY_STORE_SOCK:
+ p_string = va_arg(ap, char **);
+ *p_string = estrdup(ctx->p_lbproxy_store_sock);
+ break;
+ case EDG_WLL_PARAM_LBPROXY_SERVE_SOCK:
+ p_string = va_arg(ap, char **);
+ *p_string = estrdup(ctx->p_lbproxy_serve_sock);
+ break;
case EDG_WLL_PARAM_LOG_TIMEOUT:
p_tv = va_arg(ap,struct timeval *);