added possibility to feed JPIS data through interlogger
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 19 Aug 2008 12:40:09 +0000 (12:40 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 19 Aug 2008 12:40:09 +0000 (12:40 +0000)
org.glite.jp.primary/src/bones_server.c
org.glite.jp.primary/src/is_client.c

index 77544c2..9c50358 100644 (file)
@@ -36,6 +36,8 @@ static struct glite_srvbones_service stab = {
 
 static time_t cert_mtime;
 char *server_cert, *server_key, *cadir;
+char file_prefix[PATH_MAX] = "/var/glite/log/dglogd.log";
+char *il_sock = NULL;
 edg_wll_GssCred mycred = NULL;
 static char *mysubj;
 
@@ -72,7 +74,7 @@ int main(int argc, char *argv[])
 
        b_argc = p_argc = 1;
 
-       while ((opt = getopt(argc,argv,"B:P:a:p:s:dl:i:c:k:n")) != EOF) switch (opt) {
+       while ((opt = getopt(argc,argv,"B:P:a:p:s:dl:i:c:k:nf:w:")) != EOF) switch (opt) {
                case 'B':
                        assert(b_argc < 20);
                        if (com = strchr(optarg,',')) *com = 0;
@@ -108,6 +110,8 @@ int main(int argc, char *argv[])
                case 'c': server_cert = optarg; break;
                case 'k': server_key = optarg; break;
                case 'n': ctx->noauth = 1; break;
+               case 'f': strncpy(file_prefix, optarg, PATH_MAX); file_prefix[PATH_MAX-1] = 0; break;
+               case 'w': il_sock = strdup(optarg); break;
                case '?': fprintf(stderr,"usage: %s: -Bb,val ... -Pplugin.so ...\n"
                                          "b is backend option\n",argv[0]);
                          exit (1);
index 9d93345..41d064c 100644 (file)
@@ -4,6 +4,10 @@
 #include <string.h>
 #include <errno.h>
 #include <assert.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/stat.h>
 
 #undef SOAP_FMAC1
 #define SOAP_FMAC1 static
 #include "glite/jp/ws_fault.c"
 #include "soap_util.c"
 
+
 #define MAX_RETRY      10
 #define RETRY_SLEEP    2
 
 extern char *server_key, *server_cert; /* XXX */
+extern char file_prefix[];
+extern char *il_sock;
+
+struct client_data {
+       char *host;
+       int port;
+       long offset;
+};
+
+/*
+static int fconnect(struct soap *soap, const char *endpoint, const char *host, int port){
+       int s, len;
+       const char *dest = "/tmp/il_sock";
+       struct sockaddr_un remote;
+
+        if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+               return SOAP_ERR; 
+
+        remote.sun_family = AF_UNIX;
+        strcpy(remote.sun_path, dest);
+        len = strlen(remote.sun_path) + sizeof(remote.sun_family);
+        if (connect(s, (struct sockaddr *)&remote, len) < 0)
+               return SOAP_ERR; 
+
+       soap->socket = s;
+
+       return SOAP_OK;
+}
+*/
+
+void notify_il_sock(struct soap *soap) {
+       struct client_data *data = soap->user;
+       int s, len;
+       struct sockaddr_un remote;
+       char *buf;
+
+       if(il_sock == NULL) return;
+       if(data == NULL) return;
+
+        if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+               return; 
+
+        remote.sun_family = AF_UNIX;
+        strcpy(remote.sun_path, il_sock);
+        len = strlen(remote.sun_path) + sizeof(remote.sun_family);
+        if (connect(s, (struct sockaddr *)&remote, len) < 0) {
+               close(s);
+               return; 
+       }
+
+       asprintf(&buf, "POST / HTTP/1.1\r\nHost: %s:%d\r\nContent-Length: 1\r\n\r\n",
+                data->host, data->port);
+       if(buf) {
+               /* fire and forget */
+               send(s, buf, strlen(buf) + 1, MSG_NOSIGNAL | MSG_DONTWAIT);
+               free(buf);
+       }
+       free(remote.sun_path);
+       close(s);
+       return;
+}
+
+
+int myopen(struct soap *soap, const char *endpoint, const char *host, int port)
+{
+       char filename[PATH_MAX];
+       FILE *outfile;
+       int i, filedesc;
+       struct client_data *data;
+
+       data = malloc(sizeof(*data));
+       if(data == NULL)
+               return ENOMEM;
+
+       /* XXX: should it be strdup'ed? */
+       data->host = host;
+       data->port = port;
+
+       snprintf(filename, PATH_MAX-1, "%s.%s:%d", file_prefix, host, port);
+       filename[PATH_MAX - 1] = 0;     
+
+try_again:
+       if((outfile = fopen(filename, "a")) == NULL) {
+               goto cleanup;
+       }
+       if((filedesc = fileno(outfile)) < 0) {
+               goto cleanup;
+       }
+
+       for(i = 0; i < 5; i++) {
+               struct flock filelock;
+               int filelock_status;
+               struct stat statbuf;
+
+               filelock.l_type = F_WRLCK;
+               filelock.l_whence = SEEK_SET;
+               filelock.l_start = 0;
+               filelock.l_len = 0;
+
+               if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) {
+                       switch(errno) {
+                       case EAGAIN:
+                       case EACCES:
+                       case EINTR:
+                               if((i+1) < 5) sleep(1);
+                               break;
+                       default:
+                               goto cleanup;
+                       }
+               } else {
+                       if(stat(filename, &statbuf)) {
+                               if(errno == ENOENT) {
+                                       fclose(outfile);
+                                       goto try_again;
+                               } else {
+                                       goto cleanup;
+                               }
+                       } else {
+                               /* success */
+                               break;
+                       }
+               }
+       }
+
+       if(i == 5) {
+               errno = ETIMEDOUT;
+               goto cleanup;
+       }
+
+       data->offset = ftell(outfile);
+       soap->user = data;
+       soap->sendfd = filedesc;
+       return filedesc;
+
+cleanup:
+       filedesc = errno;
+       if(outfile) fclose(outfile);
+       return filedesc;
+}
+
+
+int myclose(struct soap *soap)
+{
+       if(soap->sendfd > 2) {
+               write(soap->sendfd, "\n", 1);
+               close(soap->sendfd);
+               soap->sendfd = -1;
+               /* send message to IL on socket */
+               notify_il_sock(soap);
+               if(soap->user) {
+                       free((struct client_data*)soap->user);
+                       soap->user = NULL;
+               }
+       }
+       return SOAP_OK;
+}
+
+int mysend(struct soap *soap, const char *s, size_t n)
+{
+       int ret;
+
+       ret = write(soap->sendfd, s, n);
+       if(ret < 0) 
+               return ret;
+       return SOAP_OK;
+}
+
+size_t myrecv(struct soap *soap, char *s, size_t n)
+{
+       const char response[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+               "<SOAP-ENV:Envelope"
+               " xmlns:SOAP-ENV=\"http://schemas.xmlsoap.org/soap/envelope/\""
+               " xmlns:SOAP-ENC=\"http://schemas.xmlsoap.org/soap/encoding/\""
+               " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
+               " xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\""
+               " xmlns:ns3=\"http://glite.org/wsdl/types/jp\""
+               " xmlns:ns1=\"http://glite.org/wsdl/services/jp\""
+               " xmlns:ns2=\"http://glite.org/wsdl/elements/jp\">"
+               " <SOAP-ENV:Body>"
+               "  <ns2:UpdateJobsResponse>"
+               "  </ns2:UpdateJobsResponse>"
+               " </SOAP-ENV:Body>"
+               "</SOAP-ENV:Envelope>";
+
+       int len;
+
+       if(soap->user == NULL)
+               soap->user = response;
+
+       len = sizeof(response) - ((char*)soap->user - response);
+       if(n < len) len = n;
+       strncpy(s, (char*)soap->user, len);
+       soap->user += len;
+       return len;
+}
 
 static int check_other_soap(glite_jp_context_t ctx)
 {
@@ -55,8 +255,17 @@ static int check_other_soap(glite_jp_context_t ctx)
                soap_set_namespaces(ctx->other_soap,jpis__namespaces);
                soap_set_omode(ctx->other_soap, SOAP_IO_BUFFER);        // set buffered response
                                                                        // buffer set to SOAP_BUFLEN (default = 8k)
-               soap_register_plugin_arg(ctx->other_soap,glite_gsplugin,plugin_ctx);
-               ctx->other_soap->user = ctx;
+               if(il_sock == NULL) {
+                       /* send to JPIS directly */
+                       soap_register_plugin_arg(ctx->other_soap,glite_gsplugin,plugin_ctx);
+                       ctx->other_soap->user = ctx;
+               } else {
+                       /* this makes the SOAP client send all traffic through local socket to IL */
+                       // ctx->other_soap->fconnect = fconnect;
+                       ctx->other_soap->fopen = myopen;
+                       ctx->other_soap->fclose = myclose;
+                       ctx->other_soap->fsend = mysend;
+               }
        }
        return ret;
 }