Sprout from branch_RC31_3 2010-08-20 08:32:18 UTC Zdeněk Šustr <sustr4@cesnet.cz> 'New revision'
Cherrypick from master 2009-04-08 08:15:08 UTC Aleš Křenek <ljocha@ics.muni.cz> 'The most recent version copied. Do not modify this instance (RW in ./org.glite.lb).':
org.glite.lb.logger/configure
org.glite.lb.logger/project/package.description
org.glite.lb.logger/project/package.summary
org.glite.lb.logger/src-nt/Connection.cpp
org.glite.lb.logger/src-nt/EventManager.cpp
org.glite.lb.logger/src-nt/InputChannel.H
org.glite.lb.logger/src-nt/InputChannel.cpp
org.glite.lb.logger/src-nt/Message.H
org.glite.lb.logger/src-nt/MessageStore.H
org.glite.lb.logger/src-nt/MessageStore.cpp
org.glite.lb.logger/src-nt/Properties.H
org.glite.lb.logger/src-nt/Transport.cpp
org.glite.lb.logger/src/event_store_http.c
org.glite.lb.logger/src/http.c
org.glite.lb.logger/src/input_queue_socket_http.c
org.glite.lb.logger/src/queue_mgr_http.c
org.glite.lb.logger/src/send_event_http.c
org.glite.lb.logger/src/server_msg_http.c
--- /dev/null
+#!/usr/bin/perl
+
+# WARNING: Don't edit this file unless it is the master copy in org.glite.lb
+#
+# For the purpose of standalone builds of lb/jobid/lbjp-common components
+# it is copied on tagging
+
+# $Header$
+
+use Getopt::Long;
+
+my $pwd = `pwd`; chomp $pwd;
+my $prefix = $pwd.'/stage';
+my $stagedir;
+my $staged;
+my $module;
+my $thrflavour = 'gcc64dbgpthr';
+my $nothrflavour = 'gcc64dbg';
+my $mode = 'build';
+my $help = 0;
+my $listmodules;
+my $version;
+my $output;
+my $lb_tag = '';
+my $lbjp_tag = '';
+my $jp_tag = '';
+my $sec_tag = '';
+my $jobid_tag = '';
+
+my @nodes = qw/client server logger utils client-java doc ws-test db jpprimary jpindex jpclient/;
+my %enable_nodes;
+my %disable_nodes;
+
+my %extern_prefix = (
+ cares => '/opt/c-ares',
+ classads => '/opt/classads',
+ cppunit => '/usr',
+ expat => '/usr',
+ globus => '/opt/globus',
+ gsoap => '/usr',
+ mysql => '/usr',
+ 'mysql-devel' => '',
+ voms => '/opt/glite',
+ gridsite => '/opt/glite',
+ lcas => '/opt/glite',
+ ant => '/usr',
+ jdk => '/usr',
+ libtar => '/usr',
+);
+
+my %jar = (
+ 'commons-codec' => '/usr/share/java/commons-codec-1.3.jar',
+);
+
+
+my %glite_prefix;
+my %need_externs;
+my %need_externs_type;
+my %need_jars;
+my %extrafull;
+my %extranodmod;
+my %deps;
+my %deps_type;
+my %topbuild;
+
+my %lbmodules = (
+ 'lb' => [ qw/client client-java common doc logger server state-machine types utils ws-interface ws-test/],
+ 'security' => [qw/gss gsoap-plugin/],
+ 'lbjp-common' => [qw/db maildir server-bones trio jp-interface/],
+ 'jobid' => [qw/api-c api-cpp api-java/],
+ 'jp' => [ qw/client doc index primary server-common ws-interface/ ],
+ );
+
+
+my @opts = (
+ 'prefix=s' => \$prefix,
+ 'staged=s' => \$staged,
+ 'module=s' => \$module,
+ 'thrflavour=s' => \$thrflavour,
+ 'nothrflavour=s' => \$nothrflavour,
+ 'mode=s' => \$mode,
+ 'listmodules=s' => \$listmodules,
+ 'version=s' => \$version,
+ 'output=s' => \$output,
+ 'stage=s' => \$stagedir,
+ 'lb-tag=s' => \$lb_tag,
+ 'lbjp-common-tag=s' => \$lbjp_tag,
+ 'jp-tag=s' => \$jp_tag,
+ 'security-tag=s' => \$sec_tag,
+ 'jobid-tag=s' => \$jobid_tag,
+ 'help' => \$help,
+);
+
+for (@nodes) {
+ $enable_nodes{$_} = 0;
+ $disable_nodes{$_} = 0;
+
+ push @opts,"disable-$_",\$disable_nodes{$_};
+ push @opts,"enable-$_",\$enable_nodes{$_};
+}
+
+push @opts,"with-$_=s",\$extern_prefix{$_} for keys %extern_prefix;
+push @opts,"with-$_=s",\$jar{$_} for keys %jar;
+
+my @keeparg = @ARGV;
+
+GetOptions @opts or die "Errors parsing command line\n";
+
+$extern_prefix{'mysql-devel'}=$extern_prefix{mysql} if $extern_prefix{'mysql-devel'} eq '';
+
+if ($help) { usage(); exit 0; }
+
+if ($listmodules) {
+ my @m = map "org.glite.$listmodules.$_",@{$lbmodules{$listmodules}};
+ print "@m\n";
+ exit 0;
+}
+
+warn "$0: --version and --output make sense only in --mode=etics\n"
+ if ($version || $output) && $mode ne 'etics';
+
+my $en;
+for (keys %enable_nodes) { $en = 1 if $enable_nodes{$_}; }
+
+my $dis;
+for (keys %disable_nodes) { $dis = 1 if $disable_nodes{$_}; }
+
+die "--enable-* and --disable-* are mutually exclusive\n"
+ if $en && $dis;
+
+die "--module cannot be used with --enable-* or --disable-*\n"
+ if $module && ($en || $dis);
+
+die "$module: unknown module\n" if $module && ! grep $module,@{$lbmodules{lb}},@{$lbmodules{security}},{$lbmodules{jp}};
+
+if ($dis) {
+ for (@nodes) {
+ $enable_nodes{$_} = 1 unless $disable_nodes{$_};
+ }
+}
+
+if (!$en && !$dis) { $enable_nodes{$_} = 1 for (@nodes) } ;
+
+for (keys %enable_nodes) { delete $enable_nodes{$_} unless $enable_nodes{$_}; }
+
+$stagedir = $prefix unless $stagedir;
+
+if ($mode eq 'build') {
+ print "Writing config.status\n";
+ open CONF,">config.status" or die "config.status: $!\n";
+ print CONF "$0 @keeparg\n";
+ close CONF;
+}
+
+
+my @modules;
+my %aux;
+
+if ($module) {
+# push @modules,split(/[,.]+/,$module);
+ push @modules,$module;
+}
+else {
+ @modules = map(($extranodmod{$_} ? $extranodmod{$_} : 'lb.'.$_),(keys %enable_nodes));
+
+ my $n;
+
+ do {
+ local $"="\n";
+ $n = $#modules;
+ push @modules,(map @{$deps{$_}},@modules);
+
+ undef %aux; @aux{@modules} = (1) x ($#modules+1);
+ @modules = keys %aux;
+ } while ($#modules > $n);
+}
+
+@aux{@modules} = (1) x ($#modules+1);
+delete $aux{$_} for (split /,/,$staged);
+@modules = keys %aux;
+
+mode_build() if $mode eq 'build';
+mode_checkout() if $mode eq 'checkout';
+mode_etics($module) if $mode eq 'etics';
+
+sub mode_build {
+ print "\nBuilding modules: @modules\n";
+
+ my @ext = map @{$need_externs{$_}},@modules;
+ my @myjars = map @{$need_jars{$_}},@modules;
+ undef %aux; @aux{@ext} = 1;
+ @ext = keys %aux;
+ undef %aux; @aux{@myjars} = (1) x ($#myjars+1);
+ @myjars = keys %aux;
+
+ print "\nRequired externals:\n";
+ print "\t$_: $extern_prefix{$_}\n" for @ext;
+ print "\t$_: $jar{$_}\n" for @myjars;
+ print "\nThis is a poor-man configure, it's up to you to have sources and externals there\n\n";
+
+ mkinc($_) for @modules;
+
+ print "Creating Makefile\n";
+
+ open MAK,">Makefile" or die "Makefile: $!\n";
+
+ print MAK "all: @modules\n\nclean:\n";
+
+ for (@modules) {
+ my $full = full($_);
+ my $build = $topbuild{$_} ? '': '/build';
+ print MAK "\tcd $full$build && \${MAKE} clean\n"
+ }
+
+ print MAK "\ndistclean:\n";
+
+ for (@modules) {
+ my $full = full($_);
+ print MAK $topbuild{$_} ?
+ "\tcd $full$build && \${MAKE} distclean\n" :
+ "\trm -rf $full$build\n"
+ }
+
+ print MAK "\n";
+
+ for (@modules) {
+ my %ldeps; undef %ldeps;
+ @ldeps{@{$deps{$_}}} = 1;
+ for my $x (split /,/,$staged) { delete $ldeps{$x}; }
+ my @dnames = $module ? () : keys %ldeps;
+
+ my $full = full($_);
+ my $build = $topbuild{$_} ? '': '/build';
+
+ print MAK "$_: @dnames\n\tcd $full$build && \${MAKE} && \${MAKE} install\n\n";
+ }
+
+ close MAK;
+}
+
+sub mode_checkout() {
+ for (@modules) {
+ my $module = $_;
+ my $tag = "";
+ if ($lb_tag){
+ for (@{$lbmodules{lb}}){
+ if ("lb.".$_ eq $module){
+ $tag = '-r '.$lb_tag;
+ }
+ }
+ }
+ if ($lbjp_tag){
+ for (@{$lbmodules{'lbjp-common'}}){
+ if ("lbjp-common.".$_ eq $module){
+ $tag = '-r '.$lbjp_tag;
+ }
+ }
+ }
+ if ($jp_tag){
+ for (@{$lbmodules{'jp'}}){
+ if ("jp.".$_ eq $module){
+ $tag = '-r '.$jp_tag;
+ }
+ }
+ }
+ if ($sec_tag){
+ for (@{$lbmodules{security}}){
+ if ("security.".$_ eq $module){
+ $tag = '-r '.$sec_tag;
+ }
+ }
+ }
+ if ($jobid_tag){
+ for (@{$lbmodules{jobid}}){
+ if ("jobid.".$_ eq $module){
+ $tag = '-r '.$jobid_tag;
+ }
+ }
+ }
+ #if (grep {"lb.".$_ eq $module} @{$lbmodules{lb}}){
+ # print "found";
+ #}
+ $_ = full($_);
+ print "\n*** Checking out $_\n";
+ system("cvs checkout $tag $_") == 0 or die "cvs checkout $tag $_: $?\n";
+ }
+}
+
+BEGIN{
+%need_externs_aux = (
+ 'lb.client' => [ qw/cppunit:B classads/ ],
+ 'lb.client-java' => [ qw/ant:B/ ],
+ 'lb.common' => [ qw/expat cppunit:B classads/ ],
+ 'lb.doc' => [],
+ 'lb.logger' => [ qw/cppunit:B/ ],
+ 'lb.server' => [ qw/globus_essentials:R globus:B expat cares mysql cppunit:B gsoap:B classads voms lcas gridsite/ ],
+ 'lb.state-machine' => [ qw/classads/ ],
+ 'lb.utils' => [ qw/cppunit:B/ ],
+ 'lb.ws-interface' => [],
+ 'lb.ws-test' => [ qw/gsoap:B/ ],
+ 'lb.types' => [ qw// ],
+ 'lbjp-common.db' => [ qw/mysql:R mysql-devel:B/ ],
+ 'lbjp-common.maildir' => [ qw// ],
+ 'lbjp-common.server-bones' => [ qw// ],
+ 'lbjp-common.trio' => [ qw/cppunit:B/ ],
+ 'lbjp-common.jp-interface' => [ qw/cppunit:B/ ],
+ 'security.gss' => [ qw/globus_essentials:R globus:B cares cppunit:B/ ],
+ 'security.gsoap-plugin' => [ qw/cppunit:B globus_essentials:R globus:B cares gsoap:B/ ],
+ 'jobid.api-c' => [ qw/cppunit:B/ ],
+ 'jobid.api-cpp' => [ qw/cppunit:B/ ],
+ 'jobid.api-java' => [ qw/ant:B jdk:B/ ],
+ 'jp.client' => [ qw/gsoap libtar globus_essentials:R globus:B/ ],
+ 'jp.doc' => [],
+ 'jp.index' => [ qw/gsoap globus_essentials:R globus:B/ ],
+ 'jp.primary' => [ qw/classads gsoap libtar globus_essentials:R globus:B/ ],
+ 'jp.server-common' => [],
+ 'jp.ws-interface' => [],
+);
+
+for my $ext (keys %need_externs_aux) {
+ for (@{$need_externs_aux{$ext}}) {
+ /([^:]*)(?::(.*))?/;
+ push @{$need_externs{$ext}},$1;
+ my $type = $2 ? $2 : 'BR';
+ $need_externs_type{$ext}->{$1} = $type;
+ }
+}
+
+%need_jars = (
+ 'jobid.api-java' => [ qw/commons-codec/ ],
+);
+
+for my $jar (keys %need_jars) {
+ for (@{$need_jars{$jar}}) {
+ $need_externs_type{$jar}->{$_} = 'BR'; # XXX
+ }
+}
+
+%deps_aux = (
+ 'lb.client' => [ qw/
+ lb.types:B lb.common
+ lbjp-common.trio
+ jobid.api-cpp jobid.api-c
+ security.gss
+ / ],
+ 'lb.client-java' => [ qw/
+ lb.types:B
+ jobid.api-java
+ / ],
+ 'lb.common' => [ qw/
+ jobid.api-cpp jobid.api-c
+ lb.types:B lbjp-common.trio security.gss
+ / ],
+ 'lb.doc' => [ qw/lb.types:B/ ],
+ 'lb.logger' => [ qw/
+ lbjp-common.trio
+ jobid.api-c
+ lb.common
+ security.gss
+ / ],
+ 'lb.server' => [ qw/
+ lb.ws-interface lb.types:B lb.common lb.state-machine
+ lbjp-common.db lbjp-common.server-bones lbjp-common.trio lbjp-common.maildir
+ jobid.api-c
+ security.gsoap-plugin security.gss
+ / ],
+ 'lb.state-machine' => [ qw/lb.common lbjp-common.jp-interface security.gss/ ],
+ 'lb.utils' => [ qw/
+ lbjp-common.jp-interface
+ jobid.api-c
+ lbjp-common.trio lbjp-common.maildir
+ lb.client lb.state-machine
+ / ],
+ 'lb.ws-test' => [ qw/security.gsoap-plugin lb.ws-interface/ ],
+ 'lb.ws-interface' => [ qw/lb.types:B/ ],
+ 'lb.types' => [ qw// ],
+ 'lbjp-common.db' => [ qw/lbjp-common.trio/ ],
+ 'lbjp-common.maildir' => [ qw// ],
+ 'lbjp-common.server-bones' => [ qw// ],
+ 'lbjp-common.trio' => [ qw// ],
+ 'security.gss' => [ qw// ],
+ 'security.gsoap-plugin' => [ qw/security.gss/ ],
+ 'jobid.api-c' => [ qw// ],
+ 'jobid.api-cpp' => [ qw/jobid.api-c/ ],
+ 'jobid.api-java' => [ qw// ],
+
+ 'lbjp-common.jp-interface' => [ qw/lbjp-common.db jobid.api-c/ ],
+
+ 'jp.client' => [ qw/
+ jp.ws-interface
+ lbjp-common.jp-interface lbjp-common.maildir
+ jobid.api-c
+ security.gsoap-plugin
+ / ],
+ 'jp.doc' => [ qw// ],
+ 'jp.index' => [ qw/
+ jp.server-common jp.ws-interface
+ lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones
+ security.gsoap-plugin
+ / ],
+ 'jp.primary' => [ qw/
+ jobid.api-c
+ jp.server-common jp.ws-interface
+ lb.state-machine
+ lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones
+ security.gsoap-plugin
+ / ],
+ 'jp.server-common' => [ qw/
+ lbjp-common.jp-interface lbjp-common.db
+ / ],
+ 'jp.ws-interface' => [ qw// ],
+);
+
+for my $ext (keys %deps_aux) {
+ for (@{$deps_aux{$ext}}) {
+ /([^:]*)(?::(.*))?/;
+ push @{$deps{$ext}},$1;
+ my $type = $2 ? $2 : 'BR';
+ $deps_type{$ext}->{$1} = $type;
+ }
+}
+
+
+%extrafull = ( gridsite=>'org.gridsite.core');
+
+#( java => 'client-java' );
+%extranodmod = (
+ db => 'lbjp-common.db',
+ jpprimary => 'jp.primary',
+ jpindex => 'jp.index',
+ jpclient => 'jp.client',
+);
+
+my @t = qw/lb.client-java jobid.api-java lb.types/;
+@topbuild{@t} = (1) x ($#t+1);
+}
+
+sub full
+{
+ my $short = shift;
+ return $extrafull{$short} ? $extrafull{$short} : 'org.glite.'.$short;
+}
+
+sub mkinc
+{
+ my %aux;
+ undef %aux;
+ my @m=qw/
+lb.client lb.doc lb.state-machine lb.ws-interface lb.logger lb.types lb.common lb.server lb.utils lb.ws-test lb.client-java
+security.gss security.gsoap-plugin
+jobid.api-c jobid.api-cpp jobid.api-java
+lbjp-common.db lbjp-common.maildir lbjp-common.server-bones lbjp-common.trio lbjp-common.jp-interface
+jp.client jp.doc jp.index jp.primary jp.server-common jp.ws-interface
+/;
+ @aux{@m} = (1) x ($#m+1);
+
+ my $short = shift;
+ my $full = full $short;
+
+ unless ($aux{$short}) {
+ print "Makefile.inc not needed in $full\n";
+ return;
+ }
+
+ my $build = '';
+
+ unless ($topbuild{$_}) {
+ $build = '/build';
+ unless (-d "$full/build") {
+ mkdir "$full/build" or die "mkdir $full/build: $!\n";
+ }
+ unlink "$full/build/Makefile";
+ symlink "../Makefile","$full/build/Makefile" or die "symlink ../Makefile $full/build/Makefile: $!\n";
+ }
+
+ open MKINC,">$full$build/Makefile.inc"
+ or die "$full$build/Makefile.inc: $!\n";
+
+ print "Creating $full$build/Makefile.inc\n";
+
+ print MKINC qq{
+PREFIX = $prefix
+stagedir = $stagedir
+thrflavour = $thrflavour
+nothrflavour = $nothrflavour
+};
+
+ for (@{$need_externs{$short}}) {
+ print MKINC "${_}_prefix = $extern_prefix{$_}\n"
+ }
+
+ for (@{$need_jars{$short}}) {
+ print MKINC "${_}_jar = $jar{$_}\n"
+ }
+
+ my $need_gsoap = 0;
+ for (@{$need_externs{$short}}) { $need_gsoap = 1 if $_ eq 'gsoap'; }
+
+ print MKINC "gsoap_default_version=".gsoap_version()."\n" if $need_gsoap;
+
+ close MKINC;
+}
+
+my %etics_externs;
+my %etics_projects;
+BEGIN{
+ %etics_externs = (
+ globus_essentials=>'vdt_globus_essentials',
+ globus=>'globus',
+ cares=>'c-ares',
+ voms=>'org.glite.security.voms-api-cpp',
+ gridsite=>'org.gridsite.shared',
+ lcas=>'org.glite.security.lcas',
+ );
+ %etics_projects = (
+ vdt=>[qw/globus globus_essentials/],
+ 'org.glite'=>[qw/voms gridsite lcas/],
+ );
+};
+
+sub mode_etics {
+ $fmod = shift;
+
+ die "$0: --module required with --etics\n" unless $fmod;
+
+ my ($subsys,$module) = split /\./,$fmod;
+
+ my ($major,$minor,$rev,$age);
+
+ if ($version) {
+ $version =~ /([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)-(.+)/;
+ ($major,$minor,$rev,$age) = ($1,$2,$3,$4);
+ }
+ else {
+ open V,"org.glite.$subsys.$module/project/version.properties"
+ or die "org.glite.$subsys.$module/project/version.properties: $!\n";
+
+ while ($_ = <V>) {
+ chomp;
+ ($major,$minor,$rev) = ($1,$2,$3) if /module\.version\s*=\s*([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)/;
+ $age = $1 if /module\.age\s*=\s*([[:digit:]]+)/;
+ }
+ close V;
+ }
+
+ my @copts = ();
+ my %ge;
+ @ge{@{$etics_projects{'org.glite'}}} = (1) x ($#{$etics_projects{'org.glite'}}+1);
+
+ for (@{$need_externs{"$subsys.$module"}}) {
+ if ($need_externs_type{"$subsys.$module"}->{$_}=~/B/) {
+ my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+ push @copts,$ge{$_} ? "--with-$_=\${stageDir}" : "--with-$_=\${$eext.location}";
+ }
+ }
+
+ for (@{$need_jars{"$subsys.$module"}}) {
+ my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+
+ push @copts,"--with-$_ \${$eext.location}/$_*.jar";
+ }
+
+
+ my $conf = "glite-$subsys-${module}_R_${major}_${minor}_${rev}_${age}";
+ my $file = $output ? $output : "$conf.ini";
+ open C,">$file" or die "$file: $!\n";
+
+ my $buildroot = $topbuild{"$subsys.$module"} ? '' : "build.root = build\n";
+
+ my $confdir = $topbuild{"$subsys.$module"} ? '..' : '../..';
+
+ print STDERR "Writing $file\n";
+ print C qq{
+[Configuration-$conf]
+profile = None
+moduleName = org.glite.$subsys.$module
+displayName = $conf
+description = org.glite.$subsys.$module
+projectName = org.glite
+age = $age
+deploymentType = None
+tag = $conf
+version = $major.$minor.$rev
+path = \${projectName}/\${moduleName}/\${version}/\${platformName}/\${packageName}-\${version}-\${age}.tar.gz
+
+[Platform-default:VcsCommand]
+displayName = None
+description = None
+tag = cvs -d \${vcsroot} tag -R \${tag} \${moduleName}
+branch = None
+commit = None
+checkout = cvs -d \${vcsroot} co -r \${tag} \${moduleName}
+
+[Platform-default:BuildCommand]
+postpublish = None
+packaging = None
+displayName = None
+description = None
+doc = None
+prepublish = None
+publish = None
+compile = make
+init = None
+install = make install
+clean = make clean
+test = make check
+configure = cd $confdir && \${moduleName}/configure --thrflavour=\${globus.thr.flavor} --nothrflavour=\${globus.nothr.flavor} --prefix=\${prefix} --stage=\${stageDir} --module $subsys.$module @copts
+checkstyle = None
+
+[Platform-default:Property]
+$buildroot
+
+[Platform-default:DynamicDependency]
+
+};
+ for (@{$need_externs{"$subsys.$module"}},@{$need_jars{"$subsys.$module"}}) {
+ my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_;
+
+ my $proj = 'externals';
+ for my $p (keys %etics_projects) {
+ for $m (@{$etics_projects{$p}}) {
+ $proj = $p if $m eq $_;
+ }
+ }
+
+ my $type = $need_externs_type{"$subsys.$module"}->{$_};
+ print C "$proj|$eext = $type\n";
+ }
+
+ for (@{$deps{"$subsys.$module"}}) {
+ my $type = $deps_type{"$subsys.$module"}->{$_};
+ print C "org.glite|org.glite.$_ = $type\n";
+ }
+
+ close C;
+}
+
+sub gsoap_version {
+ local $_;
+ my $gsoap_version;
+ open S,"$extern_prefix{gsoap}/bin/soapcpp2 -v 2>&1 |" or die "$extern_prefix{gsoap}/bin/soapcpp2: $!\n";
+
+ while ($_ = <S>) {
+ chomp;
+
+ $gsoap_version = $1 if /The gSOAP Stub and Skeleton Compiler for C and C\+\+ ([.[:digit:][:alpha:]]+)$/;
+ }
+ close S;
+ return $gsoap_version;
+}
+
+
+sub usage {
+ my @ext = keys %extern_prefix;
+ my @myjars, keys %jar;
+
+ print STDERR qq{
+usage: $0 options
+
+General options (defaults in []):
+ --prefix=PREFIX destination directory [./stage]
+ --staged=module,module,... what is already in PREFIX (specify without org.glite.)
+ --thrflavour=flavour
+ --nothrflavour=flavour threaded and non-treaded flavours [gcc64dbgpthr,gcc64dbg]
+ --listmodules=subsys list modules of a subsystem
+
+Mode of operation:
+ --mode={checkout|build|etics} what to do [build]
+
+What to build:
+ --module=module build this module only (mostly in-Etics operation)
+ --enable-NODE build this "node" (set of modules) only. Available nodes are
+ @{$lbmodules{lb}},@{$lbmodules{security}}
+ --disable-NODE don't build this node
+ --lb-tag=tag checkout LB modules with specific tag
+ --jp-tag=tag checkout JP modules with specific tag
+ --lbjp-common-tag=tag checkout lbjp-common modules with specific tag
+ --security-tag=tag checkout security modules with specific tag
+ --jobid-tag=tag checkout jobid modules with specific tag
+
+Dependencies:
+ --with-EXTERNAL=PATH where to look for an external. Required externals
+ (not all for all modules) are:
+ @ext
+ --with-JAR=JAR where to look for jars. Required jars are:
+ @myjars
+ Summary of what will be used is always printed
+
+};
+
+}
--- /dev/null
+glite-lb-logger is the gLite LB local-logger and inter-logger. This package contains the local-logger (glite-lb-logd), inter-logger (glite-lb-interlogd) and notification inter-logger (glite-lb-notif-interlogd) daemons.
--- /dev/null
+gLite Logging and Bookkeeping local-logger and inter-logger
--- /dev/null
+#include "Connection.H"
+
+Connection::Factory::~Factory() {
+}
--- /dev/null
+#include "EventManager.H"
+
+int
+EventManager::postEvent(Event* &e)
+{
+ for(std::list<EventHandler*>::iterator i = handlers.begin();
+ i != handlers.end();
+ i++) {
+ (*i)->handleEvent(e);
+ }
+ return 0;
+}
+
+void
+EventManager::addHandler(EventHandler *handler)
+{
+ handlers.push_back(handler);
+}
+
+void
+EventManager::removeHandler(EventHandler *handler)
+{
+}
--- /dev/null
+#ifndef _INPUT_CHANNEL_H_
+#define _INPUT_CHANNEL_H_
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+class InputChannel
+ : public ThreadPool::WorkDescription {
+public:
+
+ InputChannel(Connection *conn, Transport *trans)
+ : ThreadPool::WorkDescription(conn->getFD()),
+ m_connection(conn), m_transport(trans)
+ {}
+
+ void start();
+
+protected:
+ virtual void onReady();
+ virtual void onTimeout();
+ virtual void onError();
+
+private:
+ Connection *m_connection;
+ Transport *m_transport;
+};
+
+#endif
--- /dev/null
+#include "InputChannel.H"
+#include "ThreadPool.H"
+#include "EventManager.H"
+
+extern EventManager theEventManager;
+
+void
+InputChannel::start()
+{
+ ThreadPool::instance()->queueWorkRead(this);
+}
+
+void
+InputChannel::onReady()
+{
+ Transport::Message *msg = NULL;
+ int ret = m_transport->receive(m_connection, msg);
+ if(ret <= 0) {
+ // no new data read
+ } else if(msg) {
+ // we have a new message
+
+ } else {
+ // still need more data
+ ThreadPool::instance()->queueWorkRead(this);
+ }
+}
+
+void
+InputChannel::onTimeout()
+{
+}
+
+void
+InputChannel::onError()
+{
+}
--- /dev/null
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H
+
+#include "Properties.H"
+#include "MessageStore.H"
+
+#include <string>
+
+class Message: public MessageStore::Storable {
+public:
+
+ /** class that holds message state
+ *
+ */
+ class State : public MessageStore::Storable {
+ public:
+
+ /** Get size needed for storage (from Storable).
+ */
+ virtual int getStorageSize() const;
+
+ /** Save State (from Storable)
+ */
+ virtual int save(void* data, int len) const;
+
+ /** Load State (from Storable)
+ */
+ virtual int load(void* data, int len);
+ };
+
+
+ Message();
+
+ Message(void * data, unsigned int length)
+ : m_length(length),
+ m_data(data)
+ {}
+
+
+ int getContent(void* &data) const
+ { data = m_data; return m_length; }
+
+ int getContentLength() const
+ { return m_length; }
+
+ std::string getProperty(const std::string &name, std::string &val)
+ { return m_properties.getProperty(name); }
+
+ void setProperty(const std::string &name, std::string &val)
+ { m_properties.setProperty(name, val); }
+
+ Properties& getProperties()
+ { return m_properties; }
+
+ void setProperties(Properties &)
+ {}
+
+private:
+ MessageStore::ID m_id;
+ unsigned int m_length;
+ void * m_data;
+ Properties m_properties;
+};
+
+
+#endif
--- /dev/null
+#ifndef _MESSAGE_STORE_H_
+#define _MESSAGE_STORE_H_
+
+#include <pthread.h>
+
+/** Permanent storage for messages and their states.
+ */
+
+class MessageStore {
+public:
+
+ /** Base class for everything that can be stored here.
+ */
+ class Storable {
+ public:
+ /** Get size needed for object storage.
+ */
+ virtual int getStorageSize() const = 0;
+
+ /** Save state of object into binary data.
+ */
+ virtual int save(void* data, int len) const = 0;
+
+ /** Load state of object from binary data.
+ */
+ virtual int load(void* data, int len) = 0;
+
+ virtual ~Storable() {}
+ };
+
+
+ /** Class that uniquely identifies stored content.
+ */
+ class ID: public Storable {
+ public:
+ /** Default constructor.
+ *
+ * Creates new unique ID.
+ */
+ ID();
+
+ /** Copy constructor.
+ */
+ ID(const ID& src);
+
+ /** Destructor.
+ */
+ ~ID() {};
+
+ /** Assignment operator.
+ */
+ ID& operator=(const ID& src);
+
+ /** Return the string suitable for printing.
+ */
+ std::string toString() const;
+
+ /** Comparison operator
+ */
+ int operator==(const ID& second);
+
+ /** Get size needed for storage (from Storable).
+ */
+ virtual int getStorageSize() const;
+
+ /** Save ID (from Storable)
+ */
+ virtual int save(void* data, int len) const;
+
+ /** Load ID (from Storable)
+ */
+ virtual int load(void* data, int len);
+
+ protected:
+ unsigned long long getID() {return id;}
+
+ private:
+ static pthread_mutex_t counterLock;
+ static unsigned counter;
+ unsigned long long id;
+ };
+};
+
+#endif
--- /dev/null
+#include <pthread.h>
+#include <sys/time.h>
+#include <sstream>
+
+#include "MessageStore.H"
+
+pthread_mutex_t MessageStore::ID::counterLock = PTHREAD_MUTEX_INITIALIZER;
+unsigned MessageStore::ID::counter = 0;
+
+MessageStore::ID::ID(){
+ time_t t;
+ time(&t);
+ pthread_mutex_lock(&counterLock);
+ counter++;
+ id = ((unsigned long long) counter << 32) + t;
+ pthread_mutex_unlock(&counterLock);
+}
+
+std::string MessageStore::ID::toString() const{
+ std::ostringstream oss;
+ oss << id;
+ return oss.str();
+}
+
--- /dev/null
+#ifndef _PROPERTIES_H_
+#define _PROPERTIES_H_
+
+#include <map>
+#include <string>
+
+class Properties {
+public:
+
+ // default constructor
+ Properties()
+ : properties()
+ {}
+
+ // accessors
+ std::string& getProperty(const std::string &key)
+ { return properties[key]; }
+
+ void setProperty(const std::string &key, std::string &val)
+ { properties[key] = val; }
+
+ // iterators
+ typedef std::map<std::string,std::string>::iterator iterator;
+
+ iterator begin()
+ { return properties.begin(); }
+
+ iterator end()
+ { return properties.end(); }
+
+
+private:
+ std::map<std::string,std::string> properties;
+};
+
+#endif
--- /dev/null
+#include "Transport.H"
+
+Transport::~Transport()
+{
+}
--- /dev/null
+#ident "$Header$"
+
+#include <assert.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/param.h>
+
+#include "glite/lb/events_parse.h"
+
+#include "interlogd.h"
+
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static char *file_prefix = NULL;
+
+
+struct event_store_list {
+ struct event_store *es;
+ struct event_store_list *next;
+};
+
+
+static struct event_store_list *store_list;
+static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER;
+
+
+/* ----------------
+ * helper functions
+ * ----------------
+ */
+static
+char *
+jobid2eventfile(const char *job_id_s)
+{
+ char *buffer;
+
+ if(job_id_s) {
+ asprintf(&buffer, "%s.%s", file_prefix, job_id_s);
+ } else
+ asprintf(&buffer, "%s.default", file_prefix);
+
+ return(buffer);
+}
+
+
+static
+char *
+jobid2controlfile(char *job_id_s)
+{
+ char *buffer;
+ char *hash;
+
+ if(job_id_s) {
+ asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s);
+ } else
+ asprintf(&buffer, "%s.default.ctl", file_prefix);
+
+ return(buffer);
+}
+
+static
+int
+file_reader(void *user_data, char *buffer, const int len)
+{
+ size_t ret = 0;
+
+ if(len > 0) {
+ ret = fread(buffer, 1, len, (FILE*)user_data);
+ if(ret == 0 && ferror((FILE*)user_data)) {
+ return -1;
+ }
+ }
+ return ret;
+}
+
+
+static
+int
+read_event_string(FILE *file, il_http_message_t *msg)
+{
+ int len, ret;
+ int fd = fileno(file);
+ long start;
+
+ /* remember the start position */
+ start = ftell(file);
+ ret = receive_http(file, file_reader, msg);
+ if(ret < 0) return ret;
+ /* seek at the end of message in case the reader read ahead */
+ len = fseek(file, start + msg->len, SEEK_SET);
+ len = fgetc(file);
+ if(len != '\n') {
+ il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n",
+ start+msg->len, len, len);
+ if(msg->data) { free(msg->data); msg->data = NULL; }
+ if(msg->host) { free(msg->host); msg->host = NULL; }
+ return EINVAL;
+ }
+ return ret;
+}
+
+
+
+/* ------------------------------
+ * event_store 'member' functions
+ * ------------------------------
+ */
+static
+int
+event_store_free(struct event_store *es)
+{
+ assert(es != NULL);
+
+ if(es->job_id_s) free(es->job_id_s);
+ if(es->event_file_name) free(es->event_file_name);
+ if(es->control_file_name) free(es->control_file_name);
+ pthread_rwlock_destroy(&es->use_lock);
+ pthread_rwlock_destroy(&es->commit_lock);
+ free(es);
+
+ return(0);
+}
+
+
+static
+struct event_store *
+event_store_create(char *job_id_s)
+{
+ struct event_store *es;
+
+ es = malloc(sizeof(*es));
+ if(es == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure");
+ return(NULL);
+ }
+
+ memset(es, 0, sizeof(*es));
+
+ il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s);
+
+ es->job_id_s = strdup(job_id_s);
+ es->event_file_name = jobid2eventfile(job_id_s);
+ es->control_file_name = jobid2controlfile(job_id_s);
+
+ if(pthread_rwlock_init(&es->commit_lock, NULL))
+ abort();
+ if(pthread_rwlock_init(&es->use_lock, NULL))
+ abort();
+
+ return(es);
+}
+
+
+static
+int
+event_store_lock_ro(struct event_store *es)
+{
+ assert(es != NULL);
+
+ if(pthread_rwlock_rdlock(&es->commit_lock))
+ abort();
+
+ return(0);
+}
+
+
+static
+int
+event_store_lock(struct event_store *es)
+{
+ assert(es != NULL);
+
+ if(pthread_rwlock_wrlock(&es->commit_lock))
+ abort();
+
+ return(0);
+}
+
+
+static
+int
+event_store_unlock(struct event_store *es)
+{
+ assert(es != NULL);
+
+ if(pthread_rwlock_unlock(&es->commit_lock))
+ abort();
+ return(0);
+}
+
+
+static
+int
+event_store_read_ctl(struct event_store *es)
+{
+ FILE *ctl_file;
+
+ assert(es != NULL);
+
+ event_store_lock(es);
+ if((ctl_file = fopen(es->control_file_name, "r")) == NULL) {
+ /* no control file, new event file */
+ es->last_committed_ls = 0;
+ es->last_committed_bs = 0;
+ } else {
+ /* read last seen and last committed counts */
+ fscanf(ctl_file, "%*s\n%ld\n%ld\n",
+ &es->last_committed_ls,
+ &es->last_committed_bs);
+ fclose(ctl_file);
+ }
+ event_store_unlock(es);
+
+ return(0);
+}
+
+
+static
+int
+event_store_write_ctl(struct event_store *es)
+{
+ FILE *ctl;
+
+ assert(es != NULL);
+
+ ctl = fopen(es->control_file_name, "w");
+ if(ctl == NULL) {
+ set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file");
+ return(-1);
+ }
+
+ if(fprintf(ctl, "%s\n%ld\n%ld\n",
+ es->job_id_s,
+ es->last_committed_ls,
+ es->last_committed_bs) < 0) {
+ set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
+ return(-1);
+ }
+
+ if(fclose(ctl) < 0) {
+ set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file");
+ return(-1);
+ }
+
+ return(0);
+}
+
+
+/*
+ * event_store_qurantine()
+ * - rename damaged event store file
+ * - essentially does the same actions as cleanup, but the event store
+ * does not have to be empty
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es)
+{
+ int num;
+ char newname[MAXPATHLEN+1];
+
+ /* find available qurantine name */
+ /* we give it at most 1024 tries */
+ for(num = 0; num < 1024; num++) {
+ struct stat st;
+
+ snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num);
+ newname[MAXPATHLEN] = 0;
+ if(stat(newname, &st) < 0) {
+ if(errno == ENOENT) {
+ /* file not found */
+ break;
+ } else {
+ /* some other error with name, probably permanent */
+ set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+ return(-1);
+
+ }
+ } else {
+ /* the filename is used already */
+ }
+ }
+ if(num >= 1024) {
+ /* new name not found */
+ /* XXX - is there more suitable error? */
+ set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+ return(-1);
+ }
+
+ /* actually rename the file */
+ il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n",
+ es->event_file_name, newname);
+ if(rename(es->event_file_name, newname) < 0) {
+ set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+ return(-1);
+ }
+
+ /* clear the counters */
+ es->last_committed_ls = 0;
+ es->last_committed_bs = 0;
+ es->offset = 0;
+
+ return(0);
+}
+
+
+/*
+ * event_store_recover()
+ * - recover after restart or catch up when events missing in IPC
+ * - if offset > 0, read everything behind it
+ * - if offset == 0, read everything behind min(last_committed_bs, last_committed_es)
+ */
+int
+event_store_recover(struct event_store *es)
+{
+ struct event_queue *eq_l = NULL, *eq_b = NULL;
+ struct server_msg *msg;
+ il_http_message_t hmsg;
+ char *event_s;
+ int fd, ret;
+ long last;
+ FILE *ef;
+ struct flock efl;
+ char err_msg[128];
+ struct stat stbuf;
+
+ assert(es != NULL);
+
+#if defined(IL_NOTIFICATIONS)
+ /* destination queue has to be found for each message separately */
+#else
+ /* find bookkepping server queue */
+ eq_b = queue_list_get(es->job_id_s);
+ if(eq_b == NULL)
+ return(-1);
+#endif
+
+#if !defined(IL_NOTIFICATIONS)
+ /* get log server queue */
+ eq_l = queue_list_get(NULL);
+#endif
+
+ event_store_lock(es);
+
+ il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name);
+
+ /* open event file */
+ ef = fopen(es->event_file_name, "r");
+ if(ef == NULL) {
+ snprintf(err_msg, sizeof(err_msg),
+ "event_store_recover: error opening event file %s",
+ es->event_file_name);
+ set_error(IL_SYS, errno, err_msg);
+ event_store_unlock(es);
+ return(-1);
+ }
+
+ /* lock the file for reading (we should not read while dglogd is writing) */
+ fd = fileno(ef);
+ efl.l_type = F_RDLCK;
+ efl.l_whence = SEEK_SET;
+ efl.l_start = 0;
+ efl.l_len = 0;
+ if(fcntl(fd, F_SETLKW, &efl) < 0) {
+ snprintf(err_msg, sizeof(err_msg),
+ "event_store_recover: error locking event file %s",
+ es->event_file_name);
+ set_error(IL_SYS, errno, err_msg);
+ event_store_unlock(es);
+ fclose(ef);
+ return(-1);
+ }
+
+ /* check the file modification time and size to avoid unnecessary operations */
+ memset(&stbuf, 0, sizeof(stbuf));
+ if(fstat(fd, &stbuf) < 0) {
+ il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
+ fclose(ef);
+ event_store_unlock(es);
+ return -1;
+ } else {
+ if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
+ il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n");
+ fclose(ef);
+ event_store_unlock(es);
+ return(0);
+ }
+ }
+
+ while(1) { /* try, try, try */
+
+ /* get the position in file to be sought */
+ if(es->offset)
+ last = es->offset;
+ else {
+ last = es->last_committed_bs;
+ }
+
+ il_log(LOG_DEBUG, " setting starting file position to %ld\n", last);
+ il_log(LOG_DEBUG, " bytes sent to destination: %d\n", es->last_committed_bs);
+
+ if(last > 0) {
+ int c;
+
+ /* skip all committed or already enqueued events */
+ /* be careful - check, if the offset really points to the
+ beginning of event string */
+ if(fseek(ef, last - 1, SEEK_SET) < 0) {
+ set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+ event_store_unlock(es);
+ fclose(ef);
+ return(-1);
+ }
+ /* the last enqueued event MUST end with \n */
+ if((c=fgetc(ef)) != '\n') {
+ /* Houston, we have got a problem */
+ il_log(LOG_WARNING,
+ " file position %ld does not point at the beginning of event string, backing off!\n",
+ last);
+ /* now, where were we? */
+ if(es->offset) {
+ /* next try will be with
+ last_commited_bs */
+ es->offset = 0;
+ } else {
+ /* this is really weird... back off completely */
+ es->last_committed_ls = es->last_committed_bs = 0;
+ }
+ } else {
+ /* OK, break out of the loop */
+ break;
+ }
+ } else {
+ /* this breaks out of the loop, we are starting at
+ * the beginning of file
+ */
+ if(fseek(ef, 0, SEEK_SET) < 0) {
+ set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+ event_store_unlock(es);
+ fclose(ef);
+ return(-1);
+ }
+ break;
+ }
+ }
+
+ /* enqueue all remaining events */
+ ret = 1;
+ msg = NULL;
+ while(read_event_string(ef, &hmsg) >= 0) {
+
+ /* last holds the starting position of event_s in file */
+ il_log(LOG_DEBUG, " reading event at %ld\n", last);
+
+ /* break from now on means there was some error */
+ ret = -1;
+
+ /* create message for server */
+ msg = server_msg_create((il_octet_string_t*)&hmsg, last);
+ if(msg == NULL) {
+ il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+ /* actually do not bother if quarantine succeeded or not - we could not do more */
+ event_store_quarantine(es);
+ fclose(ef);
+ event_store_unlock(es);
+ return(-1);
+ }
+ msg->es = es;
+
+ /* first enqueue to the LS */
+ if(!bs_only && (last >= es->last_committed_ls)) {
+
+ il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_l->dest_name);
+
+#if !defined(IL_NOTIFICATIONS)
+ if(enqueue_msg(eq_l, msg) < 0)
+ break;
+#endif
+ }
+
+#ifdef IL_NOTIFICATIONS
+ eq_b = queue_list_get(msg->dest);
+#endif
+
+ /* now enqueue to the BS, if neccessary */
+ if((eq_b != eq_l) &&
+ (last >= es->last_committed_bs)) {
+
+ il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_b->dest_name);
+
+ if(enqueue_msg(eq_b, msg) < 0)
+ break;
+ }
+ server_msg_free(msg);
+ msg = NULL;
+
+ /* now last is also the offset behind the last successfully queued event */
+ last = ftell(ef);
+
+ /* ret == 0 means EOF or incomplete event found */
+ ret = 0;
+
+ } /* while */
+
+ /* due to this little assignment we had to lock the event_store for writing */
+ es->offset = last;
+ es->last_modified = stbuf.st_mtime;
+ il_log(LOG_DEBUG, " event store offset set to %ld\n", last);
+
+ if(msg)
+ server_msg_free(msg);
+
+ fclose(ef);
+ il_log(LOG_DEBUG, " finished reading events with %d\n", ret);
+
+ event_store_unlock(es);
+ return(ret);
+}
+
+
+/*
+ * event_store_sync()
+ * - check the position of event and fill holes from file
+ * - return 1 if the event is new,
+ * 0 if it was seen before,
+ * -1 if there was an error
+ */
+int
+event_store_sync(struct event_store *es, long offset)
+{
+ int ret;
+
+ assert(es != NULL);
+
+ /* all events are actually read from file, the event on socket
+ * is ignored and serves just to notify us about file change
+ */
+ ret = event_store_recover(es);
+ ret = (ret < 0) ? ret : 0;
+ return(ret);
+}
+
+
+int
+event_store_next(struct event_store *es, long offset, int len)
+{
+ assert(es != NULL);
+
+ /* offsets are good only to detect losses (differences between socket and file),
+ which is not possible now */
+ return 0;
+}
+
+
+/*
+ * event_store_commit()
+ *
+ */
+int
+event_store_commit(struct event_store *es, int len, int ls)
+{
+ assert(es != NULL);
+
+ event_store_lock(es);
+
+ if(ls)
+ es->last_committed_ls += len;
+ else {
+ es->last_committed_bs += len;
+ if (bs_only) es->last_committed_ls += len;
+ }
+
+ if(event_store_write_ctl(es) < 0) {
+ event_store_unlock(es);
+ return(-1);
+ }
+
+ event_store_unlock(es);
+
+
+ return(0);
+}
+
+
+/*
+ * event_store_clean()
+ * - remove the event files (event and ctl), if they are not needed anymore
+ * - returns 0 if event_store is in use, 1 if it was removed and -1 on error
+ *
+ * Q: How do we know that we can safely remove the files?
+ * A: When all events from file have been committed both by LS and BS.
+ */
+static
+int
+event_store_clean(struct event_store *es)
+{
+ long last;
+ int fd;
+ FILE *ef;
+ struct flock efl;
+
+ assert(es != NULL);
+
+ /* prevent sender threads from updating */
+ event_store_lock(es);
+
+ il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s);
+ il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls);
+ il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs);
+
+ /* preliminary check to avoid opening event file */
+ /* if the positions differ, some events still have to be sent */
+ if(es->last_committed_ls != es->last_committed_bs) {
+ event_store_unlock(es);
+ il_log(LOG_DEBUG, " not all events sent, cleanup aborted\n");
+ return(0);
+ }
+
+ /* the file can only be removed when all the events were succesfully sent
+ (ie. committed both by LS and BS */
+ /* That also implies that the event queues are 'empty' at the moment. */
+ ef = fopen(es->event_file_name, "r+");
+ if(ef == NULL) {
+ /* if we can not open the event store, it is an error and the struct should be removed */
+ /* XXX - is it true? */
+ event_store_unlock(es);
+ il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno));
+ return(1);
+ }
+
+ fd = fileno(ef);
+
+ /* prevent local-logger from writing into event file */
+ efl.l_type = F_WRLCK;
+ efl.l_whence = SEEK_SET;
+ efl.l_start = 0;
+ efl.l_len = 0;
+ if(fcntl(fd, F_SETLK, &efl) < 0) {
+ il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n");
+ fclose(ef);
+ event_store_unlock(es);
+ if(errno != EACCES &&
+ errno != EAGAIN) {
+ set_error(IL_SYS, errno, "event_store_clean: error locking event file");
+ return(-1);
+ }
+ return(0);
+ }
+
+ /* now the file should not contain partially written event, so it is safe
+ to get offset behind last event by seeking the end of file */
+ if(fseek(ef, 0, SEEK_END) < 0) {
+ set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
+ event_store_unlock(es);
+ fclose(ef);
+ return(-1);
+ }
+
+ last = ftell(ef);
+ il_log(LOG_DEBUG, " total bytes in file: %d\n", last);
+
+ if(es->last_committed_ls < last) {
+ fclose(ef);
+ event_store_unlock(es);
+ il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n");
+ return(0);
+ } else if( es->last_committed_ls > last) {
+ il_log(LOG_WARNING, " warning: event file seems to shrink!\n");
+ /* XXX - in that case we can not continue because there may be
+ some undelivered events referring to that event store */
+ fclose(ef);
+ event_store_unlock(es);
+ return(0);
+ }
+
+ /* now we are sure that all events were sent and the event queues are empty */
+ il_log(LOG_INFO, " removing event file %s\n", es->event_file_name);
+
+ /* remove the event file */
+ unlink(es->event_file_name);
+ unlink(es->control_file_name);
+
+ /* clear the counters */
+ es->last_committed_ls = 0;
+ es->last_committed_bs = 0;
+ es->offset = 0;
+
+ /* unlock the event_store even if it is going to be removed */
+ event_store_unlock(es);
+
+ /* close the event file (that unlocks it as well) */
+ fclose(ef);
+
+ /* indicate that it is safe to remove this event_store */
+ return(1);
+}
+
+
+/* --------------------------------
+ * event store management functions
+ * --------------------------------
+ */
+struct event_store *
+event_store_find(char *job_id_s)
+{
+ struct event_store_list *q, *p;
+ struct event_store *es;
+
+ if(pthread_rwlock_wrlock(&store_list_lock)) {
+ abort();
+ }
+
+ es = NULL;
+
+ q = NULL;
+ p = store_list;
+
+ while(p) {
+ if(strcmp(p->es->job_id_s, job_id_s) == 0) {
+ es = p->es;
+ if(pthread_rwlock_rdlock(&es->use_lock))
+ abort();
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+ return(es);
+ }
+
+ q = p;
+ p = p->next;
+ }
+
+ es = event_store_create(job_id_s);
+ if(es == NULL) {
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+ return(NULL);
+ }
+
+ p = malloc(sizeof(*p));
+ if(p == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+ return(NULL);
+ }
+
+ p->next = store_list;
+ store_list = p;
+
+ p->es = es;
+
+ if(pthread_rwlock_rdlock(&es->use_lock))
+ abort();
+
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+
+ return(es);
+}
+
+
+int
+event_store_release(struct event_store *es)
+{
+ assert(es != NULL);
+
+ if(pthread_rwlock_unlock(&es->use_lock))
+ abort();
+ il_log(LOG_DEBUG, " released lock on %s\n", es->job_id_s);
+ return(0);
+}
+
+
+event_store_from_file(char *filename)
+{
+ struct event_store *es;
+ FILE *event_file;
+ char *job_id_s = NULL, *p;
+ il_http_message_t hmsg;
+ int ret;
+
+ il_log(LOG_INFO, " attaching to event file: %s\n", filename);
+
+ if(strstr(filename, "quarantine") != NULL) {
+ il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n");
+ return(0);
+ }
+
+ event_file = fopen(filename, "r");
+ if(event_file == NULL) {
+ set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
+ return(-1);
+ }
+ ret = read_event_string(event_file, &hmsg);
+ fclose(event_file);
+ if(ret < 0)
+ return(0);
+
+ /* get id aka dest */
+ job_id_s = hmsg.host;
+
+ il_log(LOG_DEBUG, " message dest: '%s'\n", job_id_s);
+ if(job_id_s == NULL) {
+ il_log(LOG_NOTICE, " skipping file, could not parse event\n");
+ ret = 0;
+ goto out;
+ }
+
+ es=event_store_find(job_id_s);
+
+ if(es == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ if((es->last_committed_ls == 0) &&
+ (es->last_committed_bs == 0) &&
+ (es->offset == 0)) {
+ ret = event_store_read_ctl(es);
+ } else
+ ret = 0;
+
+ event_store_release(es);
+
+out:
+ if(hmsg.data) free(hmsg.data);
+ if(job_id_s) free(job_id_s);
+ return(ret);
+}
+
+
+int
+event_store_init(char *prefix)
+{
+ if(file_prefix == NULL) {
+ file_prefix = strdup(prefix);
+ store_list = NULL;
+ }
+
+ /* read directory and get a list of event files */
+ {
+ int len;
+
+ char *p, *dir;
+ DIR *event_dir;
+ struct dirent *entry;
+
+
+ /* get directory name */
+ p = strrchr(file_prefix, '/');
+ if(p == NULL) {
+ dir = strdup(".");
+ p = "";
+ len = 0;
+ } else {
+ *p = '\0';
+ dir = strdup(file_prefix);
+ *p++ = '/';
+ len = strlen(p);
+ }
+
+ event_dir = opendir(dir);
+ if(event_dir == NULL) {
+ free(dir);
+ set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+ return(-1);
+ }
+
+ while((entry=readdir(event_dir))) {
+ char *s;
+
+ /* skip all files that do not match prefix */
+ if(strncmp(entry->d_name, p, len) != 0)
+ continue;
+
+ /* skip all control files */
+ if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+ s[4] == '\0')
+ continue;
+
+ s = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+ if(s == NULL) {
+ free(dir);
+ set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name");
+ return(-1);
+ }
+
+ *s = '\0';
+ strcat(s, dir);
+ strcat(s, "/");
+ strcat(s, entry->d_name);
+
+ if(event_store_from_file(s) < 0) {
+ free(dir);
+ free(s);
+ closedir(event_dir);
+ return(-1);
+ }
+
+ free(s);
+ }
+ closedir(event_dir);
+
+ /* one more pass - this time remove stale .ctl files */
+ event_dir = opendir(dir);
+ if(event_dir == NULL) {
+ free(dir);
+ set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+ return(-1);
+ }
+
+ while((entry=readdir(event_dir))) {
+ char *s;
+
+ /* skip all files that do not match prefix */
+ if(strncmp(entry->d_name, p, len) != 0)
+ continue;
+
+ /* find all control files */
+ if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+ s[4] == '\0') {
+ char *ef;
+ struct stat st;
+
+ /* is there corresponding event file? */
+ ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+ if(ef == NULL) {
+ free(dir);
+ set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+ return(-1);
+ }
+
+ s[0] = 0;
+ *ef = '\0';
+ strcat(ef, dir);
+ strcat(ef, "/");
+ strcat(ef, entry->d_name);
+ s[0] = '.';
+
+ if(stat(ef, &st) == 0) {
+ /* something is there */
+ /* XXX - it could be something else than event file, but do not bother now */
+ } else {
+ /* could not stat file, remove ctl */
+ strcat(ef, s);
+ il_log(LOG_DEBUG, " removing stale file %s\n", ef);
+ if(unlink(ef))
+ il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno));
+
+ }
+ free(ef);
+
+ }
+ }
+ closedir(event_dir);
+ free(dir);
+ }
+
+ return(0);
+}
+
+
+int
+event_store_recover_all()
+{
+ struct event_store_list *sl;
+
+
+ if(pthread_rwlock_rdlock(&store_list_lock))
+ abort();
+
+ /* recover all event stores */
+ sl = store_list;
+ while(sl != NULL) {
+
+ /* recover this event store */
+ /* no need to lock use_lock in event_store, the store_list_lock is in place */
+ if(event_store_recover(sl->es) < 0) {
+ il_log(LOG_ERR, " error recovering event store %s:\n %s\n", sl->es->event_file_name, error_get_msg());
+ clear_error();
+ }
+ sl = sl->next;
+ }
+
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+
+ return(0);
+}
+
+
+#if 0
+int
+event_store_remove(struct event_store *es)
+{
+ struct event_store_list *p, **q;
+
+ assert(es != NULL);
+
+ switch(event_store_clean(es)) {
+ case 0:
+ il_log(LOG_DEBUG, " event store not removed, still used\n");
+ return(0);
+
+ case 1:
+ if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
+ set_error(IL_SYS, errno, " event_store_remove: error locking event store list");
+ return(-1);
+ }
+
+ p = store_list;
+ q = &store_list;
+
+ while(p) {
+ if(p->es == es) {
+ (*q) = p->next;
+ event_store_free(es);
+ free(p);
+ break;
+ }
+ q = &(p->next);
+ p = p->next;
+ }
+
+ if(pthread_rwlock_unlock(&store_list_lock) < 0) {
+ set_error(IL_SYS, errno, " event_store_remove: error unlocking event store list");
+ return(-1);
+ }
+ return(1);
+
+ default:
+ return(-1);
+ }
+ /* not reached */
+ return(0);
+}
+#endif
+
+int
+event_store_cleanup()
+{
+ struct event_store_list *sl;
+ struct event_store_list *slnext;
+ struct event_store_list **prev;
+
+ /* try to remove event files */
+
+ if(pthread_rwlock_wrlock(&store_list_lock))
+ abort();
+
+ sl = store_list;
+ prev = &store_list;
+
+ while(sl != NULL) {
+ int ret;
+
+ slnext = sl->next;
+
+ /* one event store at time */
+ ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
+ if(ret == EBUSY) {
+ il_log(LOG_DEBUG, " event_store %s is in use by another thread\n",
+ sl->es->job_id_s);
+ sl = slnext;
+ continue;
+ } else if (ret < 0)
+ abort();
+
+ switch(event_store_clean(sl->es)) {
+
+ case 1:
+ /* remove this event store */
+ (*prev) = slnext;
+ event_store_free(sl->es);
+ free(sl);
+ break;
+
+ case -1:
+ il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n",
+ sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
+ /* event_store_release(sl->es); */
+ clear_error();
+ /* go on to the next */
+
+ default:
+ event_store_release(sl->es);
+ prev = &(sl->next);
+ break;
+ }
+
+ sl = slnext;
+ }
+
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+
+ return(0);
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+
+#include "interlogd.h"
+
+
+int
+parse_request(const char *s, il_http_message_t *msg)
+{
+ if(!strncasecmp(s, "HTTP", 4)) {
+ msg->msg_type = IL_HTTP_REPLY;
+ } else if(!strncasecmp(s, "POST", 4)) {
+ msg->msg_type = IL_HTTP_POST;
+ } else if(!strncasecmp(s, "GET", 3)) {
+ msg->msg_type = IL_HTTP_GET;
+ } else {
+ msg->msg_type = IL_HTTP_OTHER;
+ }
+ if(msg->msg_type == IL_HTTP_REPLY) {
+ char *p = strchr(s, ' ');
+
+ if(!p) goto parse_end;
+ p++;
+ msg->reply_code=atoi(p);
+ p = strchr(p, ' ');
+ if(!p) goto parse_end;
+ p++;
+ msg->reply_string = strdup(p);
+
+ parse_end:
+ ;
+ }
+}
+
+
+int
+parse_header(const char *s, il_http_message_t *msg)
+{
+ if(!strncasecmp(s, "Content-Length:", 15)) {
+ msg->content_length = atoi(s + 15);
+ } else if(!strncasecmp(s, "Host:", 5)) {
+ const char *p = s + 4;
+ while(*++p == ' '); /* skip spaces */
+ msg->host = strdup(p);
+ }
+ return(0);
+}
+
+
+#define DEFAULT_CHUNK_SIZE 1024
+
+// read what is available and parse what can be parsed
+// returns the result of read operation of the underlying connection,
+// ie. the number of bytes read or error code
+int
+receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_message_t *msg)
+{
+ static enum { NONE, IN_REQUEST, IN_HEADERS, IN_BODY } state = NONE;
+ int len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE;
+ char *buffer, *p, *s, *cr;
+
+ memset(msg, 0, sizeof(*msg));
+ // msg->data = NULL;
+ // msg->len = 0;
+ state = IN_REQUEST;
+ alen = 0;
+ buffer = NULL;
+ buffer_free = 0;
+ p = NULL;
+ s = NULL;
+
+ do {
+ /* p - first empty position in buffer
+ alen - size of allocated buffer
+ len - number of bytes received in last read
+ s - points behind last scanned CRLF or at buffer start
+ buffer_free = alen - (p - buffer)
+ */
+
+ /* prepare at least chunk_size bytes for next data */
+ if(buffer_free < min_buffer_free) {
+ char *n;
+
+ alen += min_buffer_free;
+ n = realloc(buffer, alen);
+ if(n == NULL) {
+ free(buffer);
+ set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+ return(-1);
+ }
+ buffer_free += min_buffer_free;
+ p = n + (p - buffer);
+ s = n + (s - buffer);
+ buffer = n;
+ }
+
+ if(buffer_free > 0) {
+ len = (*reader)(user_data, p, buffer_free);
+ if(len < 0) {
+ // error
+ free(buffer);
+ // set_error(IL_SYS, errno, "receive_http: error reading data");
+ return -1;
+ } else if(len == 0) {
+ // EOF
+ free(buffer);
+ set_error(IL_PROTO, errno, "receive_http: error reading data - premature EOF");
+ return -1;
+ }
+ buffer_free -= len;
+ p+= len;
+ }
+
+
+ switch(state) {
+
+ // parse buffer, look for CRLFs
+ // s - start scan position
+ // p - start of current token
+ // cr - current CRLF position
+
+ case IN_REQUEST:
+ if((s < p - 1) &&
+ (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+ (cr[1] == '\n')) {
+ *cr = 0;
+ parse_request(s, msg);
+ *cr = '\r';
+ // change state
+ state = IN_HEADERS;
+ // start new tokens (cr < p - 1 -> s < p + 1 <-> s <= p)
+ s = cr + 2;
+ } else {
+ break;
+ }
+
+ case IN_HEADERS:
+ while((state != IN_BODY) &&
+ (s < p - 1) &&
+ (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+ (cr[1] == '\n')) {
+ if(s == cr) { /* do not consider request starting with CRLF */
+ // found CRLFCRLF
+ state = IN_BODY;
+ } else {
+ *cr = 0;
+ parse_header(s, msg);
+ *cr = '\r';
+ }
+ // next scan starts after CRLF
+ s = cr + 2;
+ }
+ if(state == IN_BODY) {
+ // we found body
+ // content-length should be set at the moment
+ if(msg->content_length > 0) {
+ int need_free = msg->content_length - (p - s);
+ char *n;
+
+ alen += need_free - buffer_free + 1;
+ n = realloc(buffer, alen);
+ if(n == NULL) {
+ free(buffer);
+ set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+ return(-1);
+ }
+ buffer_free = need_free;
+ min_buffer_free = 0;
+ p = n + (p - buffer);
+ s = n + (s - buffer);
+ buffer = n;
+ } else {
+ // report error
+ free(buffer);
+ set_error(IL_PROTO, EINVAL, "receive_http: error reading data - no content length specified\n");
+ return -1;
+ }
+ }
+ break;
+
+ case IN_BODY:
+ if(buffer_free == 0) {
+ // finished reading
+ *p = 0;
+ state = NONE;
+ }
+ break;
+ }
+ } while(state != NONE);
+
+ msg->data = buffer;
+ msg->len = p - buffer;
+
+ return 0;
+}
--- /dev/null
+#ident "$Header$"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "interlogd.h"
+
+static const int SOCK_QUEUE_MAX = 50;
+extern char *socket_path;
+extern char *file_prefix;
+
+static int sock;
+static int accepted;
+
+static
+int plain_reader(void *user_data, char *buffer, const int len)
+{
+ return (recv(*(int*)user_data, buffer, len, MSG_NOSIGNAL));
+}
+
+
+int
+input_queue_attach()
+{
+ struct sockaddr_un saddr;
+
+ if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
+ set_error(IL_SYS, errno, "input_queue_attach: error creating socket");
+ return(-1);
+ }
+
+ memset(&saddr, 0, sizeof(saddr));
+ saddr.sun_family = AF_UNIX;
+ strcpy(saddr.sun_path, socket_path);
+
+ /* test for the presence of the socket and another instance
+ of interlogger listening */
+ if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) {
+ if(errno == ECONNREFUSED) {
+ /* socket present, but no one at the other end; remove it */
+ il_log(LOG_WARNING, " removing stale input socket %s\n", socket_path);
+ unlink(socket_path);
+ }
+ /* ignore other errors for now */
+ } else {
+ /* connection was successful, so bail out - there is
+ another interlogger running */
+ set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running");
+ return(-1);
+ }
+
+ if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+ set_error(IL_SYS, errno, "input_queue_attach: error binding socket");
+ return(-1);
+ }
+
+ if (listen(sock, SOCK_QUEUE_MAX)) {
+ set_error(IL_SYS, errno, "input_queue_attach: error listening on socket");
+ return -1;
+ }
+
+ return(0);
+}
+
+
+void input_queue_detach()
+{
+ if (sock >= 0)
+ close(sock);
+ unlink(socket_path);
+}
+
+
+
+/*
+ * Returns: -1 on error, 0 if no message available, message length otherwise
+ *
+ */
+#ifdef PERF_EVENTS_INLINE
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+ static long o = 0;
+ int len;
+ char *jobid;
+ static il_octet_string_t my_buffer;
+
+ assert(buffer != NULL);
+
+ *buffer = &my_buffer;
+
+ len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid);
+ my_buffer.len = len;
+ if(len) {
+ o += len;
+ *offset = o;
+ } else if (len == 0) {
+ sleep(timeout);
+ }
+ return(len);
+}
+#else
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+ fd_set fds;
+ struct timeval tv;
+ int msg_len;
+ static il_http_message_t msg;
+
+ assert(buffer != NULL);
+
+ *buffer = (il_octet_string_t *)&msg;
+
+ FD_ZERO(&fds);
+ FD_SET(sock, &fds);
+
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+
+ msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL);
+ switch(msg_len) {
+
+ case 0: /* timeout */
+ return(0);
+
+ case -1: /* error */
+ switch(errno) {
+ case EINTR:
+ il_log(LOG_DEBUG, " interrupted while waiting for event!\n");
+ return(0);
+
+ default:
+ set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+ return(-1);
+ }
+ default:
+ break;
+ }
+
+ if((accepted=accept(sock, NULL, NULL)) < 0) {
+ set_error(IL_SYS, errno, "input_queue_get: error accepting connection");
+ return(-1);
+ }
+
+ msg_len = receive_http(&accepted, plain_reader, &msg);
+
+ if(msg_len < 0) {
+ close(accepted);
+ if(error_get_maj() != IL_OK)
+ return -1;
+ else
+ return 0;
+ }
+
+ close(accepted);
+ *offset = -1;
+ return(msg.len);
+}
+#endif
+
--- /dev/null
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct queue_list {
+ struct event_queue *queue;
+ char *dest;
+ struct queue_list *next;
+ time_t expires;
+};
+
+static struct event_queue *log_queue;
+static struct queue_list *queues;
+
+
+static
+int
+queue_list_create()
+{
+ queues = NULL;
+
+ return(0);
+}
+
+
+static
+int
+queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev)
+{
+ struct queue_list *q, *p;
+
+ assert(el != NULL);
+
+ *el = NULL;
+ if(prev)
+ *prev = NULL;
+
+ if(ql == NULL)
+ return(0);
+
+ q = NULL;
+ p = ql;
+
+ while(p) {
+ if(strcmp(p->dest, dest) == 0) {
+ *el = p;
+ if(prev)
+ *prev = q;
+ return(1);
+ }
+
+ q = p;
+ p = p->next;
+ };
+
+ return(0);
+}
+
+
+static
+int
+queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
+{
+ struct queue_list *el;
+
+ assert(dest != NULL);
+ assert(eq != NULL);
+ assert(ql != NULL);
+
+ el = malloc(sizeof(*el));
+ if(el == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue");
+ return(-1);
+ }
+
+ el->dest = strdup(dest);
+ if(el->dest == NULL) {
+ free(el);
+ set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue");
+ return(-1);
+ }
+ el->queue = eq;
+ el->next = *ql;
+ *ql = el;
+ return 0;
+}
+
+
+struct event_queue *
+queue_list_get(char *job_id_s)
+{
+ char *dest;
+ struct queue_list *q;
+ struct event_queue *eq;
+ dest = job_id_s;
+
+ if(dest == NULL)
+ return(NULL);
+
+ if(queue_list_find(queues, dest, &q, NULL)) {
+ return(q->queue);
+ } else {
+ eq = event_queue_create(dest);
+ if(eq)
+ queue_list_add(&queues, dest, eq);
+ return(eq);
+ }
+}
+
+
+int
+queue_list_is_log(struct event_queue *eq)
+{
+ return(eq == queue_list_get(NULL));
+}
+
+
+int
+queue_list_init(char *ls)
+{
+ return(queue_list_create());
+}
+
+
+static struct queue_list *current;
+
+
+struct event_queue *
+queue_list_first()
+{
+ current = queues;
+ return(current ? current->queue : NULL);
+}
+
+
+struct event_queue *
+queue_list_next()
+{
+ current = current ? current->next : NULL;
+ return(current ? current->queue : NULL);
+}
+
+
+int
+queue_list_remove_queue(struct event_queue *eq)
+{
+ assert(eq != NULL);
+
+ free(eq);
+ return(1);
+}
+
+
+
+/* Local Variables: */
+/* c-indentation-style: gnu */
+/* End: */
--- /dev/null
+#ident "$Header$"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+
+/*
+ * - L/B server protocol handling routines
+ */
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/il_string.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct reader_data {
+ edg_wll_GssConnection *gss;
+ struct timeval *timeout;
+};
+
+
+static
+int
+gss_reader(void *user_data, char *buffer, int max_len)
+{
+ int ret;
+ struct reader_data *data = (struct reader_data *)user_data;
+ edg_wll_GssStatus gss_stat;
+
+ ret = edg_wll_gss_read(data->gss, buffer, max_len, data->timeout, &gss_stat);
+ if(ret < 0) {
+ char *gss_err = NULL;
+
+ if(ret == EDG_WLL_GSS_ERROR_GSS) {
+ edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err);
+ set_error(IL_DGGSS, ret, gss_err);
+ free(gss_err);
+ } else
+ set_error(IL_DGGSS, ret, "get_reply");
+ }
+ return(ret);
+}
+
+
+/*
+ * Read reply from server.
+ * Returns: -1 - error reading message,
+ * code > 0 - http status code from server
+ */
+static
+int
+get_reply(struct event_queue *eq, char **buf, int *code_min)
+{
+ int ret, code;
+ int len;
+ struct timeval tv;
+ struct reader_data data;
+ il_http_message_t msg;
+
+ tv.tv_sec = TIMEOUT;
+ tv.tv_usec = 0;
+ data.gss = &eq->gss;
+ data.timeout = &tv;
+ len = receive_http(&data, gss_reader, &msg);
+ if(len < 0) {
+ set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply");
+ return(-1);
+ }
+ if(msg.data) free(msg.data);
+ if(msg.reply_string) *buf = msg.reply_string;
+ *code_min = 0; /* XXX fill in flag for fault */
+ return(msg.reply_code);
+}
+
+
+
+/*
+ * Returns: 0 - not connected, timeout set, 1 - OK
+ */
+int
+event_queue_connect(struct event_queue *eq)
+{
+ int ret;
+ struct timeval tv;
+ edg_wll_GssStatus gss_stat;
+ cred_handle_t *local_cred_handle;
+
+ assert(eq != NULL);
+
+#ifdef LB_PERF
+ if(!nosend) {
+#endif
+
+ if(eq->gss.context == NULL) {
+
+ tv.tv_sec = TIMEOUT;
+ tv.tv_usec = 0;
+
+ /* get pointer to the credentials */
+ if(pthread_mutex_lock(&cred_handle_lock) < 0)
+ abort();
+ local_cred_handle = cred_handle;
+ local_cred_handle->counter++;
+ if(pthread_mutex_unlock(&cred_handle_lock) < 0)
+ abort();
+
+ il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port);
+ ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+ if(pthread_mutex_lock(&cred_handle_lock) < 0)
+ abort();
+ /* check if we need to release the credentials */
+ --local_cred_handle->counter;
+ if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) {
+ edg_wll_gss_release_cred(&local_cred_handle->creds, NULL);
+ free(local_cred_handle);
+ il_log(LOG_DEBUG, " freed credentials, not used anymore\n");
+ }
+ if(pthread_mutex_unlock(&cred_handle_lock) < 0)
+ abort();
+
+ if(ret < 0) {
+ char *gss_err = NULL;
+
+ if (ret == EDG_WLL_GSS_ERROR_GSS)
+ edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err);
+ set_error(IL_DGGSS, ret,
+ (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect");
+ if (gss_err) free(gss_err);
+ eq->gss.context = NULL;
+ eq->timeout = TIMEOUT;
+ return(0);
+ }
+ }
+
+#ifdef LB_PERF
+ }
+#endif
+
+ return(1);
+}
+
+
+int
+event_queue_close(struct event_queue *eq)
+{
+ assert(eq != NULL);
+
+#ifdef LB_PERF
+ if(!nosend) {
+#endif
+
+ if(eq->gss.context != NULL) {
+ edg_wll_gss_close(&eq->gss, NULL);
+ eq->gss.context = NULL;
+ }
+#ifdef LB_PERF
+ }
+#endif
+ return(0);
+}
+
+
+/*
+ * Send all events from the queue.
+ * Returns: -1 - system error, 0 - not sent, 1 - queue empty
+ */
+int
+event_queue_send(struct event_queue *eq)
+{
+ int events_sent = 0;
+ assert(eq != NULL);
+
+#ifdef LB_PERF
+ if(!nosend) {
+#endif
+ if(eq->gss.context == NULL)
+ return(0);
+#ifdef LB_PERF
+ }
+#endif
+
+ /* feed the server with events */
+ while (!event_queue_empty(eq)) {
+ struct server_msg *msg;
+ char *rep;
+ int ret, code, code_min;
+ size_t bytes_sent;
+ struct timeval tv;
+ edg_wll_GssStatus gss_stat;
+
+ clear_error();
+
+ if(event_queue_get(eq, &msg) < 0)
+ return(-1);
+
+ il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
+#ifdef LB_PERF
+ if(!nosend) {
+#endif
+ /* XXX: ljocha -- does it make sense to send empty messages ? */
+ if (msg->len) {
+ tv.tv_sec = TIMEOUT;
+ tv.tv_usec = 0;
+ ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+ if(ret < 0) {
+ if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) {
+ eq->timeout = 0;
+ } else {
+ il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
+ eq->timeout = TIMEOUT;
+ }
+ return(0);
+ }
+ if((code = get_reply(eq, &rep, &code_min)) < 0) {
+ /* could not get the reply properly, so try again later */
+ if (events_sent>0)
+ eq->timeout = 1;
+ else {
+ eq->timeout = TIMEOUT;
+ il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg());
+ }
+ return(0);
+ }
+ }
+ else { code = 200; code_min = 0; rep = strdup("not sending empty message"); }
+#ifdef LB_PERF
+ } else {
+ glite_wll_perftest_consumeEventIlMsg(msg->msg+17);
+ code = 200;
+ rep = strdup("OK");
+ }
+#endif
+
+ il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
+ free(rep);
+
+ /* the reply is back here, decide what to do with message */
+ /* HTTP error codes:
+ 1xx - informational (eg. 100 Continue)
+ 2xx - successful (eg. 200 OK)
+ 3xx - redirection (eg. 301 Moved Permanently)
+ 4xx - client error (eq. 400 Bad Request)
+ 5xx - server error (eq. 500 Internal Server Error)
+ */
+ if(code >= 100 && code < 200) {
+
+ /* non fatal errors (for us), try to deliver later */
+ eq->timeout = TIMEOUT;
+ return(0);
+ }
+
+ /* the message was consumed (successfully or not) */
+ /* update the event pointer */
+ if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0)
+ /* failure committing message, this is bad */
+ return(-1);
+
+ event_queue_remove(eq);
+ events_sent++;
+ } /* while */
+
+ return(1);
+
+} /* send_events */
+
+
+/* this is just not used */
+int
+send_confirmation(long lllid, int code)
+{
+ return 0;
+}
--- /dev/null
+#ident "$Header$"
+
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+
+#include "interlogd.h"
+#include "glite/lb/il_msg.h"
+#include "glite/lb/events_parse.h"
+#include "glite/lb/context.h"
+
+static
+int
+create_msg(il_http_message_t *ev, char **buffer, long *receipt, time_t *expires)
+{
+ char *event = ev->data;
+
+ *receipt = 0;
+ *expires = 0;
+
+ *buffer = ev->data;
+ return ev->len;;
+}
+
+
+struct server_msg *
+server_msg_create(il_octet_string_t *event, long offset)
+{
+ struct server_msg *msg;
+
+ msg = malloc(sizeof(*msg));
+ if(msg == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message");
+ return(NULL);
+ }
+
+ if(server_msg_init(msg, event) < 0) {
+ server_msg_free(msg);
+ return(NULL);
+ }
+ msg->offset = offset;
+
+ return(msg);
+}
+
+
+struct server_msg *
+server_msg_copy(struct server_msg *src)
+{
+ struct server_msg *msg;
+
+ msg = malloc(sizeof(*msg));
+ if(msg == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message");
+ return(NULL);
+ }
+
+ msg->msg = malloc(src->len);
+ if(msg->msg == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message");
+ server_msg_free(msg);
+ return(NULL);
+ }
+ msg->len = src->len;
+ memcpy(msg->msg, src->msg, src->len);
+
+ msg->job_id_s = strdup(src->job_id_s);
+ msg->ev_len = src->ev_len;
+ msg->es = src->es;
+ msg->receipt_to = src->receipt_to;
+ msg->offset = src->offset;
+#if defined(IL_NOTIFICATIONS)
+ msg->dest_name = strdup(src->dest_name);
+ msg->dest_port = src->dest_port;
+ msg->dest = strdup(src->dest);
+#endif
+ msg->expires = src->expires;
+ return(msg);
+}
+
+
+int
+server_msg_init(struct server_msg *msg, il_octet_string_t *event)
+{
+ il_http_message_t *hmsg = (il_http_message_t *)event;
+
+ assert(msg != NULL);
+ assert(event != NULL);
+
+ memset(msg, 0, sizeof(*msg));
+
+
+ msg->job_id_s = hmsg->host;
+ if(msg->job_id_s == NULL) {
+ set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id");
+ return -1;
+ }
+ msg->len = create_msg(hmsg, &msg->msg, &msg->receipt_to, &msg->expires);
+ if(msg->len < 0)
+ return -1;
+ /* set this to indicate new data owner */
+ hmsg->data = NULL;
+ hmsg->host = NULL;
+ msg->ev_len = hmsg->len + 1; /* must add separator size too */
+ return 0;
+
+}
+
+
+int
+server_msg_is_priority(struct server_msg *msg)
+{
+ assert(msg != NULL);
+
+ return(msg->receipt_to != 0);
+}
+
+
+int
+server_msg_free(struct server_msg *msg)
+{
+ assert(msg != NULL);
+
+ if(msg->msg) free(msg->msg);
+ if(msg->job_id_s) free(msg->job_id_s);
+ free(msg);
+ return 0;
+}