This commit was manufactured by cvs2svn to create tag 'glite-lb_R_1_10_1_1'. glite-lb_R_1_10_1_1
authorcvs2svn <admin@example.com>
Fri, 20 Aug 2010 08:32:20 +0000 (08:32 +0000)
committercvs2svn <admin@example.com>
Fri, 20 Aug 2010 08:32:20 +0000 (08:32 +0000)
Sprout from branch_RC31_3 2010-08-20 08:32:18 UTC Zdeněk Šustr <sustr4@cesnet.cz> 'New revision'
Cherrypick from master 2009-04-08 08:15:08 UTC Aleš Křenek <ljocha@ics.muni.cz> 'The most recent version copied. Do not modify this instance (RW in ./org.glite.lb).':
    org.glite.lb.logger/configure
    org.glite.lb.logger/project/package.description
    org.glite.lb.logger/project/package.summary
    org.glite.lb.logger/src-nt/Connection.cpp
    org.glite.lb.logger/src-nt/EventManager.cpp
    org.glite.lb.logger/src-nt/InputChannel.H
    org.glite.lb.logger/src-nt/InputChannel.cpp
    org.glite.lb.logger/src-nt/Message.H
    org.glite.lb.logger/src-nt/MessageStore.H
    org.glite.lb.logger/src-nt/MessageStore.cpp
    org.glite.lb.logger/src-nt/Properties.H
    org.glite.lb.logger/src-nt/Transport.cpp
    org.glite.lb.logger/src/event_store_http.c
    org.glite.lb.logger/src/http.c
    org.glite.lb.logger/src/input_queue_socket_http.c
    org.glite.lb.logger/src/queue_mgr_http.c
    org.glite.lb.logger/src/send_event_http.c
    org.glite.lb.logger/src/server_msg_http.c

18 files changed:
org.glite.lb.logger/configure [new file with mode: 0755]
org.glite.lb.logger/project/package.description [new file with mode: 0644]
org.glite.lb.logger/project/package.summary [new file with mode: 0644]
org.glite.lb.logger/src-nt/Connection.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/EventManager.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/InputChannel.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/InputChannel.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/Message.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/MessageStore.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/MessageStore.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/Properties.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/Transport.cpp [new file with mode: 0644]
org.glite.lb.logger/src/event_store_http.c [new file with mode: 0644]
org.glite.lb.logger/src/http.c [new file with mode: 0644]
org.glite.lb.logger/src/input_queue_socket_http.c [new file with mode: 0644]
org.glite.lb.logger/src/queue_mgr_http.c [new file with mode: 0644]
org.glite.lb.logger/src/send_event_http.c [new file with mode: 0644]
org.glite.lb.logger/src/server_msg_http.c [new file with mode: 0644]

diff --git a/org.glite.lb.logger/configure b/org.glite.lb.logger/configure
new file mode 100755 (executable)
index 0000000..c289773
--- /dev/null
@@ -0,0 +1,691 @@
+#!/usr/bin/perl
+
+# WARNING: Don't edit this file unless it is the master copy in org.glite.lb
+#
+# For the purpose of standalone builds of lb/jobid/lbjp-common components
+# it is copied on tagging 
+
+# $Header$
+
+use Getopt::Long;
+
+my $pwd = `pwd`; chomp $pwd;
+my $prefix = $pwd.'/stage';
+my $stagedir;
+my $staged;
+my $module;
+my $thrflavour = 'gcc64dbgpthr';
+my $nothrflavour = 'gcc64dbg';
+my $mode = 'build';
+my $help = 0;
+my $listmodules;
+my $version;
+my $output;
+my $lb_tag = '';
+my $lbjp_tag = '';
+my $jp_tag = '';
+my $sec_tag = '';
+my $jobid_tag = '';
+
+my @nodes = qw/client server logger utils client-java doc ws-test db jpprimary jpindex jpclient/;
+my %enable_nodes;
+my %disable_nodes;
+
+my %extern_prefix = (
+       cares => '/opt/c-ares',
+       classads => '/opt/classads',
+       cppunit => '/usr',
+       expat => '/usr',
+       globus => '/opt/globus',
+       gsoap => '/usr',
+       mysql => '/usr',
+       'mysql-devel' => '',
+       voms => '/opt/glite',
+       gridsite => '/opt/glite',
+       lcas => '/opt/glite',
+       ant => '/usr',
+       jdk => '/usr',
+       libtar => '/usr',
+);
+
+my %jar = (
+       'commons-codec' => '/usr/share/java/commons-codec-1.3.jar',
+);
+
+
+my %glite_prefix;
+my %need_externs;
+my %need_externs_type;
+my %need_jars;
+my %extrafull;
+my %extranodmod;
+my %deps;
+my %deps_type;
+my %topbuild;
+
+my %lbmodules = (
+       'lb' => [ qw/client client-java common doc logger server state-machine types utils ws-interface ws-test/], 
+       'security' => [qw/gss gsoap-plugin/],
+       'lbjp-common' => [qw/db maildir server-bones trio jp-interface/],
+       'jobid' => [qw/api-c api-cpp api-java/],
+       'jp' => [ qw/client doc index primary server-common ws-interface/ ],
+       );
+
+
+my @opts = (
+       'prefix=s' => \$prefix,
+       'staged=s' => \$staged,
+       'module=s' => \$module,
+       'thrflavour=s' => \$thrflavour,
+       'nothrflavour=s' => \$nothrflavour,
+       'mode=s' => \$mode,
+       'listmodules=s' => \$listmodules,
+       'version=s' => \$version,
+       'output=s' => \$output,
+       'stage=s' => \$stagedir,
+       'lb-tag=s' => \$lb_tag,
+       'lbjp-common-tag=s' => \$lbjp_tag,
+       'jp-tag=s' => \$jp_tag,
+       'security-tag=s' => \$sec_tag,
+       'jobid-tag=s' => \$jobid_tag,
+       'help' => \$help,
+);
+
+for (@nodes) {
+       $enable_nodes{$_} = 0;
+       $disable_nodes{$_} = 0;
+       
+       push @opts,"disable-$_",\$disable_nodes{$_};
+       push @opts,"enable-$_",\$enable_nodes{$_};
+}
+
+push @opts,"with-$_=s",\$extern_prefix{$_} for keys %extern_prefix;
+push @opts,"with-$_=s",\$jar{$_} for keys %jar;
+
+my @keeparg = @ARGV;
+
+GetOptions @opts or die "Errors parsing command line\n";
+
+$extern_prefix{'mysql-devel'}=$extern_prefix{mysql} if $extern_prefix{'mysql-devel'} eq '';
+
+if ($help) { usage(); exit 0; }
+
+if ($listmodules) {
+       my @m = map "org.glite.$listmodules.$_",@{$lbmodules{$listmodules}};
+       print "@m\n";
+       exit 0;
+}
+
+warn "$0: --version and --output make sense only in --mode=etics\n"
+       if ($version || $output) && $mode ne 'etics';
+
+my $en;
+for (keys %enable_nodes) { $en = 1 if $enable_nodes{$_}; }
+
+my $dis;
+for (keys %disable_nodes) { $dis = 1 if $disable_nodes{$_}; }
+
+die "--enable-* and --disable-* are mutually exclusive\n"
+       if $en && $dis;
+
+die "--module cannot be used with --enable-* or --disable-*\n"
+       if $module && ($en || $dis);
+
+die "$module: unknown module\n" if $module && ! grep $module,@{$lbmodules{lb}},@{$lbmodules{security}},{$lbmodules{jp}};
+
+if ($dis) {
+       for (@nodes) {
+               $enable_nodes{$_} = 1 unless $disable_nodes{$_};
+       }
+}
+
+if (!$en && !$dis) { $enable_nodes{$_} = 1 for (@nodes) } ;
+
+for (keys %enable_nodes) { delete $enable_nodes{$_} unless $enable_nodes{$_}; }
+
+$stagedir = $prefix unless $stagedir;
+
+if ($mode eq 'build') {
+       print "Writing config.status\n";
+       open CONF,">config.status" or die "config.status: $!\n";
+       print CONF "$0 @keeparg\n";
+       close CONF;
+}
+
+
+my @modules;
+my %aux;
+
+if ($module) {
+#      push @modules,split(/[,.]+/,$module);
+       push @modules,$module;
+}
+else {
+       @modules = map(($extranodmod{$_} ? $extranodmod{$_} : 'lb.'.$_),(keys %enable_nodes));
+       
+       my $n;
+
+       do {
+               local $"="\n";
+               $n = $#modules;
+               push @modules,(map @{$deps{$_}},@modules);
+
+               undef %aux; @aux{@modules} = (1) x ($#modules+1);
+               @modules = keys %aux;
+       } while ($#modules > $n);
+}
+
+@aux{@modules} = (1) x ($#modules+1);
+delete $aux{$_} for (split /,/,$staged);
+@modules = keys %aux;
+
+mode_build() if $mode eq 'build';
+mode_checkout() if $mode eq 'checkout';
+mode_etics($module) if $mode eq 'etics';
+
+sub mode_build {
+       print "\nBuilding modules: @modules\n";
+       
+       my @ext = map @{$need_externs{$_}},@modules;
+       my @myjars = map @{$need_jars{$_}},@modules;
+       undef %aux; @aux{@ext} = 1;
+       @ext = keys %aux;
+       undef %aux; @aux{@myjars} = (1) x ($#myjars+1);
+       @myjars = keys %aux;
+       
+       print "\nRequired externals:\n";
+       print "\t$_: $extern_prefix{$_}\n" for @ext;
+       print "\t$_: $jar{$_}\n" for @myjars;
+       print "\nThis is a poor-man configure, it's up to you to have sources and externals there\n\n";
+       
+       mkinc($_) for @modules;
+       
+       print "Creating Makefile\n";
+       
+       open MAK,">Makefile" or die "Makefile: $!\n";
+       
+       print MAK "all: @modules\n\nclean:\n";
+       
+       for (@modules) {
+               my $full = full($_);
+               my $build = $topbuild{$_} ? '': '/build';
+               print MAK "\tcd $full$build && \${MAKE} clean\n"
+       }
+       
+       print MAK "\ndistclean:\n";
+       
+       for (@modules) {
+               my $full = full($_);
+               print MAK $topbuild{$_} ?
+                       "\tcd $full$build && \${MAKE} distclean\n" :
+                       "\trm -rf $full$build\n"
+       }
+       
+       print MAK "\n";
+       
+       for (@modules) {
+               my %ldeps; undef %ldeps;  
+               @ldeps{@{$deps{$_}}} = 1;
+               for my $x (split /,/,$staged) { delete $ldeps{$x}; }
+               my @dnames = $module ? () : keys %ldeps;
+       
+               my $full = full($_);
+               my $build = $topbuild{$_} ? '': '/build';
+       
+               print MAK "$_: @dnames\n\tcd $full$build && \${MAKE} && \${MAKE} install\n\n";
+       }
+       
+       close MAK;
+}
+       
+sub mode_checkout() {
+       for (@modules) {
+               my $module = $_;
+               my $tag = "";
+               if ($lb_tag){
+                       for (@{$lbmodules{lb}}){
+                               if ("lb.".$_ eq $module){
+                                       $tag = '-r '.$lb_tag;
+                               }
+                       }       
+               }
+               if ($lbjp_tag){
+                       for (@{$lbmodules{'lbjp-common'}}){
+                               if ("lbjp-common.".$_ eq $module){
+                                        $tag = '-r '.$lbjp_tag;
+                                }
+                       }
+               }
+               if ($jp_tag){
+                       for (@{$lbmodules{'jp'}}){
+                               if ("jp.".$_ eq $module){
+                                        $tag = '-r '.$jp_tag;
+                               }
+                        }
+               }
+               if ($sec_tag){
+                       for (@{$lbmodules{security}}){
+                               if ("security.".$_ eq $module){
+                                        $tag = '-r '.$sec_tag;
+                                }
+                       }
+               }
+               if ($jobid_tag){
+                       for (@{$lbmodules{jobid}}){
+                               if ("jobid.".$_ eq $module){
+                                        $tag = '-r '.$jobid_tag;
+                                }
+                       }
+               }
+               #if (grep {"lb.".$_ eq $module} @{$lbmodules{lb}}){
+               #       print "found";
+               #}
+               $_ = full($_);
+               print "\n*** Checking out $_\n";
+               system("cvs checkout  $tag $_") == 0 or die "cvs checkout $tag $_: $?\n";
+       }
+}
+
+BEGIN{
+%need_externs_aux = (
+       'lb.client' => [ qw/cppunit:B classads/ ],
+       'lb.client-java' => [ qw/ant:B/ ],
+       'lb.common' => [ qw/expat cppunit:B classads/ ],
+       'lb.doc' => [],
+       'lb.logger' => [ qw/cppunit:B/ ],
+       'lb.server' => [ qw/globus_essentials:R globus:B expat cares mysql cppunit:B gsoap:B classads voms lcas gridsite/ ],
+       'lb.state-machine' => [ qw/classads/ ],
+       'lb.utils' => [ qw/cppunit:B/ ],
+       'lb.ws-interface' => [],
+       'lb.ws-test' => [ qw/gsoap:B/ ],
+       'lb.types' => [ qw// ],
+       'lbjp-common.db' => [ qw/mysql:R mysql-devel:B/ ],
+       'lbjp-common.maildir' => [ qw// ],
+       'lbjp-common.server-bones' => [ qw// ],
+       'lbjp-common.trio' => [ qw/cppunit:B/ ],
+       'lbjp-common.jp-interface' => [ qw/cppunit:B/ ],
+       'security.gss' =>  [ qw/globus_essentials:R globus:B cares cppunit:B/ ],
+       'security.gsoap-plugin' =>  [ qw/cppunit:B globus_essentials:R globus:B cares gsoap:B/ ],
+       'jobid.api-c' =>  [ qw/cppunit:B/ ],
+       'jobid.api-cpp' =>  [ qw/cppunit:B/ ],
+       'jobid.api-java' =>  [ qw/ant:B jdk:B/ ],
+       'jp.client' => [ qw/gsoap libtar globus_essentials:R globus:B/ ],
+        'jp.doc' => [],
+        'jp.index' => [ qw/gsoap globus_essentials:R globus:B/ ],
+        'jp.primary' => [ qw/classads gsoap libtar globus_essentials:R globus:B/ ],
+        'jp.server-common' => [],
+        'jp.ws-interface' => [],
+);
+
+for my $ext (keys %need_externs_aux) {
+       for (@{$need_externs_aux{$ext}}) {
+               /([^:]*)(?::(.*))?/;
+               push @{$need_externs{$ext}},$1;
+               my $type = $2 ? $2 : 'BR';
+               $need_externs_type{$ext}->{$1} = $type;
+       }
+}
+
+%need_jars = (
+       'jobid.api-java' => [ qw/commons-codec/ ],
+);
+
+for my $jar (keys %need_jars) {
+       for (@{$need_jars{$jar}}) {
+               $need_externs_type{$jar}->{$_} = 'BR';  # XXX
+       }
+}
+
+%deps_aux = (
+       'lb.client' => [ qw/
+               lb.types:B lb.common
+               lbjp-common.trio
+               jobid.api-cpp jobid.api-c
+               security.gss
+       / ],
+       'lb.client-java' => [ qw/
+               lb.types:B
+               jobid.api-java
+       / ],
+       'lb.common' => [ qw/
+               jobid.api-cpp jobid.api-c
+               lb.types:B lbjp-common.trio security.gss
+       / ],
+       'lb.doc' => [ qw/lb.types:B/ ],
+       'lb.logger' => [ qw/
+               lbjp-common.trio
+               jobid.api-c
+               lb.common
+               security.gss
+       / ],
+       'lb.server' => [ qw/
+               lb.ws-interface lb.types:B lb.common lb.state-machine
+               lbjp-common.db lbjp-common.server-bones lbjp-common.trio lbjp-common.maildir
+               jobid.api-c
+               security.gsoap-plugin security.gss
+       / ],
+       'lb.state-machine' => [ qw/lb.common lbjp-common.jp-interface security.gss/ ],
+       'lb.utils' => [ qw/
+               lbjp-common.jp-interface
+               jobid.api-c
+               lbjp-common.trio lbjp-common.maildir
+               lb.client lb.state-machine
+       / ],
+       'lb.ws-test' => [ qw/security.gsoap-plugin lb.ws-interface/ ],
+       'lb.ws-interface' => [ qw/lb.types:B/ ],
+       'lb.types' => [ qw// ],
+       'lbjp-common.db' => [ qw/lbjp-common.trio/ ],
+       'lbjp-common.maildir' => [ qw// ],
+       'lbjp-common.server-bones' => [ qw// ],
+       'lbjp-common.trio' => [ qw// ],
+       'security.gss' =>  [ qw// ],
+       'security.gsoap-plugin' =>  [ qw/security.gss/ ],
+       'jobid.api-c' =>  [ qw// ],
+       'jobid.api-cpp' =>  [ qw/jobid.api-c/ ],
+       'jobid.api-java' =>  [ qw// ],
+
+       'lbjp-common.jp-interface' => [ qw/lbjp-common.db jobid.api-c/ ],
+
+       'jp.client' => [ qw/
+                jp.ws-interface
+                lbjp-common.jp-interface lbjp-common.maildir
+                jobid.api-c
+                security.gsoap-plugin
+        / ],
+       'jp.doc' => [ qw// ],
+       'jp.index' => [ qw/
+                jp.server-common jp.ws-interface
+                lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones
+                security.gsoap-plugin
+        / ],
+       'jp.primary' => [ qw/
+                jobid.api-c
+                jp.server-common jp.ws-interface
+                lb.state-machine
+                lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones
+                security.gsoap-plugin
+        / ],
+       'jp.server-common' => [ qw/ 
+                lbjp-common.jp-interface lbjp-common.db
+        / ],
+       'jp.ws-interface' => [ qw// ],
+);
+
+for my $ext (keys %deps_aux) {
+       for (@{$deps_aux{$ext}}) {
+               /([^:]*)(?::(.*))?/;
+               push @{$deps{$ext}},$1;
+               my $type = $2 ? $2 : 'BR';
+               $deps_type{$ext}->{$1} = $type;
+       }
+}
+
+
+%extrafull = ( gridsite=>'org.gridsite.core');
+
+#( java => 'client-java' );
+%extranodmod = (
+       db => 'lbjp-common.db',
+       jpprimary => 'jp.primary',
+       jpindex => 'jp.index',
+       jpclient => 'jp.client',
+);
+
+my @t = qw/lb.client-java jobid.api-java lb.types/;
+@topbuild{@t} = (1) x ($#t+1);
+}
+
+sub full
+{
+       my $short = shift;
+       return $extrafull{$short} ? $extrafull{$short} : 'org.glite.'.$short;
+}
+
+sub mkinc
+{
+       my %aux;
+       undef %aux;
+       my @m=qw/
+lb.client lb.doc lb.state-machine lb.ws-interface lb.logger lb.types lb.common lb.server lb.utils lb.ws-test lb.client-java
+security.gss security.gsoap-plugin
+jobid.api-c jobid.api-cpp jobid.api-java
+lbjp-common.db lbjp-common.maildir lbjp-common.server-bones lbjp-common.trio lbjp-common.jp-interface
+jp.client jp.doc jp.index jp.primary jp.server-common jp.ws-interface
+/;
+       @aux{@m} = (1) x ($#m+1);
+
+       my $short = shift;
+       my $full = full $short;
+
+       unless ($aux{$short}) {
+               print "Makefile.inc not needed in $full\n";
+               return;
+       }
+
+       my $build = '';
+       
+       unless ($topbuild{$_}) {
+               $build = '/build';
+               unless (-d "$full/build") {
+                       mkdir "$full/build" or die "mkdir $full/build: $!\n";
+               }
+               unlink "$full/build/Makefile";
+               symlink "../Makefile","$full/build/Makefile" or die "symlink ../Makefile $full/build/Makefile: $!\n";
+       }
+
+       open MKINC,">$full$build/Makefile.inc"
+               or die "$full$build/Makefile.inc: $!\n";
+
+       print "Creating $full$build/Makefile.inc\n";
+
+       print MKINC qq{
+PREFIX = $prefix
+stagedir = $stagedir
+thrflavour = $thrflavour
+nothrflavour = $nothrflavour
+};
+
+       for (@{$need_externs{$short}}) {
+               print MKINC "${_}_prefix = $extern_prefix{$_}\n"
+       }
+
+       for (@{$need_jars{$short}}) {
+               print MKINC "${_}_jar = $jar{$_}\n"
+       }
+
+       my $need_gsoap = 0;
+       for (@{$need_externs{$short}})  { $need_gsoap = 1 if $_ eq 'gsoap'; }
+
+       print MKINC "gsoap_default_version=".gsoap_version()."\n"  if $need_gsoap;
+
+       close MKINC;
+}
+
+my %etics_externs;
+my %etics_projects;
+BEGIN{
+       %etics_externs = (
+               globus_essentials=>'vdt_globus_essentials',
+               globus=>'globus',
+               cares=>'c-ares',
+               voms=>'org.glite.security.voms-api-cpp',
+               gridsite=>'org.gridsite.shared',
+               lcas=>'org.glite.security.lcas',
+       );
+       %etics_projects = (
+               vdt=>[qw/globus globus_essentials/],
+               'org.glite'=>[qw/voms gridsite lcas/],
+       );
+};
+
+sub mode_etics {
+       $fmod = shift;
+
+       die "$0: --module required with --etics\n" unless $fmod;
+       
+       my ($subsys,$module) = split /\./,$fmod;
+
+       my ($major,$minor,$rev,$age);
+
+       if ($version) {
+               $version =~ /([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)-(.+)/;
+               ($major,$minor,$rev,$age) = ($1,$2,$3,$4);
+       }
+       else { 
+               open V,"org.glite.$subsys.$module/project/version.properties"
+                       or die "org.glite.$subsys.$module/project/version.properties: $!\n";
+       
+               while ($_ = <V>) {
+                       chomp;
+                       ($major,$minor,$rev) = ($1,$2,$3) if /module\.version\s*=\s*([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)/;
+                       $age = $1 if /module\.age\s*=\s*([[:digit:]]+)/;
+               }
+               close V;
+       }
+
+       my @copts = ();
+       my %ge;
+       @ge{@{$etics_projects{'org.glite'}}} = (1) x ($#{$etics_projects{'org.glite'}}+1);
+
+       for (@{$need_externs{"$subsys.$module"}}) {
+           if ($need_externs_type{"$subsys.$module"}->{$_}=~/B/) {
+               my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+               push @copts,$ge{$_} ? "--with-$_=\${stageDir}" : "--with-$_=\${$eext.location}";
+           }
+       }
+
+       for (@{$need_jars{"$subsys.$module"}}) {
+               my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+
+               push @copts,"--with-$_ \${$eext.location}/$_*.jar";
+       }
+
+
+       my $conf = "glite-$subsys-${module}_R_${major}_${minor}_${rev}_${age}";
+       my $file = $output ? $output : "$conf.ini";
+       open C,">$file" or die "$file: $!\n";
+
+       my $buildroot = $topbuild{"$subsys.$module"} ? '' : "build.root = build\n";
+
+       my $confdir = $topbuild{"$subsys.$module"} ? '..' : '../..';
+
+       print STDERR "Writing $file\n";
+       print C qq{
+[Configuration-$conf]
+profile = None
+moduleName = org.glite.$subsys.$module
+displayName = $conf
+description = org.glite.$subsys.$module
+projectName = org.glite
+age = $age
+deploymentType = None
+tag = $conf
+version = $major.$minor.$rev
+path = \${projectName}/\${moduleName}/\${version}/\${platformName}/\${packageName}-\${version}-\${age}.tar.gz
+
+[Platform-default:VcsCommand]
+displayName = None
+description = None
+tag = cvs -d \${vcsroot} tag -R \${tag} \${moduleName}
+branch = None
+commit = None
+checkout = cvs -d \${vcsroot} co -r \${tag} \${moduleName}
+
+[Platform-default:BuildCommand]
+postpublish = None
+packaging = None
+displayName = None
+description = None
+doc = None
+prepublish = None
+publish = None
+compile = make
+init = None
+install = make install
+clean = make clean
+test = make check
+configure = cd $confdir && \${moduleName}/configure --thrflavour=\${globus.thr.flavor} --nothrflavour=\${globus.nothr.flavor} --prefix=\${prefix} --stage=\${stageDir} --module $subsys.$module @copts
+checkstyle = None
+
+[Platform-default:Property]
+$buildroot
+
+[Platform-default:DynamicDependency]
+
+};
+       for (@{$need_externs{"$subsys.$module"}},@{$need_jars{"$subsys.$module"}}) {
+               my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+
+               my $proj = 'externals';
+               for my $p (keys %etics_projects) {
+                       for $m (@{$etics_projects{$p}}) {
+                               $proj = $p if $m eq $_;
+                       }
+               }
+
+               my $type = $need_externs_type{"$subsys.$module"}->{$_};
+               print C "$proj|$eext = $type\n";
+       }
+
+       for (@{$deps{"$subsys.$module"}}) {
+               my $type = $deps_type{"$subsys.$module"}->{$_};
+               print C "org.glite|org.glite.$_ = $type\n";
+       }
+
+       close C;
+}
+
+sub gsoap_version {
+       local $_;
+       my $gsoap_version;
+       open S,"$extern_prefix{gsoap}/bin/soapcpp2 -v 2>&1 |" or die "$extern_prefix{gsoap}/bin/soapcpp2: $!\n";
+
+       while ($_ = <S>) {
+               chomp;
+
+               $gsoap_version = $1 if /The gSOAP Stub and Skeleton Compiler for C and C\+\+ ([.[:digit:][:alpha:]]+)$/;
+       }
+       close S;
+       return $gsoap_version;
+}
+
+
+sub usage {
+       my @ext = keys %extern_prefix;
+       my @myjars, keys %jar;
+
+       print STDERR qq{
+usage: $0 options
+
+General options (defaults in []):
+  --prefix=PREFIX              destination directory [./stage]
+  --staged=module,module,...   what is already in PREFIX (specify without org.glite.)
+  --thrflavour=flavour
+  --nothrflavour=flavour       threaded and non-treaded flavours [gcc64dbgpthr,gcc64dbg]
+  --listmodules=subsys          list modules of a subsystem
+  
+Mode of operation:
+  --mode={checkout|build|etics}        what to do [build]
+  
+What to build:
+  --module=module              build this module only (mostly in-Etics operation)
+  --enable-NODE                        build this "node" (set of modules) only. Available nodes are
+                                       @{$lbmodules{lb}},@{$lbmodules{security}}
+  --disable-NODE               don't build this node
+  --lb-tag=tag                 checkout LB modules with specific tag
+  --jp-tag=tag                 checkout JP modules with specific tag
+  --lbjp-common-tag=tag         checkout lbjp-common modules with specific tag
+  --security-tag=tag           checkout security modules with specific tag
+  --jobid-tag=tag              checkout jobid modules with specific tag
+
+Dependencies:
+  --with-EXTERNAL=PATH         where to look for an external. Required externals
+                               (not all for all modules) are:
+                                       @ext
+  --with-JAR=JAR               where to look for jars. Required jars are:
+                                       @myjars
+                               Summary of what will be used is always printed
+
+};
+
+}
diff --git a/org.glite.lb.logger/project/package.description b/org.glite.lb.logger/project/package.description
new file mode 100644 (file)
index 0000000..cd0621b
--- /dev/null
@@ -0,0 +1 @@
+glite-lb-logger is the gLite LB local-logger and inter-logger. This package contains the local-logger (glite-lb-logd), inter-logger (glite-lb-interlogd) and notification inter-logger (glite-lb-notif-interlogd) daemons.
diff --git a/org.glite.lb.logger/project/package.summary b/org.glite.lb.logger/project/package.summary
new file mode 100644 (file)
index 0000000..089b630
--- /dev/null
@@ -0,0 +1 @@
+gLite Logging and Bookkeeping local-logger and inter-logger
diff --git a/org.glite.lb.logger/src-nt/Connection.cpp b/org.glite.lb.logger/src-nt/Connection.cpp
new file mode 100644 (file)
index 0000000..7cac943
--- /dev/null
@@ -0,0 +1,4 @@
+#include "Connection.H"
+
+Connection::Factory::~Factory() {
+}
diff --git a/org.glite.lb.logger/src-nt/EventManager.cpp b/org.glite.lb.logger/src-nt/EventManager.cpp
new file mode 100644 (file)
index 0000000..91efb12
--- /dev/null
@@ -0,0 +1,23 @@
+#include "EventManager.H"
+
+int
+EventManager::postEvent(Event* &e)
+{
+  for(std::list<EventHandler*>::iterator i = handlers.begin();
+      i != handlers.end();
+      i++) {
+    (*i)->handleEvent(e);
+  }
+  return 0;
+}
+
+void
+EventManager::addHandler(EventHandler *handler)
+{
+  handlers.push_back(handler);
+}
+
+void
+EventManager::removeHandler(EventHandler *handler)
+{
+}
diff --git a/org.glite.lb.logger/src-nt/InputChannel.H b/org.glite.lb.logger/src-nt/InputChannel.H
new file mode 100644 (file)
index 0000000..2bac262
--- /dev/null
@@ -0,0 +1,29 @@
+#ifndef _INPUT_CHANNEL_H_
+#define _INPUT_CHANNEL_H_
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+class InputChannel 
+       : public ThreadPool::WorkDescription {
+public:
+       
+       InputChannel(Connection *conn, Transport *trans)
+               :  ThreadPool::WorkDescription(conn->getFD()),
+                  m_connection(conn), m_transport(trans)
+               {}
+
+       void start();
+
+protected:
+       virtual void onReady();
+       virtual void onTimeout();
+       virtual void onError();
+
+private:
+       Connection *m_connection;
+       Transport  *m_transport;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/InputChannel.cpp b/org.glite.lb.logger/src-nt/InputChannel.cpp
new file mode 100644 (file)
index 0000000..e3ac9c8
--- /dev/null
@@ -0,0 +1,37 @@
+#include "InputChannel.H"
+#include "ThreadPool.H"
+#include "EventManager.H"
+
+extern EventManager theEventManager;
+
+void
+InputChannel::start()
+{
+       ThreadPool::instance()->queueWorkRead(this);
+}
+
+void
+InputChannel::onReady()
+{
+       Transport::Message *msg = NULL;
+       int ret = m_transport->receive(m_connection, msg);
+       if(ret <= 0) {
+               // no new data read
+       } else if(msg) {
+               // we have a new message
+               
+       } else {
+               // still need more data
+               ThreadPool::instance()->queueWorkRead(this);
+       }
+}
+
+void
+InputChannel::onTimeout()
+{
+}
+
+void
+InputChannel::onError()
+{
+}
diff --git a/org.glite.lb.logger/src-nt/Message.H b/org.glite.lb.logger/src-nt/Message.H
new file mode 100644 (file)
index 0000000..725966a
--- /dev/null
@@ -0,0 +1,66 @@
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H
+
+#include "Properties.H"
+#include "MessageStore.H"
+
+#include <string>
+
+class Message: public MessageStore::Storable {
+public:
+       
+       /** class that holds message state
+        *
+        */
+       class State : public MessageStore::Storable {
+       public:
+
+               /** Get size needed for storage (from Storable).
+                */
+               virtual int getStorageSize() const;
+
+               /** Save State (from Storable)
+                */
+               virtual int save(void* data, int len) const;
+
+               /** Load State (from Storable)
+                */
+               virtual int load(void* data, int len);
+       };
+
+
+       Message();
+
+       Message(void * data, unsigned int length) 
+               : m_length(length), 
+                 m_data(data)
+               {}
+
+
+       int getContent(void* &data) const
+               { data = m_data; return m_length; }
+
+       int getContentLength() const 
+               { return m_length; }
+
+       std::string getProperty(const std::string &name, std::string &val)
+               { return m_properties.getProperty(name); }
+       
+       void setProperty(const std::string &name, std::string &val) 
+               { m_properties.setProperty(name, val); }
+       
+       Properties& getProperties() 
+               { return m_properties; }
+       
+       void setProperties(Properties &)
+               {}
+
+private:
+       MessageStore::ID        m_id;
+       unsigned int            m_length;
+       void *                  m_data;
+       Properties              m_properties;
+};
+
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/MessageStore.H b/org.glite.lb.logger/src-nt/MessageStore.H
new file mode 100644 (file)
index 0000000..ff03a9b
--- /dev/null
@@ -0,0 +1,84 @@
+#ifndef _MESSAGE_STORE_H_
+#define _MESSAGE_STORE_H_
+
+#include <pthread.h>
+
+/** Permanent storage for messages and their states.
+ */
+
+class MessageStore {
+public:
+
+       /** Base class for everything that can be stored here.
+        */
+       class Storable {
+       public:
+               /** Get size needed for object storage.
+                */
+               virtual int getStorageSize() const = 0;
+               
+               /** Save state of object into binary data.
+                */
+               virtual int save(void* data, int len) const = 0;
+
+               /** Load state of object from binary data.
+                */
+               virtual int load(void* data, int len) = 0;
+
+               virtual ~Storable() {}
+       };
+
+
+       /** Class that uniquely identifies stored content.
+        */
+       class ID: public Storable {
+       public:
+               /** Default constructor.
+                *
+                * Creates new unique ID.
+                */
+               ID();
+
+               /** Copy constructor.
+                */
+               ID(const ID& src);
+
+               /** Destructor.
+                */
+               ~ID() {};
+
+               /** Assignment operator.
+                */
+               ID& operator=(const ID& src);
+
+               /** Return the string suitable for printing.
+                */
+               std::string toString() const;
+
+               /** Comparison operator
+                */
+               int operator==(const ID& second);
+
+               /** Get size needed for storage (from Storable).
+                */
+               virtual int getStorageSize() const;
+
+               /** Save ID (from Storable)
+                */
+               virtual int save(void* data, int len) const;
+
+               /** Load ID (from Storable)
+                */
+               virtual int load(void* data, int len);
+
+       protected:
+               unsigned long long getID() {return id;}
+
+       private:
+               static pthread_mutex_t counterLock;
+               static unsigned counter;
+               unsigned long long id;
+       };
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/MessageStore.cpp b/org.glite.lb.logger/src-nt/MessageStore.cpp
new file mode 100644 (file)
index 0000000..eb9de7a
--- /dev/null
@@ -0,0 +1,24 @@
+#include <pthread.h>
+#include <sys/time.h>
+#include <sstream>
+
+#include "MessageStore.H"
+
+pthread_mutex_t MessageStore::ID::counterLock = PTHREAD_MUTEX_INITIALIZER;
+unsigned MessageStore::ID::counter = 0;
+
+MessageStore::ID::ID(){
+       time_t t;
+       time(&t);
+       pthread_mutex_lock(&counterLock);
+       counter++;
+       id = ((unsigned long long) counter << 32) + t;
+       pthread_mutex_unlock(&counterLock);
+}
+
+std::string MessageStore::ID::toString() const{
+       std::ostringstream oss;
+       oss << id;
+       return oss.str();
+}
+
diff --git a/org.glite.lb.logger/src-nt/Properties.H b/org.glite.lb.logger/src-nt/Properties.H
new file mode 100644 (file)
index 0000000..77d216d
--- /dev/null
@@ -0,0 +1,36 @@
+#ifndef _PROPERTIES_H_
+#define _PROPERTIES_H_
+
+#include <map>
+#include <string>
+
+class Properties {
+public:
+
+       // default constructor
+       Properties() 
+               : properties()
+               {}
+
+       // accessors
+       std::string& getProperty(const std::string &key) 
+               { return properties[key]; }
+
+       void setProperty(const std::string &key, std::string &val) 
+               { properties[key] = val; }
+
+       // iterators
+       typedef std::map<std::string,std::string>::iterator  iterator;
+
+       iterator begin() 
+               { return properties.begin(); }
+
+       iterator end()
+               { return properties.end(); }
+
+       
+private:
+       std::map<std::string,std::string> properties;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/Transport.cpp b/org.glite.lb.logger/src-nt/Transport.cpp
new file mode 100644 (file)
index 0000000..2544997
--- /dev/null
@@ -0,0 +1,5 @@
+#include "Transport.H"
+
+Transport::~Transport()
+{
+}
diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c
new file mode 100644 (file)
index 0000000..ebd5523
--- /dev/null
@@ -0,0 +1,1113 @@
+#ident "$Header$"
+
+#include <assert.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/param.h>
+
+#include "glite/lb/events_parse.h"
+
+#include "interlogd.h"
+
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static char *file_prefix = NULL;
+
+
+struct event_store_list {
+       struct event_store *es;
+       struct event_store_list *next;
+};
+
+
+static struct event_store_list *store_list;
+static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER;
+
+
+/* ----------------
+ * helper functions
+ * ----------------
+ */
+static
+char *
+jobid2eventfile(const char *job_id_s)
+{
+  char *buffer;
+
+  if(job_id_s) {
+    asprintf(&buffer, "%s.%s", file_prefix, job_id_s);
+  } else 
+    asprintf(&buffer, "%s.default", file_prefix);
+    
+  return(buffer);
+}
+
+
+static
+char *
+jobid2controlfile(char *job_id_s)
+{
+  char *buffer;
+  char *hash;
+
+  if(job_id_s) {
+    asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s);
+  } else 
+    asprintf(&buffer, "%s.default.ctl", file_prefix);
+    
+  return(buffer);
+}
+
+static
+int
+file_reader(void *user_data, char *buffer, const int len)
+{
+       size_t ret = 0;
+       
+       if(len > 0) {
+               ret = fread(buffer, 1, len, (FILE*)user_data);
+               if(ret == 0 && ferror((FILE*)user_data)) {
+                       return -1;
+               } 
+       }
+       return ret;
+}
+
+
+static
+int
+read_event_string(FILE *file, il_http_message_t *msg)
+{
+       int  len, ret;
+       int fd = fileno(file);
+       long start;
+
+       /* remember the start position */
+       start = ftell(file);
+       ret = receive_http(file, file_reader, msg);
+       if(ret < 0) return ret;
+       /* seek at the end of message in case the reader read ahead */
+       len = fseek(file, start + msg->len, SEEK_SET);
+       len = fgetc(file);
+       if(len != '\n') {
+               il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n", 
+                      start+msg->len, len, len);
+               if(msg->data) { free(msg->data); msg->data = NULL; }
+               if(msg->host) { free(msg->host); msg->host = NULL; }
+               return EINVAL;
+       }
+       return ret;
+}
+
+
+
+/* ------------------------------
+ * event_store 'member' functions
+ * ------------------------------
+ */
+static
+int
+event_store_free(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(es->job_id_s) free(es->job_id_s);
+  if(es->event_file_name) free(es->event_file_name);
+  if(es->control_file_name) free(es->control_file_name);
+  pthread_rwlock_destroy(&es->use_lock);
+  pthread_rwlock_destroy(&es->commit_lock);
+  free(es);
+
+  return(0);
+}
+
+
+static
+struct event_store *
+event_store_create(char *job_id_s)
+{
+  struct event_store *es;
+
+  es = malloc(sizeof(*es));
+  if(es == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure");
+    return(NULL);
+  }
+
+  memset(es, 0, sizeof(*es));
+
+  il_log(LOG_DEBUG, "  creating event store for id %s\n", job_id_s);
+
+  es->job_id_s = strdup(job_id_s);
+  es->event_file_name = jobid2eventfile(job_id_s);
+  es->control_file_name = jobid2controlfile(job_id_s);
+
+  if(pthread_rwlock_init(&es->commit_lock, NULL)) 
+          abort();
+  if(pthread_rwlock_init(&es->use_lock, NULL)) 
+         abort();
+
+  return(es);
+}
+
+
+static
+int
+event_store_lock_ro(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_rdlock(&es->commit_lock)) 
+    abort();
+
+  return(0);
+}
+
+
+static
+int
+event_store_lock(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_wrlock(&es->commit_lock)) 
+    abort();
+
+  return(0);
+}
+
+
+static
+int
+event_store_unlock(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_unlock(&es->commit_lock)) 
+    abort();
+  return(0);
+}
+
+
+static
+int
+event_store_read_ctl(struct event_store *es)
+{
+  FILE *ctl_file;
+
+  assert(es != NULL);
+
+  event_store_lock(es);
+  if((ctl_file = fopen(es->control_file_name, "r")) == NULL) {
+    /* no control file, new event file */
+    es->last_committed_ls = 0;
+    es->last_committed_bs = 0;
+  } else {
+    /* read last seen and last committed counts */
+    fscanf(ctl_file, "%*s\n%ld\n%ld\n",
+          &es->last_committed_ls,
+          &es->last_committed_bs);
+    fclose(ctl_file);
+  }
+  event_store_unlock(es);
+
+  return(0);
+}
+
+
+static
+int
+event_store_write_ctl(struct event_store *es)
+{
+  FILE   *ctl;
+
+  assert(es != NULL);
+
+  ctl = fopen(es->control_file_name, "w");
+  if(ctl == NULL) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file");
+    return(-1);
+  }
+
+  if(fprintf(ctl, "%s\n%ld\n%ld\n", 
+            es->job_id_s, 
+            es->last_committed_ls,
+            es->last_committed_bs) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
+    return(-1);
+  }
+
+  if(fclose(ctl) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file");
+    return(-1);
+  }
+
+  return(0);
+}
+
+
+/*
+ * event_store_qurantine() 
+ *   - rename damaged event store file 
+ *   - essentially does the same actions as cleanup, but the event store 
+ *     does not have to be empty
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es) 
+{
+       int num;
+       char newname[MAXPATHLEN+1];
+
+       /* find available qurantine name */
+       /* we give it at most 1024 tries */
+       for(num = 0; num < 1024; num++) {
+               struct stat st;
+
+               snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num);
+               newname[MAXPATHLEN] = 0;
+               if(stat(newname, &st) < 0) {
+                       if(errno == ENOENT) {
+                               /* file not found */
+                               break;
+                       } else {
+                               /* some other error with name, probably permanent */
+                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+                               return(-1);
+                               
+                       }
+               } else {
+                       /* the filename is used already */
+               }
+       }
+       if(num >= 1024) {
+               /* new name not found */
+               /* XXX - is there more suitable error? */
+               set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+               return(-1);
+       }
+
+       /* actually rename the file */
+       il_log(LOG_DEBUG, "    renaming damaged event file from %s to %s\n",
+              es->event_file_name, newname);
+       if(rename(es->event_file_name, newname) < 0) {
+               set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+               return(-1);
+       }
+
+       /* clear the counters */
+       es->last_committed_ls = 0;
+       es->last_committed_bs = 0;
+       es->offset = 0;
+
+       return(0);
+}
+
+
+/*
+ * event_store_recover()
+ *   - recover after restart or catch up when events missing in IPC
+ *   - if offset > 0, read everything behind it
+ *   - if offset == 0, read everything behind min(last_committed_bs, last_committed_es)
+ */
+int
+event_store_recover(struct event_store *es)
+{
+  struct event_queue *eq_l = NULL, *eq_b = NULL;
+  struct server_msg *msg;
+  il_http_message_t hmsg;
+  char *event_s;
+  int fd, ret;
+  long last;
+  FILE *ef;
+  struct flock efl;
+  char err_msg[128];
+  struct stat stbuf;
+
+  assert(es != NULL);
+  
+#if defined(IL_NOTIFICATIONS)
+  /* destination queue has to be found for each message separately */
+#else
+  /* find bookkepping server queue */
+  eq_b = queue_list_get(es->job_id_s);
+  if(eq_b == NULL) 
+    return(-1);
+#endif
+
+#if !defined(IL_NOTIFICATIONS)
+  /* get log server queue */
+  eq_l = queue_list_get(NULL);
+#endif
+
+  event_store_lock(es);
+
+  il_log(LOG_DEBUG, "  reading events from %s\n", es->event_file_name);
+
+  /* open event file */
+  ef = fopen(es->event_file_name, "r");
+  if(ef == NULL) {
+         snprintf(err_msg, sizeof(err_msg), 
+                  "event_store_recover: error opening event file %s",
+                  es->event_file_name);
+         set_error(IL_SYS, errno, err_msg);
+         event_store_unlock(es);
+         return(-1);
+  }
+
+  /* lock the file for reading (we should not read while dglogd is writing) */
+  fd = fileno(ef);
+  efl.l_type = F_RDLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLKW, &efl) < 0) {
+         snprintf(err_msg, sizeof(err_msg), 
+                  "event_store_recover: error locking event file %s",
+                  es->event_file_name);
+         set_error(IL_SYS, errno, err_msg);
+         event_store_unlock(es);
+         fclose(ef);
+         return(-1);
+  }
+
+  /* check the file modification time and size to avoid unnecessary operations */
+  memset(&stbuf, 0, sizeof(stbuf));
+  if(fstat(fd, &stbuf) < 0) {
+         il_log(LOG_ERR, "    could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
+         fclose(ef);
+         event_store_unlock(es);
+         return -1;
+  } else {
+         if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
+                 il_log(LOG_DEBUG, "  event file not modified since last visit, skipping\n");
+                 fclose(ef);
+                 event_store_unlock(es);
+                 return(0);
+         }
+  }
+
+  while(1) { /* try, try, try */
+
+         /* get the position in file to be sought */
+         if(es->offset)
+                 last = es->offset;
+         else {
+                 last = es->last_committed_bs;
+         }
+
+         il_log(LOG_DEBUG, "    setting starting file position to  %ld\n", last);
+         il_log(LOG_DEBUG, "    bytes sent to destination: %d\n", es->last_committed_bs);
+
+         if(last > 0) {
+                 int c;
+
+                 /* skip all committed or already enqueued events */
+                 /* be careful - check, if the offset really points to the
+                    beginning of event string */
+                 if(fseek(ef, last - 1, SEEK_SET) < 0) {
+                         set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+                         event_store_unlock(es);
+                         fclose(ef);
+                         return(-1);
+                 }
+                 /* the last enqueued event MUST end with \n */
+                 if((c=fgetc(ef)) != '\n') {
+                         /* Houston, we have got a problem */
+                         il_log(LOG_WARNING, 
+                                "    file position %ld does not point at the beginning of event string, backing off!\n",
+                                last);
+                         /* now, where were we? */
+                         if(es->offset) {
+                                 /* next try will be with
+                                    last_commited_bs */
+                                 es->offset = 0;
+                         } else {
+                                 /* this is really weird... back off completely */
+                                 es->last_committed_ls = es->last_committed_bs = 0;
+                         }
+                 } else {
+                         /* OK, break out of the loop */
+                         break;
+                 }
+         } else {
+                 /* this breaks out of the loop, we are starting at
+                  * the beginning of file
+                  */
+                 if(fseek(ef, 0, SEEK_SET) < 0) {
+                         set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+                         event_store_unlock(es);
+                         fclose(ef);
+                         return(-1);
+                 }
+                 break;
+         }
+  }
+
+  /* enqueue all remaining events */
+  ret = 1;
+  msg = NULL;
+  while(read_event_string(ef, &hmsg) >= 0) {
+       
+    /* last holds the starting position of event_s in file */
+    il_log(LOG_DEBUG, "    reading event at %ld\n", last);
+
+    /* break from now on means there was some error */
+    ret = -1;
+
+    /* create message for server */
+    msg = server_msg_create((il_octet_string_t*)&hmsg, last);
+    if(msg == NULL) {
+           il_log(LOG_ALERT, "    event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+           /* actually do not bother if quarantine succeeded or not - we could not do more */
+           event_store_quarantine(es);
+           fclose(ef);
+           event_store_unlock(es);
+           return(-1);
+    }
+    msg->es = es;
+
+    /* first enqueue to the LS */
+    if(!bs_only && (last >= es->last_committed_ls)) {
+      
+           il_log(LOG_DEBUG, "      queueing event at %ld to server %s\n", last, eq_l->dest_name);
+
+#if !defined(IL_NOTIFICATIONS)
+      if(enqueue_msg(eq_l, msg) < 0)
+       break;
+#endif
+      }
+
+#ifdef IL_NOTIFICATIONS
+    eq_b = queue_list_get(msg->dest);
+#endif
+
+    /* now enqueue to the BS, if neccessary */
+    if((eq_b != eq_l) && 
+       (last >= es->last_committed_bs)) {
+      
+           il_log(LOG_DEBUG, "      queueing event at %ld to server %s\n", last, eq_b->dest_name);
+      
+      if(enqueue_msg(eq_b, msg) < 0)
+       break;
+    }
+    server_msg_free(msg);
+    msg = NULL;
+
+    /* now last is also the offset behind the last successfully queued event */
+    last = ftell(ef);
+
+    /* ret == 0 means EOF or incomplete event found */
+    ret = 0;
+
+  } /* while */
+
+  /* due to this little assignment we had to lock the event_store for writing */
+  es->offset = last;
+  es->last_modified = stbuf.st_mtime;
+  il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
+
+  if(msg) 
+    server_msg_free(msg);
+
+  fclose(ef);
+  il_log(LOG_DEBUG, "  finished reading events with %d\n", ret);
+
+  event_store_unlock(es);
+  return(ret);
+}
+
+
+/*
+ * event_store_sync()
+ *   - check the position of event and fill holes from file
+ *   - return 1 if the event is new,
+ *            0 if it was seen before,
+ *           -1 if there was an error
+ */
+int
+event_store_sync(struct event_store *es, long offset)
+{
+       int ret;
+
+       assert(es != NULL);
+
+       /* all events are actually read from file, the event on socket
+        * is ignored and serves just to notify us about file change
+        */
+       ret = event_store_recover(es);
+       ret = (ret < 0) ? ret : 0;
+       return(ret);
+}
+
+
+int
+event_store_next(struct event_store *es, long offset, int len)
+{
+       assert(es != NULL);
+  
+       /* offsets are good only to detect losses (differences between socket and file),
+          which is not possible now */
+       return 0;
+}
+
+
+/* 
+ * event_store_commit()
+ *
+ */
+int
+event_store_commit(struct event_store *es, int len, int ls)
+{
+  assert(es != NULL);
+
+  event_store_lock(es);
+
+  if(ls)
+    es->last_committed_ls += len;
+  else {
+    es->last_committed_bs += len;
+    if (bs_only) es->last_committed_ls += len;
+  }
+
+  if(event_store_write_ctl(es) < 0) {
+    event_store_unlock(es);
+    return(-1);
+  }
+
+  event_store_unlock(es);
+
+
+  return(0);
+}
+
+
+/*
+ * event_store_clean()
+ *  - remove the event files (event and ctl), if they are not needed anymore
+ *  - returns 0 if event_store is in use, 1 if it was removed and -1 on error
+ *
+ * Q: How do we know that we can safely remove the files?
+ * A: When all events from file have been committed both by LS and BS.
+ */
+static 
+int
+event_store_clean(struct event_store *es)
+{
+  long last;
+  int fd;
+  FILE *ef;
+  struct flock efl;
+
+  assert(es != NULL);
+
+  /* prevent sender threads from updating */
+  event_store_lock(es);
+  
+  il_log(LOG_DEBUG, "  trying to cleanup event store %s\n", es->job_id_s);
+  il_log(LOG_DEBUG, "    bytes sent to logging server: %d\n", es->last_committed_ls);
+  il_log(LOG_DEBUG, "    bytes sent to bookkeeping server: %d\n", es->last_committed_bs);
+
+  /* preliminary check to avoid opening event file */
+  /* if the positions differ, some events still have to be sent */
+  if(es->last_committed_ls != es->last_committed_bs) {
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, "  not all events sent, cleanup aborted\n");
+    return(0);
+  }
+
+  /* the file can only be removed when all the events were succesfully sent 
+     (ie. committed both by LS and BS */
+  /* That also implies that the event queues are 'empty' at the moment. */
+  ef = fopen(es->event_file_name, "r+");
+  if(ef == NULL) {
+    /* if we can not open the event store, it is an error and the struct should be removed */
+    /* XXX - is it true? */
+    event_store_unlock(es);
+    il_log(LOG_ERR,  "  event_store_clean: error opening event file: %s\n", strerror(errno));
+    return(1);
+  }
+  
+  fd = fileno(ef);
+  
+  /* prevent local-logger from writing into event file */
+  efl.l_type = F_WRLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLK, &efl) < 0) {
+    il_log(LOG_DEBUG, "    could not lock event file, cleanup aborted\n");
+    fclose(ef);
+    event_store_unlock(es);
+    if(errno != EACCES &&
+       errno != EAGAIN) {
+      set_error(IL_SYS, errno, "event_store_clean: error locking event file");
+      return(-1);
+    }
+    return(0);
+  }
+  
+  /* now the file should not contain partially written event, so it is safe
+     to get offset behind last event by seeking the end of file */
+  if(fseek(ef, 0, SEEK_END) < 0) {
+    set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
+    event_store_unlock(es);
+    fclose(ef);
+    return(-1);
+  }
+  
+  last = ftell(ef);
+  il_log(LOG_DEBUG, "    total bytes in file: %d\n", last);
+
+  if(es->last_committed_ls < last) {
+    fclose(ef);
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, "    events still waiting in queue, cleanup aborted\n");
+    return(0);
+  } else if( es->last_committed_ls > last) {
+         il_log(LOG_WARNING, "  warning: event file seems to shrink!\n");
+         /* XXX - in that case we can not continue because there may be
+            some undelivered events referring to that event store */
+         fclose(ef);
+         event_store_unlock(es);
+         return(0);
+  }
+  
+  /* now we are sure that all events were sent and the event queues are empty */
+  il_log(LOG_INFO, "    removing event file %s\n", es->event_file_name);
+  
+  /* remove the event file */
+  unlink(es->event_file_name);
+  unlink(es->control_file_name);
+  
+  /* clear the counters */
+  es->last_committed_ls = 0;
+  es->last_committed_bs = 0;
+  es->offset = 0;
+
+  /* unlock the event_store even if it is going to be removed */
+  event_store_unlock(es);
+
+  /* close the event file (that unlocks it as well) */
+  fclose(ef);
+
+  /* indicate that it is safe to remove this event_store */
+  return(1);
+}
+
+
+/* --------------------------------
+ * event store management functions
+ * --------------------------------
+ */
+struct event_store *
+event_store_find(char *job_id_s)
+{
+  struct event_store_list *q, *p;
+  struct event_store *es;
+
+  if(pthread_rwlock_wrlock(&store_list_lock)) {
+         abort();
+  }
+
+  es = NULL;
+  
+  q = NULL;
+  p = store_list;
+  
+  while(p) {
+    if(strcmp(p->es->job_id_s, job_id_s) == 0) {
+      es = p->es;
+      if(pthread_rwlock_rdlock(&es->use_lock))
+             abort();
+      if(pthread_rwlock_unlock(&store_list_lock)) 
+             abort();
+      return(es);
+    }
+
+    q = p;
+    p = p->next;
+  }
+
+  es = event_store_create(job_id_s);
+  if(es == NULL) {
+         if(pthread_rwlock_unlock(&store_list_lock)) 
+                 abort();
+         return(NULL);
+  }
+
+  p = malloc(sizeof(*p));
+  if(p == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
+      if(pthread_rwlock_unlock(&store_list_lock)) 
+             abort();
+    return(NULL);
+  }
+  
+  p->next = store_list;
+  store_list = p;
+    
+  p->es = es;
+
+  if(pthread_rwlock_rdlock(&es->use_lock))
+         abort();
+
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+
+  return(es);
+}
+
+
+int
+event_store_release(struct event_store *es)
+{
+       assert(es != NULL);
+
+       if(pthread_rwlock_unlock(&es->use_lock))
+               abort();
+       il_log(LOG_DEBUG, "  released lock on %s\n", es->job_id_s);
+       return(0);
+}
+
+
+event_store_from_file(char *filename)
+{
+       struct event_store *es;
+       FILE *event_file;
+       char *job_id_s = NULL, *p;
+       il_http_message_t hmsg;
+       int ret;
+       
+       il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
+       
+       if(strstr(filename, "quarantine") != NULL) {
+               il_log(LOG_INFO, "  file name belongs to quarantine, not touching that.\n");
+               return(0);
+       }
+
+       event_file = fopen(filename, "r");
+       if(event_file == NULL) {
+               set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
+               return(-1);
+       }
+       ret = read_event_string(event_file, &hmsg);
+       fclose(event_file);
+       if(ret < 0) 
+               return(0);
+       
+       /* get id aka dest */
+       job_id_s = hmsg.host;
+
+       il_log(LOG_DEBUG, "  message dest: '%s'\n", job_id_s);
+       if(job_id_s == NULL) {
+               il_log(LOG_NOTICE, "  skipping file, could not parse event\n");
+               ret = 0;
+               goto out;
+       }
+       
+       es=event_store_find(job_id_s);
+       
+       if(es == NULL) {
+               ret = -1;
+               goto out;
+       }
+
+       if((es->last_committed_ls == 0) &&
+          (es->last_committed_bs == 0) &&
+          (es->offset == 0)) {
+               ret = event_store_read_ctl(es);
+       } else 
+               ret = 0;
+       
+       event_store_release(es);
+
+out:
+       if(hmsg.data) free(hmsg.data);
+       if(job_id_s) free(job_id_s);
+       return(ret);
+}
+
+
+int
+event_store_init(char *prefix)
+{
+  if(file_prefix == NULL) {
+    file_prefix = strdup(prefix);
+    store_list = NULL;
+  }
+
+  /* read directory and get a list of event files */
+  {
+    int len;
+
+    char *p, *dir;
+    DIR *event_dir;
+    struct dirent *entry;
+
+
+    /* get directory name */
+    p = strrchr(file_prefix, '/');
+    if(p == NULL) {
+      dir = strdup(".");
+      p = "";
+      len = 0;
+    } else {
+      *p = '\0';
+      dir = strdup(file_prefix);
+      *p++ = '/';
+      len = strlen(p);
+    }
+
+    event_dir = opendir(dir);
+    if(event_dir == NULL) {
+      free(dir);
+      set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+      return(-1);
+    }
+    
+    while((entry=readdir(event_dir))) {
+      char *s;
+
+      /* skip all files that do not match prefix */
+      if(strncmp(entry->d_name, p, len) != 0) 
+       continue;
+
+      /* skip all control files */
+      if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+        s[4] == '\0')
+       continue;
+
+      s = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+      if(s == NULL) {
+       free(dir);
+       set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name");
+       return(-1);
+      }
+
+      *s = '\0';
+      strcat(s, dir);
+      strcat(s, "/");
+      strcat(s, entry->d_name);
+
+      if(event_store_from_file(s) < 0) {
+       free(dir);
+       free(s);
+       closedir(event_dir);
+       return(-1);
+      }
+
+      free(s);
+    }
+    closedir(event_dir);
+
+    /* one more pass - this time remove stale .ctl files */
+    event_dir = opendir(dir);
+    if(event_dir == NULL) {
+      free(dir);
+      set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+      return(-1);
+    }
+    
+    while((entry=readdir(event_dir))) {
+      char *s;
+
+      /* skip all files that do not match prefix */
+      if(strncmp(entry->d_name, p, len) != 0) 
+       continue;
+
+      /* find all control files */
+      if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+        s[4] == '\0') {
+             char *ef;
+             struct stat st;
+
+             /* is there corresponding event file? */
+             ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+             if(ef == NULL) {
+                     free(dir);
+                     set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+                     return(-1);
+             }
+
+             s[0] = 0;
+             *ef = '\0';
+             strcat(ef, dir);
+             strcat(ef, "/");
+             strcat(ef, entry->d_name);
+             s[0] = '.';
+
+             if(stat(ef, &st) == 0) {
+                     /* something is there */
+                     /* XXX - it could be something else than event file, but do not bother now */
+             } else {
+                     /* could not stat file, remove ctl */
+                     strcat(ef, s);
+                     il_log(LOG_DEBUG, "  removing stale file %s\n", ef);
+                     if(unlink(ef)) 
+                             il_log(LOG_ERR, "  could not remove file %s: %s\n", ef, strerror(errno));
+                     
+             }
+             free(ef);
+
+      }
+    }
+    closedir(event_dir);
+    free(dir);
+  }
+
+  return(0);
+}
+
+
+int
+event_store_recover_all()
+{
+  struct event_store_list *sl;
+
+
+  if(pthread_rwlock_rdlock(&store_list_lock)) 
+         abort();
+
+  /* recover all event stores */
+  sl = store_list;
+  while(sl != NULL) {
+
+         /* recover this event store */
+         /* no need to lock use_lock in event_store, the store_list_lock is in place */
+         if(event_store_recover(sl->es) < 0) {
+                 il_log(LOG_ERR, "  error recovering event store %s:\n    %s\n", sl->es->event_file_name, error_get_msg());
+                 clear_error();
+         }
+         sl = sl->next;
+  }
+  
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+
+  return(0);
+}
+
+
+#if 0 
+int
+event_store_remove(struct event_store *es)
+{
+  struct event_store_list *p, **q;
+
+  assert(es != NULL);
+
+  switch(event_store_clean(es)) {
+  case 0:
+    il_log(LOG_DEBUG, "  event store not removed, still used\n");
+    return(0);
+    
+  case 1:
+    if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
+      set_error(IL_SYS, errno, "  event_store_remove: error locking event store list");
+      return(-1);
+    }
+
+    p = store_list;
+    q = &store_list;
+
+    while(p) {
+      if(p->es == es) {
+       (*q) = p->next;
+       event_store_free(es);
+       free(p);
+       break;
+      }
+      q = &(p->next);
+      p = p->next;
+    }
+
+    if(pthread_rwlock_unlock(&store_list_lock) < 0) {
+      set_error(IL_SYS, errno, "  event_store_remove: error unlocking event store list");
+      return(-1);
+    }
+    return(1);
+
+  default:
+    return(-1);
+  }
+  /* not reached */
+  return(0);
+}
+#endif
+
+int
+event_store_cleanup()
+{
+  struct event_store_list *sl;
+  struct event_store_list *slnext;
+  struct event_store_list **prev;
+
+  /* try to remove event files */
+
+  if(pthread_rwlock_wrlock(&store_list_lock)) 
+         abort();
+
+  sl = store_list;
+  prev = &store_list;
+
+  while(sl != NULL) {
+         int ret;
+
+         slnext = sl->next;
+         
+         /* one event store at time */
+         ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
+         if(ret == EBUSY) {
+                 il_log(LOG_DEBUG, "  event_store %s is in use by another thread\n", 
+                        sl->es->job_id_s);
+                 sl = slnext;
+                 continue;
+         } else if (ret < 0)
+           abort();
+
+         switch(event_store_clean(sl->es)) {
+                 
+         case 1:
+                 /* remove this event store */
+                 (*prev) = slnext;
+                 event_store_free(sl->es);
+                 free(sl);
+                 break;
+                 
+         case -1:
+                 il_log(LOG_ERR, "  error removing event store %s (file %s):\n    %s\n", 
+                        sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
+                 /* event_store_release(sl->es); */
+                 clear_error();
+                 /* go on to the next */
+                 
+         default:
+                 event_store_release(sl->es);
+                 prev = &(sl->next);
+                 break;
+         }
+         
+         sl = slnext;
+  }
+  
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+  
+  return(0);
+}
+
diff --git a/org.glite.lb.logger/src/http.c b/org.glite.lb.logger/src/http.c
new file mode 100644 (file)
index 0000000..c9fb89b
--- /dev/null
@@ -0,0 +1,197 @@
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+
+#include "interlogd.h"
+
+
+int 
+parse_request(const char *s, il_http_message_t *msg)
+{
+       if(!strncasecmp(s, "HTTP", 4)) {
+               msg->msg_type = IL_HTTP_REPLY;
+       } else if(!strncasecmp(s, "POST", 4)) {
+               msg->msg_type = IL_HTTP_POST;
+       } else if(!strncasecmp(s, "GET", 3)) {
+               msg->msg_type = IL_HTTP_GET;
+       } else {
+               msg->msg_type = IL_HTTP_OTHER;
+       }
+       if(msg->msg_type == IL_HTTP_REPLY) {
+               char *p = strchr(s, ' ');
+
+               if(!p) goto parse_end;
+               p++;
+               msg->reply_code=atoi(p);
+               p = strchr(p, ' ');
+               if(!p) goto parse_end;
+               p++;
+               msg->reply_string = strdup(p);
+
+       parse_end:
+               ;
+       }
+}
+
+
+int
+parse_header(const char *s, il_http_message_t *msg)
+{
+       if(!strncasecmp(s, "Content-Length:", 15)) {
+               msg->content_length = atoi(s + 15);
+       } else if(!strncasecmp(s, "Host:", 5)) {
+               const char *p = s + 4;
+               while(*++p == ' '); /* skip spaces */
+               msg->host = strdup(p);
+       }
+       return(0);
+}
+
+
+#define DEFAULT_CHUNK_SIZE 1024
+
+// read what is available and parse what can be parsed
+// returns the result of read operation of the underlying connection,
+// ie. the number of bytes read or error code
+int
+receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_message_t *msg)
+{
+       static enum { NONE, IN_REQUEST, IN_HEADERS, IN_BODY } state = NONE;
+       int  len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE;
+       char *buffer, *p, *s, *cr;
+       
+       memset(msg, 0, sizeof(*msg));
+       // msg->data = NULL;
+       // msg->len = 0;
+       state = IN_REQUEST;
+       alen = 0;
+       buffer = NULL;
+       buffer_free = 0;
+       p = NULL;
+       s = NULL;
+
+       do {
+               /* p - first empty position in buffer
+                  alen - size of allocated buffer
+                  len - number of bytes received in last read
+                  s - points behind last scanned CRLF or at buffer start 
+                  buffer_free = alen - (p - buffer) 
+               */
+
+               /* prepare at least chunk_size bytes for next data */
+               if(buffer_free < min_buffer_free) {
+                       char *n;
+                       
+                       alen += min_buffer_free;
+                       n = realloc(buffer, alen);
+                       if(n == NULL) {
+                               free(buffer);
+                               set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+                               return(-1);
+                       }
+                       buffer_free += min_buffer_free;
+                       p = n + (p - buffer);
+                       s = n + (s - buffer);
+                       buffer = n;
+               }
+
+               if(buffer_free > 0) {
+                       len = (*reader)(user_data, p, buffer_free); 
+                       if(len < 0) {
+                               // error
+                               free(buffer);
+                               // set_error(IL_SYS, errno, "receive_http: error reading data");
+                               return -1;
+                       } else if(len == 0) {
+                               // EOF
+                               free(buffer);
+                               set_error(IL_PROTO, errno, "receive_http: error reading data - premature EOF");
+                               return -1;
+                       }
+                       buffer_free -= len;
+                       p+= len;
+               }
+
+
+               switch(state) {
+
+                       // parse buffer, look for CRLFs
+                       //   s - start scan position
+                       //   p - start of current token
+                       //   cr - current CRLF position
+
+               case IN_REQUEST:
+                       if((s < p - 1) &&
+                          (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+                          (cr[1] == '\n')) {
+                               *cr = 0;
+                               parse_request(s, msg);
+                               *cr = '\r';
+                               // change state
+                               state = IN_HEADERS;
+                               // start new tokens (cr < p - 1 -> s < p + 1 <-> s <= p)
+                               s = cr + 2;
+                       } else {
+                         break;
+                       }
+
+               case IN_HEADERS:  
+                       while((state != IN_BODY) &&
+                             (s < p - 1) && 
+                             (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+                             (cr[1] == '\n')) {
+                               if(s == cr) { /* do not consider request starting with CRLF */
+                                       // found CRLFCRLF
+                                       state = IN_BODY;
+                               } else {
+                                       *cr = 0;
+                                       parse_header(s, msg);
+                                       *cr = '\r';
+                               }
+                               // next scan starts after CRLF
+                               s = cr + 2; 
+                       }
+                       if(state == IN_BODY) {
+                               // we found body
+                               // content-length should be set at the moment
+                               if(msg->content_length > 0) {
+                                       int need_free = msg->content_length - (p - s);
+                                       char *n;
+                       
+                                       alen += need_free - buffer_free + 1;
+                                       n = realloc(buffer, alen);
+                                       if(n == NULL) {
+                                               free(buffer);
+                                               set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+                                               return(-1);
+                                       }
+                                       buffer_free = need_free;
+                                       min_buffer_free = 0;
+                                       p = n + (p - buffer);
+                                       s = n + (s - buffer);
+                                       buffer = n;
+                               } else {
+                                       // report error
+                                       free(buffer);
+                                       set_error(IL_PROTO, EINVAL, "receive_http: error reading data - no content length specified\n");
+                                       return -1;
+                               }
+                       }
+                       break;
+                       
+               case IN_BODY:
+                       if(buffer_free == 0) {
+                               // finished reading
+                               *p = 0;
+                               state = NONE;
+                       }
+                       break;
+               }
+       } while(state != NONE);
+       
+       msg->data = buffer;
+       msg->len = p - buffer;
+
+       return 0;
+}
diff --git a/org.glite.lb.logger/src/input_queue_socket_http.c b/org.glite.lb.logger/src/input_queue_socket_http.c
new file mode 100644 (file)
index 0000000..939c45f
--- /dev/null
@@ -0,0 +1,167 @@
+#ident "$Header$"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "interlogd.h"
+
+static const int   SOCK_QUEUE_MAX = 50;
+extern char *socket_path;
+extern char *file_prefix;
+
+static int sock;
+static int accepted;
+
+static
+int plain_reader(void *user_data, char *buffer, const int len)
+{
+       return (recv(*(int*)user_data, buffer, len, MSG_NOSIGNAL));
+}
+
+                
+int 
+input_queue_attach()
+{ 
+  struct sockaddr_un saddr;
+
+  if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error creating socket");
+    return(-1);
+  }
+
+  memset(&saddr, 0, sizeof(saddr));
+  saddr.sun_family = AF_UNIX;
+  strcpy(saddr.sun_path, socket_path);
+
+  /* test for the presence of the socket and another instance 
+     of interlogger listening */
+  if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) {
+         if(errno == ECONNREFUSED) {
+                 /* socket present, but no one at the other end; remove it */
+                 il_log(LOG_WARNING, "  removing stale input socket %s\n", socket_path);
+                 unlink(socket_path);
+         }
+         /* ignore other errors for now */
+  } else {
+         /* connection was successful, so bail out - there is 
+            another interlogger running */
+         set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running");
+         return(-1);
+  }
+  
+  if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error binding socket");
+    return(-1);
+  }
+
+  if (listen(sock, SOCK_QUEUE_MAX)) {
+    set_error(IL_SYS, errno, "input_queue_attach: error listening on socket");
+    return -1;
+  }
+
+  return(0);
+}
+
+
+void input_queue_detach()
+{
+  if (sock >= 0)
+    close(sock);
+  unlink(socket_path);
+}
+
+
+
+/*
+ * Returns: -1 on error, 0 if no message available, message length otherwise
+ *
+ */
+#ifdef PERF_EVENTS_INLINE
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+       static long o = 0;
+       int len;
+       char *jobid;
+       static il_octet_string_t my_buffer;
+       
+       assert(buffer != NULL);
+
+       *buffer = &my_buffer;
+
+       len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid);
+       my_buffer.len = len;
+       if(len) {
+               o += len;
+               *offset = o;
+       } else if (len == 0) {
+               sleep(timeout);
+       }
+       return(len);
+}
+#else
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+  fd_set fds;
+  struct timeval tv;
+  int msg_len;
+  static il_http_message_t msg;
+
+  assert(buffer != NULL);
+
+  *buffer = (il_octet_string_t *)&msg;
+
+  FD_ZERO(&fds);
+  FD_SET(sock, &fds);
+  
+  tv.tv_sec = timeout;
+  tv.tv_usec = 0;
+  
+  msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL);
+  switch(msg_len) {
+     
+  case 0: /* timeout */
+    return(0);
+    
+  case -1: /* error */
+         switch(errno) {
+         case EINTR:
+                 il_log(LOG_DEBUG, "  interrupted while waiting for event!\n");
+                 return(0);
+
+         default:
+                 set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+                 return(-1);
+         }
+  default:
+         break;
+  }
+  
+  if((accepted=accept(sock, NULL, NULL)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_get: error accepting connection");
+    return(-1);
+  }
+
+  msg_len = receive_http(&accepted, plain_reader, &msg);
+
+  if(msg_len < 0) {
+         close(accepted);
+         if(error_get_maj() != IL_OK) 
+                 return -1;
+         else
+                 return 0;
+  }
+
+  close(accepted);
+  *offset = -1;
+  return(msg.len);
+}
+#endif
+
diff --git a/org.glite.lb.logger/src/queue_mgr_http.c b/org.glite.lb.logger/src/queue_mgr_http.c
new file mode 100644 (file)
index 0000000..777e620
--- /dev/null
@@ -0,0 +1,164 @@
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct queue_list {
+  struct event_queue *queue;
+  char   *dest;
+  struct queue_list *next;
+  time_t expires;
+};
+
+static struct event_queue *log_queue;
+static struct queue_list  *queues;
+
+
+static 
+int
+queue_list_create()
+{
+  queues = NULL;
+
+  return(0);
+}
+
+
+static
+int
+queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev)
+{
+  struct queue_list *q, *p;
+
+  assert(el != NULL);
+
+  *el = NULL;
+  if(prev)
+    *prev = NULL;
+
+  if(ql == NULL) 
+    return(0);
+
+  q = NULL;
+  p = ql;
+
+  while(p) {
+    if(strcmp(p->dest, dest) == 0) {
+      *el = p;
+      if(prev)
+       *prev = q;
+      return(1);
+    }
+
+    q = p;
+    p = p->next;
+  };
+
+  return(0);
+}
+
+
+static
+int
+queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
+{
+  struct queue_list *el;
+  
+  assert(dest != NULL);
+  assert(eq != NULL);
+  assert(ql != NULL);
+
+  el = malloc(sizeof(*el));
+  if(el == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue");
+    return(-1);
+  }
+
+  el->dest = strdup(dest);
+  if(el->dest == NULL) {
+    free(el);
+    set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue");
+    return(-1);
+  }
+  el->queue = eq;
+  el->next = *ql;
+  *ql = el;
+  return 0;
+}
+
+
+struct event_queue *
+queue_list_get(char *job_id_s)
+{
+  char *dest;
+  struct queue_list *q;
+  struct event_queue *eq;
+  dest = job_id_s;
+
+  if(dest == NULL) 
+    return(NULL);
+  
+  if(queue_list_find(queues, dest, &q, NULL)) {
+    return(q->queue);
+  } else {
+    eq = event_queue_create(dest);
+    if(eq)
+      queue_list_add(&queues, dest, eq);
+    return(eq);
+  }
+}
+
+
+int
+queue_list_is_log(struct event_queue *eq)
+{
+  return(eq == queue_list_get(NULL));
+}
+
+
+int
+queue_list_init(char *ls)
+{
+  return(queue_list_create());
+}
+
+
+static struct queue_list *current;
+
+
+struct event_queue *
+queue_list_first()
+{
+  current = queues;
+  return(current ? current->queue : NULL);
+}
+
+
+struct event_queue *
+queue_list_next()
+{
+  current = current ? current->next : NULL;
+  return(current ? current->queue : NULL);
+}
+
+
+int
+queue_list_remove_queue(struct event_queue *eq)
+{
+  assert(eq != NULL);
+
+  free(eq);
+  return(1);
+}
+
+
+
+/* Local Variables:           */
+/* c-indentation-style: gnu   */
+/* End:                       */
diff --git a/org.glite.lb.logger/src/send_event_http.c b/org.glite.lb.logger/src/send_event_http.c
new file mode 100644 (file)
index 0000000..3c90562
--- /dev/null
@@ -0,0 +1,282 @@
+#ident "$Header$"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+
+/*
+ *   - L/B server protocol handling routines 
+ */
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/il_string.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct reader_data {
+       edg_wll_GssConnection *gss;
+       struct timeval *timeout;
+};
+
+
+static
+int
+gss_reader(void *user_data, char *buffer, int max_len)
+{
+  int ret;
+  struct reader_data *data = (struct reader_data *)user_data;
+  edg_wll_GssStatus gss_stat;
+
+  ret = edg_wll_gss_read(data->gss, buffer, max_len, data->timeout, &gss_stat);
+  if(ret < 0) {
+    char *gss_err = NULL;
+
+    if(ret == EDG_WLL_GSS_ERROR_GSS) {
+      edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err);
+      set_error(IL_DGGSS, ret, gss_err);
+      free(gss_err);
+    } else 
+      set_error(IL_DGGSS, ret, "get_reply");
+  }
+  return(ret);
+}
+
+
+/*
+ * Read reply from server.
+ *  Returns: -1 - error reading message, 
+ *         code > 0 - http status code from server
+ */
+static
+int 
+get_reply(struct event_queue *eq, char **buf, int *code_min)
+{
+  int ret, code;
+  int len;
+  struct timeval tv;
+  struct reader_data data;
+  il_http_message_t msg;
+
+  tv.tv_sec = TIMEOUT;
+  tv.tv_usec = 0;
+  data.gss = &eq->gss;
+  data.timeout = &tv;
+  len = receive_http(&data, gss_reader, &msg);
+  if(len < 0) {
+    set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply");
+    return(-1);
+  }
+  if(msg.data) free(msg.data);
+  if(msg.reply_string) *buf = msg.reply_string;
+  *code_min = 0; /* XXX fill in flag for fault */
+  return(msg.reply_code);
+}
+
+
+
+/*
+ *  Returns: 0 - not connected, timeout set, 1 - OK
+ */
+int 
+event_queue_connect(struct event_queue *eq)
+{
+  int ret;
+  struct timeval tv;
+  edg_wll_GssStatus gss_stat;
+  cred_handle_t *local_cred_handle;
+
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+
+  if(eq->gss.context == NULL) {
+
+    tv.tv_sec = TIMEOUT;
+    tv.tv_usec = 0;
+
+    /* get pointer to the credentials */
+    if(pthread_mutex_lock(&cred_handle_lock) < 0)
+           abort();
+    local_cred_handle = cred_handle;
+    local_cred_handle->counter++;
+    if(pthread_mutex_unlock(&cred_handle_lock) < 0)
+           abort();
+    
+    il_log(LOG_DEBUG, "    trying to connect to %s:%d\n", eq->dest_name, eq->dest_port);
+    ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+    if(pthread_mutex_lock(&cred_handle_lock) < 0)
+           abort();
+    /* check if we need to release the credentials */
+    --local_cred_handle->counter;
+    if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) {
+           edg_wll_gss_release_cred(&local_cred_handle->creds, NULL);
+           free(local_cred_handle);
+           il_log(LOG_DEBUG, "   freed credentials, not used anymore\n");
+    }
+    if(pthread_mutex_unlock(&cred_handle_lock) < 0) 
+           abort();
+
+    if(ret < 0) {
+      char *gss_err = NULL;
+
+      if (ret == EDG_WLL_GSS_ERROR_GSS)
+        edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err);
+      set_error(IL_DGGSS, ret,
+               (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect");
+      if (gss_err) free(gss_err);
+      eq->gss.context = NULL;
+      eq->timeout = TIMEOUT;
+      return(0);
+    }
+  }
+
+#ifdef LB_PERF
+  }
+#endif
+
+  return(1);
+}
+
+
+int
+event_queue_close(struct event_queue *eq)
+{
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+
+  if(eq->gss.context != NULL) {
+    edg_wll_gss_close(&eq->gss, NULL);
+    eq->gss.context = NULL;
+  }
+#ifdef LB_PERF
+  }
+#endif
+  return(0);
+}
+
+
+/* 
+ * Send all events from the queue.
+ *   Returns: -1 - system error, 0 - not sent, 1 - queue empty
+ */
+int 
+event_queue_send(struct event_queue *eq)
+{
+  int events_sent = 0;
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+  if(eq->gss.context == NULL)
+    return(0);
+#ifdef LB_PERF
+  }
+#endif
+
+  /* feed the server with events */
+  while (!event_queue_empty(eq)) {
+    struct server_msg *msg;
+    char *rep;
+    int  ret, code, code_min;
+    size_t bytes_sent;
+    struct timeval tv;
+    edg_wll_GssStatus gss_stat;
+
+    clear_error();
+
+    if(event_queue_get(eq, &msg) < 0) 
+      return(-1);
+
+    il_log(LOG_DEBUG, "    trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
+#ifdef LB_PERF
+    if(!nosend) {
+#endif
+        /* XXX: ljocha -- does it make sense to send empty messages ? */
+       if (msg->len) {
+           tv.tv_sec = TIMEOUT;
+           tv.tv_usec = 0;
+           ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+           if(ret < 0) {
+                   if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) {
+                           eq->timeout = 0;
+                   }  else {
+                           il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
+                           eq->timeout = TIMEOUT;
+                   }
+                   return(0);
+           }
+           if((code = get_reply(eq, &rep, &code_min)) < 0) {
+                   /* could not get the reply properly, so try again later */
+                   if (events_sent>0) 
+                           eq->timeout = 1;
+                   else {
+                           eq->timeout = TIMEOUT;
+                           il_log(LOG_ERR, "  error reading server %s reply:\n    %s\n", eq->dest_name, error_get_msg());
+                   }
+                   return(0);
+           }
+       }
+       else { code = 200; code_min = 0; rep = strdup("not sending empty message"); }
+#ifdef LB_PERF
+    } else {
+           glite_wll_perftest_consumeEventIlMsg(msg->msg+17);
+           code = 200;
+           rep = strdup("OK");
+    }
+#endif
+    
+    il_log(LOG_DEBUG, "    event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
+    free(rep);
+
+    /* the reply is back here, decide what to do with message */
+    /* HTTP error codes:
+       1xx - informational (eg. 100 Continue)
+       2xx - successful (eg. 200 OK)
+       3xx - redirection (eg. 301 Moved Permanently)
+       4xx - client error (eq. 400 Bad Request)
+       5xx - server error (eq. 500 Internal Server Error)
+    */
+    if(code >= 100 && code < 200) {
+
+           /* non fatal errors (for us), try to deliver later */
+           eq->timeout = TIMEOUT;
+           return(0);
+    }
+
+    /* the message was consumed (successfully or not) */
+    /* update the event pointer */
+    if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) 
+           /* failure committing message, this is bad */
+           return(-1);
+    
+    event_queue_remove(eq);
+    events_sent++;
+  } /* while */
+
+  return(1);
+
+} /* send_events */
+
+
+/* this is just not used */
+int
+send_confirmation(long lllid, int code)
+{
+       return 0;
+}
diff --git a/org.glite.lb.logger/src/server_msg_http.c b/org.glite.lb.logger/src/server_msg_http.c
new file mode 100644 (file)
index 0000000..8bd3623
--- /dev/null
@@ -0,0 +1,128 @@
+#ident "$Header$"
+
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+
+#include "interlogd.h"
+#include "glite/lb/il_msg.h" 
+#include "glite/lb/events_parse.h"
+#include "glite/lb/context.h"
+
+static
+int 
+create_msg(il_http_message_t *ev, char **buffer, long *receipt, time_t *expires)
+{
+  char *event = ev->data;
+
+  *receipt = 0;
+  *expires = 0;
+
+  *buffer = ev->data;
+  return ev->len;;
+}
+
+
+struct server_msg *
+server_msg_create(il_octet_string_t *event, long offset)
+{
+  struct server_msg *msg;
+
+  msg = malloc(sizeof(*msg));
+  if(msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message");
+    return(NULL);
+  }
+
+  if(server_msg_init(msg, event) < 0) {
+    server_msg_free(msg);
+    return(NULL);
+  }
+  msg->offset = offset;
+
+  return(msg);
+}
+
+
+struct server_msg *
+server_msg_copy(struct server_msg *src)
+{
+  struct server_msg *msg;
+
+  msg = malloc(sizeof(*msg));
+  if(msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message");
+    return(NULL);
+  }
+  
+  msg->msg = malloc(src->len);
+  if(msg->msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message");
+    server_msg_free(msg);
+    return(NULL);
+  }
+  msg->len = src->len;
+  memcpy(msg->msg, src->msg, src->len);
+
+  msg->job_id_s = strdup(src->job_id_s);
+  msg->ev_len = src->ev_len;
+  msg->es = src->es;
+  msg->receipt_to = src->receipt_to;
+  msg->offset = src->offset;
+#if defined(IL_NOTIFICATIONS)
+  msg->dest_name = strdup(src->dest_name);
+  msg->dest_port = src->dest_port;
+  msg->dest = strdup(src->dest);
+#endif
+  msg->expires = src->expires;
+  return(msg);
+}
+
+
+int
+server_msg_init(struct server_msg *msg, il_octet_string_t *event)
+{
+       il_http_message_t *hmsg = (il_http_message_t *)event;
+
+       assert(msg != NULL);
+       assert(event != NULL);
+
+       memset(msg, 0, sizeof(*msg));
+
+
+       msg->job_id_s = hmsg->host;
+       if(msg->job_id_s == NULL) {
+               set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id");
+               return -1;
+       }
+       msg->len = create_msg(hmsg, &msg->msg, &msg->receipt_to, &msg->expires);
+       if(msg->len < 0)
+               return -1;
+       /* set this to indicate new data owner */
+       hmsg->data = NULL;
+       hmsg->host = NULL;
+       msg->ev_len = hmsg->len + 1; /* must add separator size too */
+       return 0;
+
+}
+
+
+int
+server_msg_is_priority(struct server_msg *msg)
+{
+  assert(msg != NULL);
+
+  return(msg->receipt_to != 0);
+}
+
+
+int
+server_msg_free(struct server_msg *msg)
+{
+  assert(msg != NULL);
+
+  if(msg->msg) free(msg->msg);
+  if(msg->job_id_s) free(msg->job_id_s);
+  free(msg);
+  return 0;
+}