From: cvs2svn Date: Fri, 20 Aug 2010 08:32:20 +0000 (+0000) Subject: This commit was manufactured by cvs2svn to create tag 'glite-lb_R_1_10_1_1'. X-Git-Tag: glite-lb_R_1_10_1_1 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=af74a51509d1bb8aae86f1fe9b5f5ef9728d685c;p=jra1mw.git This commit was manufactured by cvs2svn to create tag 'glite-lb_R_1_10_1_1'. Sprout from branch_RC31_3 2010-08-20 08:32:18 UTC Zdeněk Šustr 'New revision' Cherrypick from master 2009-04-08 08:15:08 UTC Aleš Křenek 'The most recent version copied. Do not modify this instance (RW in ./org.glite.lb).': org.glite.lb.logger/configure org.glite.lb.logger/project/package.description org.glite.lb.logger/project/package.summary org.glite.lb.logger/src-nt/Connection.cpp org.glite.lb.logger/src-nt/EventManager.cpp org.glite.lb.logger/src-nt/InputChannel.H org.glite.lb.logger/src-nt/InputChannel.cpp org.glite.lb.logger/src-nt/Message.H org.glite.lb.logger/src-nt/MessageStore.H org.glite.lb.logger/src-nt/MessageStore.cpp org.glite.lb.logger/src-nt/Properties.H org.glite.lb.logger/src-nt/Transport.cpp org.glite.lb.logger/src/event_store_http.c org.glite.lb.logger/src/http.c org.glite.lb.logger/src/input_queue_socket_http.c org.glite.lb.logger/src/queue_mgr_http.c org.glite.lb.logger/src/send_event_http.c org.glite.lb.logger/src/server_msg_http.c --- diff --git a/org.glite.lb.logger/configure b/org.glite.lb.logger/configure new file mode 100755 index 0000000..c289773 --- /dev/null +++ b/org.glite.lb.logger/configure @@ -0,0 +1,691 @@ +#!/usr/bin/perl + +# WARNING: Don't edit this file unless it is the master copy in org.glite.lb +# +# For the purpose of standalone builds of lb/jobid/lbjp-common components +# it is copied on tagging + +# $Header$ + +use Getopt::Long; + +my $pwd = `pwd`; chomp $pwd; +my $prefix = $pwd.'/stage'; +my $stagedir; +my $staged; +my $module; +my $thrflavour = 'gcc64dbgpthr'; +my $nothrflavour = 'gcc64dbg'; +my $mode = 'build'; +my $help = 0; +my $listmodules; +my $version; +my $output; +my $lb_tag = ''; +my $lbjp_tag = ''; +my $jp_tag = ''; +my $sec_tag = ''; +my $jobid_tag = ''; + +my @nodes = qw/client server logger utils client-java doc ws-test db jpprimary jpindex jpclient/; +my %enable_nodes; +my %disable_nodes; + +my %extern_prefix = ( + cares => '/opt/c-ares', + classads => '/opt/classads', + cppunit => '/usr', + expat => '/usr', + globus => '/opt/globus', + gsoap => '/usr', + mysql => '/usr', + 'mysql-devel' => '', + voms => '/opt/glite', + gridsite => '/opt/glite', + lcas => '/opt/glite', + ant => '/usr', + jdk => '/usr', + libtar => '/usr', +); + +my %jar = ( + 'commons-codec' => '/usr/share/java/commons-codec-1.3.jar', +); + + +my %glite_prefix; +my %need_externs; +my %need_externs_type; +my %need_jars; +my %extrafull; +my %extranodmod; +my %deps; +my %deps_type; +my %topbuild; + +my %lbmodules = ( + 'lb' => [ qw/client client-java common doc logger server state-machine types utils ws-interface ws-test/], + 'security' => [qw/gss gsoap-plugin/], + 'lbjp-common' => [qw/db maildir server-bones trio jp-interface/], + 'jobid' => [qw/api-c api-cpp api-java/], + 'jp' => [ qw/client doc index primary server-common ws-interface/ ], + ); + + +my @opts = ( + 'prefix=s' => \$prefix, + 'staged=s' => \$staged, + 'module=s' => \$module, + 'thrflavour=s' => \$thrflavour, + 'nothrflavour=s' => \$nothrflavour, + 'mode=s' => \$mode, + 'listmodules=s' => \$listmodules, + 'version=s' => \$version, + 'output=s' => \$output, + 'stage=s' => \$stagedir, + 'lb-tag=s' => \$lb_tag, + 'lbjp-common-tag=s' => \$lbjp_tag, + 'jp-tag=s' => \$jp_tag, + 'security-tag=s' => \$sec_tag, + 'jobid-tag=s' => \$jobid_tag, + 'help' => \$help, +); + +for (@nodes) { + $enable_nodes{$_} = 0; + $disable_nodes{$_} = 0; + + push @opts,"disable-$_",\$disable_nodes{$_}; + push @opts,"enable-$_",\$enable_nodes{$_}; +} + +push @opts,"with-$_=s",\$extern_prefix{$_} for keys %extern_prefix; +push @opts,"with-$_=s",\$jar{$_} for keys %jar; + +my @keeparg = @ARGV; + +GetOptions @opts or die "Errors parsing command line\n"; + +$extern_prefix{'mysql-devel'}=$extern_prefix{mysql} if $extern_prefix{'mysql-devel'} eq ''; + +if ($help) { usage(); exit 0; } + +if ($listmodules) { + my @m = map "org.glite.$listmodules.$_",@{$lbmodules{$listmodules}}; + print "@m\n"; + exit 0; +} + +warn "$0: --version and --output make sense only in --mode=etics\n" + if ($version || $output) && $mode ne 'etics'; + +my $en; +for (keys %enable_nodes) { $en = 1 if $enable_nodes{$_}; } + +my $dis; +for (keys %disable_nodes) { $dis = 1 if $disable_nodes{$_}; } + +die "--enable-* and --disable-* are mutually exclusive\n" + if $en && $dis; + +die "--module cannot be used with --enable-* or --disable-*\n" + if $module && ($en || $dis); + +die "$module: unknown module\n" if $module && ! grep $module,@{$lbmodules{lb}},@{$lbmodules{security}},{$lbmodules{jp}}; + +if ($dis) { + for (@nodes) { + $enable_nodes{$_} = 1 unless $disable_nodes{$_}; + } +} + +if (!$en && !$dis) { $enable_nodes{$_} = 1 for (@nodes) } ; + +for (keys %enable_nodes) { delete $enable_nodes{$_} unless $enable_nodes{$_}; } + +$stagedir = $prefix unless $stagedir; + +if ($mode eq 'build') { + print "Writing config.status\n"; + open CONF,">config.status" or die "config.status: $!\n"; + print CONF "$0 @keeparg\n"; + close CONF; +} + + +my @modules; +my %aux; + +if ($module) { +# push @modules,split(/[,.]+/,$module); + push @modules,$module; +} +else { + @modules = map(($extranodmod{$_} ? $extranodmod{$_} : 'lb.'.$_),(keys %enable_nodes)); + + my $n; + + do { + local $"="\n"; + $n = $#modules; + push @modules,(map @{$deps{$_}},@modules); + + undef %aux; @aux{@modules} = (1) x ($#modules+1); + @modules = keys %aux; + } while ($#modules > $n); +} + +@aux{@modules} = (1) x ($#modules+1); +delete $aux{$_} for (split /,/,$staged); +@modules = keys %aux; + +mode_build() if $mode eq 'build'; +mode_checkout() if $mode eq 'checkout'; +mode_etics($module) if $mode eq 'etics'; + +sub mode_build { + print "\nBuilding modules: @modules\n"; + + my @ext = map @{$need_externs{$_}},@modules; + my @myjars = map @{$need_jars{$_}},@modules; + undef %aux; @aux{@ext} = 1; + @ext = keys %aux; + undef %aux; @aux{@myjars} = (1) x ($#myjars+1); + @myjars = keys %aux; + + print "\nRequired externals:\n"; + print "\t$_: $extern_prefix{$_}\n" for @ext; + print "\t$_: $jar{$_}\n" for @myjars; + print "\nThis is a poor-man configure, it's up to you to have sources and externals there\n\n"; + + mkinc($_) for @modules; + + print "Creating Makefile\n"; + + open MAK,">Makefile" or die "Makefile: $!\n"; + + print MAK "all: @modules\n\nclean:\n"; + + for (@modules) { + my $full = full($_); + my $build = $topbuild{$_} ? '': '/build'; + print MAK "\tcd $full$build && \${MAKE} clean\n" + } + + print MAK "\ndistclean:\n"; + + for (@modules) { + my $full = full($_); + print MAK $topbuild{$_} ? + "\tcd $full$build && \${MAKE} distclean\n" : + "\trm -rf $full$build\n" + } + + print MAK "\n"; + + for (@modules) { + my %ldeps; undef %ldeps; + @ldeps{@{$deps{$_}}} = 1; + for my $x (split /,/,$staged) { delete $ldeps{$x}; } + my @dnames = $module ? () : keys %ldeps; + + my $full = full($_); + my $build = $topbuild{$_} ? '': '/build'; + + print MAK "$_: @dnames\n\tcd $full$build && \${MAKE} && \${MAKE} install\n\n"; + } + + close MAK; +} + +sub mode_checkout() { + for (@modules) { + my $module = $_; + my $tag = ""; + if ($lb_tag){ + for (@{$lbmodules{lb}}){ + if ("lb.".$_ eq $module){ + $tag = '-r '.$lb_tag; + } + } + } + if ($lbjp_tag){ + for (@{$lbmodules{'lbjp-common'}}){ + if ("lbjp-common.".$_ eq $module){ + $tag = '-r '.$lbjp_tag; + } + } + } + if ($jp_tag){ + for (@{$lbmodules{'jp'}}){ + if ("jp.".$_ eq $module){ + $tag = '-r '.$jp_tag; + } + } + } + if ($sec_tag){ + for (@{$lbmodules{security}}){ + if ("security.".$_ eq $module){ + $tag = '-r '.$sec_tag; + } + } + } + if ($jobid_tag){ + for (@{$lbmodules{jobid}}){ + if ("jobid.".$_ eq $module){ + $tag = '-r '.$jobid_tag; + } + } + } + #if (grep {"lb.".$_ eq $module} @{$lbmodules{lb}}){ + # print "found"; + #} + $_ = full($_); + print "\n*** Checking out $_\n"; + system("cvs checkout $tag $_") == 0 or die "cvs checkout $tag $_: $?\n"; + } +} + +BEGIN{ +%need_externs_aux = ( + 'lb.client' => [ qw/cppunit:B classads/ ], + 'lb.client-java' => [ qw/ant:B/ ], + 'lb.common' => [ qw/expat cppunit:B classads/ ], + 'lb.doc' => [], + 'lb.logger' => [ qw/cppunit:B/ ], + 'lb.server' => [ qw/globus_essentials:R globus:B expat cares mysql cppunit:B gsoap:B classads voms lcas gridsite/ ], + 'lb.state-machine' => [ qw/classads/ ], + 'lb.utils' => [ qw/cppunit:B/ ], + 'lb.ws-interface' => [], + 'lb.ws-test' => [ qw/gsoap:B/ ], + 'lb.types' => [ qw// ], + 'lbjp-common.db' => [ qw/mysql:R mysql-devel:B/ ], + 'lbjp-common.maildir' => [ qw// ], + 'lbjp-common.server-bones' => [ qw// ], + 'lbjp-common.trio' => [ qw/cppunit:B/ ], + 'lbjp-common.jp-interface' => [ qw/cppunit:B/ ], + 'security.gss' => [ qw/globus_essentials:R globus:B cares cppunit:B/ ], + 'security.gsoap-plugin' => [ qw/cppunit:B globus_essentials:R globus:B cares gsoap:B/ ], + 'jobid.api-c' => [ qw/cppunit:B/ ], + 'jobid.api-cpp' => [ qw/cppunit:B/ ], + 'jobid.api-java' => [ qw/ant:B jdk:B/ ], + 'jp.client' => [ qw/gsoap libtar globus_essentials:R globus:B/ ], + 'jp.doc' => [], + 'jp.index' => [ qw/gsoap globus_essentials:R globus:B/ ], + 'jp.primary' => [ qw/classads gsoap libtar globus_essentials:R globus:B/ ], + 'jp.server-common' => [], + 'jp.ws-interface' => [], +); + +for my $ext (keys %need_externs_aux) { + for (@{$need_externs_aux{$ext}}) { + /([^:]*)(?::(.*))?/; + push @{$need_externs{$ext}},$1; + my $type = $2 ? $2 : 'BR'; + $need_externs_type{$ext}->{$1} = $type; + } +} + +%need_jars = ( + 'jobid.api-java' => [ qw/commons-codec/ ], +); + +for my $jar (keys %need_jars) { + for (@{$need_jars{$jar}}) { + $need_externs_type{$jar}->{$_} = 'BR'; # XXX + } +} + +%deps_aux = ( + 'lb.client' => [ qw/ + lb.types:B lb.common + lbjp-common.trio + jobid.api-cpp jobid.api-c + security.gss + / ], + 'lb.client-java' => [ qw/ + lb.types:B + jobid.api-java + / ], + 'lb.common' => [ qw/ + jobid.api-cpp jobid.api-c + lb.types:B lbjp-common.trio security.gss + / ], + 'lb.doc' => [ qw/lb.types:B/ ], + 'lb.logger' => [ qw/ + lbjp-common.trio + jobid.api-c + lb.common + security.gss + / ], + 'lb.server' => [ qw/ + lb.ws-interface lb.types:B lb.common lb.state-machine + lbjp-common.db lbjp-common.server-bones lbjp-common.trio lbjp-common.maildir + jobid.api-c + security.gsoap-plugin security.gss + / ], + 'lb.state-machine' => [ qw/lb.common lbjp-common.jp-interface security.gss/ ], + 'lb.utils' => [ qw/ + lbjp-common.jp-interface + jobid.api-c + lbjp-common.trio lbjp-common.maildir + lb.client lb.state-machine + / ], + 'lb.ws-test' => [ qw/security.gsoap-plugin lb.ws-interface/ ], + 'lb.ws-interface' => [ qw/lb.types:B/ ], + 'lb.types' => [ qw// ], + 'lbjp-common.db' => [ qw/lbjp-common.trio/ ], + 'lbjp-common.maildir' => [ qw// ], + 'lbjp-common.server-bones' => [ qw// ], + 'lbjp-common.trio' => [ qw// ], + 'security.gss' => [ qw// ], + 'security.gsoap-plugin' => [ qw/security.gss/ ], + 'jobid.api-c' => [ qw// ], + 'jobid.api-cpp' => [ qw/jobid.api-c/ ], + 'jobid.api-java' => [ qw// ], + + 'lbjp-common.jp-interface' => [ qw/lbjp-common.db jobid.api-c/ ], + + 'jp.client' => [ qw/ + jp.ws-interface + lbjp-common.jp-interface lbjp-common.maildir + jobid.api-c + security.gsoap-plugin + / ], + 'jp.doc' => [ qw// ], + 'jp.index' => [ qw/ + jp.server-common jp.ws-interface + lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones + security.gsoap-plugin + / ], + 'jp.primary' => [ qw/ + jobid.api-c + jp.server-common jp.ws-interface + lb.state-machine + lbjp-common.jp-interface lbjp-common.trio lbjp-common.db lbjp-common.server-bones + security.gsoap-plugin + / ], + 'jp.server-common' => [ qw/ + lbjp-common.jp-interface lbjp-common.db + / ], + 'jp.ws-interface' => [ qw// ], +); + +for my $ext (keys %deps_aux) { + for (@{$deps_aux{$ext}}) { + /([^:]*)(?::(.*))?/; + push @{$deps{$ext}},$1; + my $type = $2 ? $2 : 'BR'; + $deps_type{$ext}->{$1} = $type; + } +} + + +%extrafull = ( gridsite=>'org.gridsite.core'); + +#( java => 'client-java' ); +%extranodmod = ( + db => 'lbjp-common.db', + jpprimary => 'jp.primary', + jpindex => 'jp.index', + jpclient => 'jp.client', +); + +my @t = qw/lb.client-java jobid.api-java lb.types/; +@topbuild{@t} = (1) x ($#t+1); +} + +sub full +{ + my $short = shift; + return $extrafull{$short} ? $extrafull{$short} : 'org.glite.'.$short; +} + +sub mkinc +{ + my %aux; + undef %aux; + my @m=qw/ +lb.client lb.doc lb.state-machine lb.ws-interface lb.logger lb.types lb.common lb.server lb.utils lb.ws-test lb.client-java +security.gss security.gsoap-plugin +jobid.api-c jobid.api-cpp jobid.api-java +lbjp-common.db lbjp-common.maildir lbjp-common.server-bones lbjp-common.trio lbjp-common.jp-interface +jp.client jp.doc jp.index jp.primary jp.server-common jp.ws-interface +/; + @aux{@m} = (1) x ($#m+1); + + my $short = shift; + my $full = full $short; + + unless ($aux{$short}) { + print "Makefile.inc not needed in $full\n"; + return; + } + + my $build = ''; + + unless ($topbuild{$_}) { + $build = '/build'; + unless (-d "$full/build") { + mkdir "$full/build" or die "mkdir $full/build: $!\n"; + } + unlink "$full/build/Makefile"; + symlink "../Makefile","$full/build/Makefile" or die "symlink ../Makefile $full/build/Makefile: $!\n"; + } + + open MKINC,">$full$build/Makefile.inc" + or die "$full$build/Makefile.inc: $!\n"; + + print "Creating $full$build/Makefile.inc\n"; + + print MKINC qq{ +PREFIX = $prefix +stagedir = $stagedir +thrflavour = $thrflavour +nothrflavour = $nothrflavour +}; + + for (@{$need_externs{$short}}) { + print MKINC "${_}_prefix = $extern_prefix{$_}\n" + } + + for (@{$need_jars{$short}}) { + print MKINC "${_}_jar = $jar{$_}\n" + } + + my $need_gsoap = 0; + for (@{$need_externs{$short}}) { $need_gsoap = 1 if $_ eq 'gsoap'; } + + print MKINC "gsoap_default_version=".gsoap_version()."\n" if $need_gsoap; + + close MKINC; +} + +my %etics_externs; +my %etics_projects; +BEGIN{ + %etics_externs = ( + globus_essentials=>'vdt_globus_essentials', + globus=>'globus', + cares=>'c-ares', + voms=>'org.glite.security.voms-api-cpp', + gridsite=>'org.gridsite.shared', + lcas=>'org.glite.security.lcas', + ); + %etics_projects = ( + vdt=>[qw/globus globus_essentials/], + 'org.glite'=>[qw/voms gridsite lcas/], + ); +}; + +sub mode_etics { + $fmod = shift; + + die "$0: --module required with --etics\n" unless $fmod; + + my ($subsys,$module) = split /\./,$fmod; + + my ($major,$minor,$rev,$age); + + if ($version) { + $version =~ /([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)-(.+)/; + ($major,$minor,$rev,$age) = ($1,$2,$3,$4); + } + else { + open V,"org.glite.$subsys.$module/project/version.properties" + or die "org.glite.$subsys.$module/project/version.properties: $!\n"; + + while ($_ = ) { + chomp; + ($major,$minor,$rev) = ($1,$2,$3) if /module\.version\s*=\s*([[:digit:]]+)\.([[:digit:]]+)\.([[:digit:]]+)/; + $age = $1 if /module\.age\s*=\s*([[:digit:]]+)/; + } + close V; + } + + my @copts = (); + my %ge; + @ge{@{$etics_projects{'org.glite'}}} = (1) x ($#{$etics_projects{'org.glite'}}+1); + + for (@{$need_externs{"$subsys.$module"}}) { + if ($need_externs_type{"$subsys.$module"}->{$_}=~/B/) { + my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_; + push @copts,$ge{$_} ? "--with-$_=\${stageDir}" : "--with-$_=\${$eext.location}"; + } + } + + for (@{$need_jars{"$subsys.$module"}}) { + my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_; + + push @copts,"--with-$_ \${$eext.location}/$_*.jar"; + } + + + my $conf = "glite-$subsys-${module}_R_${major}_${minor}_${rev}_${age}"; + my $file = $output ? $output : "$conf.ini"; + open C,">$file" or die "$file: $!\n"; + + my $buildroot = $topbuild{"$subsys.$module"} ? '' : "build.root = build\n"; + + my $confdir = $topbuild{"$subsys.$module"} ? '..' : '../..'; + + print STDERR "Writing $file\n"; + print C qq{ +[Configuration-$conf] +profile = None +moduleName = org.glite.$subsys.$module +displayName = $conf +description = org.glite.$subsys.$module +projectName = org.glite +age = $age +deploymentType = None +tag = $conf +version = $major.$minor.$rev +path = \${projectName}/\${moduleName}/\${version}/\${platformName}/\${packageName}-\${version}-\${age}.tar.gz + +[Platform-default:VcsCommand] +displayName = None +description = None +tag = cvs -d \${vcsroot} tag -R \${tag} \${moduleName} +branch = None +commit = None +checkout = cvs -d \${vcsroot} co -r \${tag} \${moduleName} + +[Platform-default:BuildCommand] +postpublish = None +packaging = None +displayName = None +description = None +doc = None +prepublish = None +publish = None +compile = make +init = None +install = make install +clean = make clean +test = make check +configure = cd $confdir && \${moduleName}/configure --thrflavour=\${globus.thr.flavor} --nothrflavour=\${globus.nothr.flavor} --prefix=\${prefix} --stage=\${stageDir} --module $subsys.$module @copts +checkstyle = None + +[Platform-default:Property] +$buildroot + +[Platform-default:DynamicDependency] + +}; + for (@{$need_externs{"$subsys.$module"}},@{$need_jars{"$subsys.$module"}}) { + my $eext = $etics_externs{$_} ? $etics_externs{$_} : $_; + + my $proj = 'externals'; + for my $p (keys %etics_projects) { + for $m (@{$etics_projects{$p}}) { + $proj = $p if $m eq $_; + } + } + + my $type = $need_externs_type{"$subsys.$module"}->{$_}; + print C "$proj|$eext = $type\n"; + } + + for (@{$deps{"$subsys.$module"}}) { + my $type = $deps_type{"$subsys.$module"}->{$_}; + print C "org.glite|org.glite.$_ = $type\n"; + } + + close C; +} + +sub gsoap_version { + local $_; + my $gsoap_version; + open S,"$extern_prefix{gsoap}/bin/soapcpp2 -v 2>&1 |" or die "$extern_prefix{gsoap}/bin/soapcpp2: $!\n"; + + while ($_ = ) { + chomp; + + $gsoap_version = $1 if /The gSOAP Stub and Skeleton Compiler for C and C\+\+ ([.[:digit:][:alpha:]]+)$/; + } + close S; + return $gsoap_version; +} + + +sub usage { + my @ext = keys %extern_prefix; + my @myjars, keys %jar; + + print STDERR qq{ +usage: $0 options + +General options (defaults in []): + --prefix=PREFIX destination directory [./stage] + --staged=module,module,... what is already in PREFIX (specify without org.glite.) + --thrflavour=flavour + --nothrflavour=flavour threaded and non-treaded flavours [gcc64dbgpthr,gcc64dbg] + --listmodules=subsys list modules of a subsystem + +Mode of operation: + --mode={checkout|build|etics} what to do [build] + +What to build: + --module=module build this module only (mostly in-Etics operation) + --enable-NODE build this "node" (set of modules) only. Available nodes are + @{$lbmodules{lb}},@{$lbmodules{security}} + --disable-NODE don't build this node + --lb-tag=tag checkout LB modules with specific tag + --jp-tag=tag checkout JP modules with specific tag + --lbjp-common-tag=tag checkout lbjp-common modules with specific tag + --security-tag=tag checkout security modules with specific tag + --jobid-tag=tag checkout jobid modules with specific tag + +Dependencies: + --with-EXTERNAL=PATH where to look for an external. Required externals + (not all for all modules) are: + @ext + --with-JAR=JAR where to look for jars. Required jars are: + @myjars + Summary of what will be used is always printed + +}; + +} diff --git a/org.glite.lb.logger/project/package.description b/org.glite.lb.logger/project/package.description new file mode 100644 index 0000000..cd0621b --- /dev/null +++ b/org.glite.lb.logger/project/package.description @@ -0,0 +1 @@ +glite-lb-logger is the gLite LB local-logger and inter-logger. This package contains the local-logger (glite-lb-logd), inter-logger (glite-lb-interlogd) and notification inter-logger (glite-lb-notif-interlogd) daemons. diff --git a/org.glite.lb.logger/project/package.summary b/org.glite.lb.logger/project/package.summary new file mode 100644 index 0000000..089b630 --- /dev/null +++ b/org.glite.lb.logger/project/package.summary @@ -0,0 +1 @@ +gLite Logging and Bookkeeping local-logger and inter-logger diff --git a/org.glite.lb.logger/src-nt/Connection.cpp b/org.glite.lb.logger/src-nt/Connection.cpp new file mode 100644 index 0000000..7cac943 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Connection.cpp @@ -0,0 +1,4 @@ +#include "Connection.H" + +Connection::Factory::~Factory() { +} diff --git a/org.glite.lb.logger/src-nt/EventManager.cpp b/org.glite.lb.logger/src-nt/EventManager.cpp new file mode 100644 index 0000000..91efb12 --- /dev/null +++ b/org.glite.lb.logger/src-nt/EventManager.cpp @@ -0,0 +1,23 @@ +#include "EventManager.H" + +int +EventManager::postEvent(Event* &e) +{ + for(std::list::iterator i = handlers.begin(); + i != handlers.end(); + i++) { + (*i)->handleEvent(e); + } + return 0; +} + +void +EventManager::addHandler(EventHandler *handler) +{ + handlers.push_back(handler); +} + +void +EventManager::removeHandler(EventHandler *handler) +{ +} diff --git a/org.glite.lb.logger/src-nt/InputChannel.H b/org.glite.lb.logger/src-nt/InputChannel.H new file mode 100644 index 0000000..2bac262 --- /dev/null +++ b/org.glite.lb.logger/src-nt/InputChannel.H @@ -0,0 +1,29 @@ +#ifndef _INPUT_CHANNEL_H_ +#define _INPUT_CHANNEL_H_ + +#include "ThreadPool.H" +#include "Connection.H" +#include "Transport.H" + +class InputChannel + : public ThreadPool::WorkDescription { +public: + + InputChannel(Connection *conn, Transport *trans) + : ThreadPool::WorkDescription(conn->getFD()), + m_connection(conn), m_transport(trans) + {} + + void start(); + +protected: + virtual void onReady(); + virtual void onTimeout(); + virtual void onError(); + +private: + Connection *m_connection; + Transport *m_transport; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/InputChannel.cpp b/org.glite.lb.logger/src-nt/InputChannel.cpp new file mode 100644 index 0000000..e3ac9c8 --- /dev/null +++ b/org.glite.lb.logger/src-nt/InputChannel.cpp @@ -0,0 +1,37 @@ +#include "InputChannel.H" +#include "ThreadPool.H" +#include "EventManager.H" + +extern EventManager theEventManager; + +void +InputChannel::start() +{ + ThreadPool::instance()->queueWorkRead(this); +} + +void +InputChannel::onReady() +{ + Transport::Message *msg = NULL; + int ret = m_transport->receive(m_connection, msg); + if(ret <= 0) { + // no new data read + } else if(msg) { + // we have a new message + + } else { + // still need more data + ThreadPool::instance()->queueWorkRead(this); + } +} + +void +InputChannel::onTimeout() +{ +} + +void +InputChannel::onError() +{ +} diff --git a/org.glite.lb.logger/src-nt/Message.H b/org.glite.lb.logger/src-nt/Message.H new file mode 100644 index 0000000..725966a --- /dev/null +++ b/org.glite.lb.logger/src-nt/Message.H @@ -0,0 +1,66 @@ +#ifndef _MESSAGE_H_ +#define _MESSAGE_H + +#include "Properties.H" +#include "MessageStore.H" + +#include + +class Message: public MessageStore::Storable { +public: + + /** class that holds message state + * + */ + class State : public MessageStore::Storable { + public: + + /** Get size needed for storage (from Storable). + */ + virtual int getStorageSize() const; + + /** Save State (from Storable) + */ + virtual int save(void* data, int len) const; + + /** Load State (from Storable) + */ + virtual int load(void* data, int len); + }; + + + Message(); + + Message(void * data, unsigned int length) + : m_length(length), + m_data(data) + {} + + + int getContent(void* &data) const + { data = m_data; return m_length; } + + int getContentLength() const + { return m_length; } + + std::string getProperty(const std::string &name, std::string &val) + { return m_properties.getProperty(name); } + + void setProperty(const std::string &name, std::string &val) + { m_properties.setProperty(name, val); } + + Properties& getProperties() + { return m_properties; } + + void setProperties(Properties &) + {} + +private: + MessageStore::ID m_id; + unsigned int m_length; + void * m_data; + Properties m_properties; +}; + + +#endif diff --git a/org.glite.lb.logger/src-nt/MessageStore.H b/org.glite.lb.logger/src-nt/MessageStore.H new file mode 100644 index 0000000..ff03a9b --- /dev/null +++ b/org.glite.lb.logger/src-nt/MessageStore.H @@ -0,0 +1,84 @@ +#ifndef _MESSAGE_STORE_H_ +#define _MESSAGE_STORE_H_ + +#include + +/** Permanent storage for messages and their states. + */ + +class MessageStore { +public: + + /** Base class for everything that can be stored here. + */ + class Storable { + public: + /** Get size needed for object storage. + */ + virtual int getStorageSize() const = 0; + + /** Save state of object into binary data. + */ + virtual int save(void* data, int len) const = 0; + + /** Load state of object from binary data. + */ + virtual int load(void* data, int len) = 0; + + virtual ~Storable() {} + }; + + + /** Class that uniquely identifies stored content. + */ + class ID: public Storable { + public: + /** Default constructor. + * + * Creates new unique ID. + */ + ID(); + + /** Copy constructor. + */ + ID(const ID& src); + + /** Destructor. + */ + ~ID() {}; + + /** Assignment operator. + */ + ID& operator=(const ID& src); + + /** Return the string suitable for printing. + */ + std::string toString() const; + + /** Comparison operator + */ + int operator==(const ID& second); + + /** Get size needed for storage (from Storable). + */ + virtual int getStorageSize() const; + + /** Save ID (from Storable) + */ + virtual int save(void* data, int len) const; + + /** Load ID (from Storable) + */ + virtual int load(void* data, int len); + + protected: + unsigned long long getID() {return id;} + + private: + static pthread_mutex_t counterLock; + static unsigned counter; + unsigned long long id; + }; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/MessageStore.cpp b/org.glite.lb.logger/src-nt/MessageStore.cpp new file mode 100644 index 0000000..eb9de7a --- /dev/null +++ b/org.glite.lb.logger/src-nt/MessageStore.cpp @@ -0,0 +1,24 @@ +#include +#include +#include + +#include "MessageStore.H" + +pthread_mutex_t MessageStore::ID::counterLock = PTHREAD_MUTEX_INITIALIZER; +unsigned MessageStore::ID::counter = 0; + +MessageStore::ID::ID(){ + time_t t; + time(&t); + pthread_mutex_lock(&counterLock); + counter++; + id = ((unsigned long long) counter << 32) + t; + pthread_mutex_unlock(&counterLock); +} + +std::string MessageStore::ID::toString() const{ + std::ostringstream oss; + oss << id; + return oss.str(); +} + diff --git a/org.glite.lb.logger/src-nt/Properties.H b/org.glite.lb.logger/src-nt/Properties.H new file mode 100644 index 0000000..77d216d --- /dev/null +++ b/org.glite.lb.logger/src-nt/Properties.H @@ -0,0 +1,36 @@ +#ifndef _PROPERTIES_H_ +#define _PROPERTIES_H_ + +#include +#include + +class Properties { +public: + + // default constructor + Properties() + : properties() + {} + + // accessors + std::string& getProperty(const std::string &key) + { return properties[key]; } + + void setProperty(const std::string &key, std::string &val) + { properties[key] = val; } + + // iterators + typedef std::map::iterator iterator; + + iterator begin() + { return properties.begin(); } + + iterator end() + { return properties.end(); } + + +private: + std::map properties; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/Transport.cpp b/org.glite.lb.logger/src-nt/Transport.cpp new file mode 100644 index 0000000..2544997 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Transport.cpp @@ -0,0 +1,5 @@ +#include "Transport.H" + +Transport::~Transport() +{ +} diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c new file mode 100644 index 0000000..ebd5523 --- /dev/null +++ b/org.glite.lb.logger/src/event_store_http.c @@ -0,0 +1,1113 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include + +#include "glite/lb/events_parse.h" + +#include "interlogd.h" + +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static char *file_prefix = NULL; + + +struct event_store_list { + struct event_store *es; + struct event_store_list *next; +}; + + +static struct event_store_list *store_list; +static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER; + + +/* ---------------- + * helper functions + * ---------------- + */ +static +char * +jobid2eventfile(const char *job_id_s) +{ + char *buffer; + + if(job_id_s) { + asprintf(&buffer, "%s.%s", file_prefix, job_id_s); + } else + asprintf(&buffer, "%s.default", file_prefix); + + return(buffer); +} + + +static +char * +jobid2controlfile(char *job_id_s) +{ + char *buffer; + char *hash; + + if(job_id_s) { + asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s); + } else + asprintf(&buffer, "%s.default.ctl", file_prefix); + + return(buffer); +} + +static +int +file_reader(void *user_data, char *buffer, const int len) +{ + size_t ret = 0; + + if(len > 0) { + ret = fread(buffer, 1, len, (FILE*)user_data); + if(ret == 0 && ferror((FILE*)user_data)) { + return -1; + } + } + return ret; +} + + +static +int +read_event_string(FILE *file, il_http_message_t *msg) +{ + int len, ret; + int fd = fileno(file); + long start; + + /* remember the start position */ + start = ftell(file); + ret = receive_http(file, file_reader, msg); + if(ret < 0) return ret; + /* seek at the end of message in case the reader read ahead */ + len = fseek(file, start + msg->len, SEEK_SET); + len = fgetc(file); + if(len != '\n') { + il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n", + start+msg->len, len, len); + if(msg->data) { free(msg->data); msg->data = NULL; } + if(msg->host) { free(msg->host); msg->host = NULL; } + return EINVAL; + } + return ret; +} + + + +/* ------------------------------ + * event_store 'member' functions + * ------------------------------ + */ +static +int +event_store_free(struct event_store *es) +{ + assert(es != NULL); + + if(es->job_id_s) free(es->job_id_s); + if(es->event_file_name) free(es->event_file_name); + if(es->control_file_name) free(es->control_file_name); + pthread_rwlock_destroy(&es->use_lock); + pthread_rwlock_destroy(&es->commit_lock); + free(es); + + return(0); +} + + +static +struct event_store * +event_store_create(char *job_id_s) +{ + struct event_store *es; + + es = malloc(sizeof(*es)); + if(es == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure"); + return(NULL); + } + + memset(es, 0, sizeof(*es)); + + il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s); + + es->job_id_s = strdup(job_id_s); + es->event_file_name = jobid2eventfile(job_id_s); + es->control_file_name = jobid2controlfile(job_id_s); + + if(pthread_rwlock_init(&es->commit_lock, NULL)) + abort(); + if(pthread_rwlock_init(&es->use_lock, NULL)) + abort(); + + return(es); +} + + +static +int +event_store_lock_ro(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_rdlock(&es->commit_lock)) + abort(); + + return(0); +} + + +static +int +event_store_lock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_wrlock(&es->commit_lock)) + abort(); + + return(0); +} + + +static +int +event_store_unlock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->commit_lock)) + abort(); + return(0); +} + + +static +int +event_store_read_ctl(struct event_store *es) +{ + FILE *ctl_file; + + assert(es != NULL); + + event_store_lock(es); + if((ctl_file = fopen(es->control_file_name, "r")) == NULL) { + /* no control file, new event file */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + } else { + /* read last seen and last committed counts */ + fscanf(ctl_file, "%*s\n%ld\n%ld\n", + &es->last_committed_ls, + &es->last_committed_bs); + fclose(ctl_file); + } + event_store_unlock(es); + + return(0); +} + + +static +int +event_store_write_ctl(struct event_store *es) +{ + FILE *ctl; + + assert(es != NULL); + + ctl = fopen(es->control_file_name, "w"); + if(ctl == NULL) { + set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file"); + return(-1); + } + + if(fprintf(ctl, "%s\n%ld\n%ld\n", + es->job_id_s, + es->last_committed_ls, + es->last_committed_bs) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record"); + return(-1); + } + + if(fclose(ctl) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file"); + return(-1); + } + + return(0); +} + + +/* + * event_store_qurantine() + * - rename damaged event store file + * - essentially does the same actions as cleanup, but the event store + * does not have to be empty + * returns 0 on success, -1 on error + */ +static +int +event_store_quarantine(struct event_store *es) +{ + int num; + char newname[MAXPATHLEN+1]; + + /* find available qurantine name */ + /* we give it at most 1024 tries */ + for(num = 0; num < 1024; num++) { + struct stat st; + + snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num); + newname[MAXPATHLEN] = 0; + if(stat(newname, &st) < 0) { + if(errno == ENOENT) { + /* file not found */ + break; + } else { + /* some other error with name, probably permanent */ + set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename"); + return(-1); + + } + } else { + /* the filename is used already */ + } + } + if(num >= 1024) { + /* new name not found */ + /* XXX - is there more suitable error? */ + set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename"); + return(-1); + } + + /* actually rename the file */ + il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n", + es->event_file_name, newname); + if(rename(es->event_file_name, newname) < 0) { + set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file"); + return(-1); + } + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + return(0); +} + + +/* + * event_store_recover() + * - recover after restart or catch up when events missing in IPC + * - if offset > 0, read everything behind it + * - if offset == 0, read everything behind min(last_committed_bs, last_committed_es) + */ +int +event_store_recover(struct event_store *es) +{ + struct event_queue *eq_l = NULL, *eq_b = NULL; + struct server_msg *msg; + il_http_message_t hmsg; + char *event_s; + int fd, ret; + long last; + FILE *ef; + struct flock efl; + char err_msg[128]; + struct stat stbuf; + + assert(es != NULL); + +#if defined(IL_NOTIFICATIONS) + /* destination queue has to be found for each message separately */ +#else + /* find bookkepping server queue */ + eq_b = queue_list_get(es->job_id_s); + if(eq_b == NULL) + return(-1); +#endif + +#if !defined(IL_NOTIFICATIONS) + /* get log server queue */ + eq_l = queue_list_get(NULL); +#endif + + event_store_lock(es); + + il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name); + + /* open event file */ + ef = fopen(es->event_file_name, "r"); + if(ef == NULL) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error opening event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + return(-1); + } + + /* lock the file for reading (we should not read while dglogd is writing) */ + fd = fileno(ef); + efl.l_type = F_RDLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLKW, &efl) < 0) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error locking event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + /* check the file modification time and size to avoid unnecessary operations */ + memset(&stbuf, 0, sizeof(stbuf)); + if(fstat(fd, &stbuf) < 0) { + il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno)); + fclose(ef); + event_store_unlock(es); + return -1; + } else { + if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) { + il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n"); + fclose(ef); + event_store_unlock(es); + return(0); + } + } + + while(1) { /* try, try, try */ + + /* get the position in file to be sought */ + if(es->offset) + last = es->offset; + else { + last = es->last_committed_bs; + } + + il_log(LOG_DEBUG, " setting starting file position to %ld\n", last); + il_log(LOG_DEBUG, " bytes sent to destination: %d\n", es->last_committed_bs); + + if(last > 0) { + int c; + + /* skip all committed or already enqueued events */ + /* be careful - check, if the offset really points to the + beginning of event string */ + if(fseek(ef, last - 1, SEEK_SET) < 0) { + set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + /* the last enqueued event MUST end with \n */ + if((c=fgetc(ef)) != '\n') { + /* Houston, we have got a problem */ + il_log(LOG_WARNING, + " file position %ld does not point at the beginning of event string, backing off!\n", + last); + /* now, where were we? */ + if(es->offset) { + /* next try will be with + last_commited_bs */ + es->offset = 0; + } else { + /* this is really weird... back off completely */ + es->last_committed_ls = es->last_committed_bs = 0; + } + } else { + /* OK, break out of the loop */ + break; + } + } else { + /* this breaks out of the loop, we are starting at + * the beginning of file + */ + if(fseek(ef, 0, SEEK_SET) < 0) { + set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + break; + } + } + + /* enqueue all remaining events */ + ret = 1; + msg = NULL; + while(read_event_string(ef, &hmsg) >= 0) { + + /* last holds the starting position of event_s in file */ + il_log(LOG_DEBUG, " reading event at %ld\n", last); + + /* break from now on means there was some error */ + ret = -1; + + /* create message for server */ + msg = server_msg_create((il_octet_string_t*)&hmsg, last); + if(msg == NULL) { + il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n"); + /* actually do not bother if quarantine succeeded or not - we could not do more */ + event_store_quarantine(es); + fclose(ef); + event_store_unlock(es); + return(-1); + } + msg->es = es; + + /* first enqueue to the LS */ + if(!bs_only && (last >= es->last_committed_ls)) { + + il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_l->dest_name); + +#if !defined(IL_NOTIFICATIONS) + if(enqueue_msg(eq_l, msg) < 0) + break; +#endif + } + +#ifdef IL_NOTIFICATIONS + eq_b = queue_list_get(msg->dest); +#endif + + /* now enqueue to the BS, if neccessary */ + if((eq_b != eq_l) && + (last >= es->last_committed_bs)) { + + il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_b->dest_name); + + if(enqueue_msg(eq_b, msg) < 0) + break; + } + server_msg_free(msg); + msg = NULL; + + /* now last is also the offset behind the last successfully queued event */ + last = ftell(ef); + + /* ret == 0 means EOF or incomplete event found */ + ret = 0; + + } /* while */ + + /* due to this little assignment we had to lock the event_store for writing */ + es->offset = last; + es->last_modified = stbuf.st_mtime; + il_log(LOG_DEBUG, " event store offset set to %ld\n", last); + + if(msg) + server_msg_free(msg); + + fclose(ef); + il_log(LOG_DEBUG, " finished reading events with %d\n", ret); + + event_store_unlock(es); + return(ret); +} + + +/* + * event_store_sync() + * - check the position of event and fill holes from file + * - return 1 if the event is new, + * 0 if it was seen before, + * -1 if there was an error + */ +int +event_store_sync(struct event_store *es, long offset) +{ + int ret; + + assert(es != NULL); + + /* all events are actually read from file, the event on socket + * is ignored and serves just to notify us about file change + */ + ret = event_store_recover(es); + ret = (ret < 0) ? ret : 0; + return(ret); +} + + +int +event_store_next(struct event_store *es, long offset, int len) +{ + assert(es != NULL); + + /* offsets are good only to detect losses (differences between socket and file), + which is not possible now */ + return 0; +} + + +/* + * event_store_commit() + * + */ +int +event_store_commit(struct event_store *es, int len, int ls) +{ + assert(es != NULL); + + event_store_lock(es); + + if(ls) + es->last_committed_ls += len; + else { + es->last_committed_bs += len; + if (bs_only) es->last_committed_ls += len; + } + + if(event_store_write_ctl(es) < 0) { + event_store_unlock(es); + return(-1); + } + + event_store_unlock(es); + + + return(0); +} + + +/* + * event_store_clean() + * - remove the event files (event and ctl), if they are not needed anymore + * - returns 0 if event_store is in use, 1 if it was removed and -1 on error + * + * Q: How do we know that we can safely remove the files? + * A: When all events from file have been committed both by LS and BS. + */ +static +int +event_store_clean(struct event_store *es) +{ + long last; + int fd; + FILE *ef; + struct flock efl; + + assert(es != NULL); + + /* prevent sender threads from updating */ + event_store_lock(es); + + il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s); + il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls); + il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs); + + /* preliminary check to avoid opening event file */ + /* if the positions differ, some events still have to be sent */ + if(es->last_committed_ls != es->last_committed_bs) { + event_store_unlock(es); + il_log(LOG_DEBUG, " not all events sent, cleanup aborted\n"); + return(0); + } + + /* the file can only be removed when all the events were succesfully sent + (ie. committed both by LS and BS */ + /* That also implies that the event queues are 'empty' at the moment. */ + ef = fopen(es->event_file_name, "r+"); + if(ef == NULL) { + /* if we can not open the event store, it is an error and the struct should be removed */ + /* XXX - is it true? */ + event_store_unlock(es); + il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno)); + return(1); + } + + fd = fileno(ef); + + /* prevent local-logger from writing into event file */ + efl.l_type = F_WRLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLK, &efl) < 0) { + il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n"); + fclose(ef); + event_store_unlock(es); + if(errno != EACCES && + errno != EAGAIN) { + set_error(IL_SYS, errno, "event_store_clean: error locking event file"); + return(-1); + } + return(0); + } + + /* now the file should not contain partially written event, so it is safe + to get offset behind last event by seeking the end of file */ + if(fseek(ef, 0, SEEK_END) < 0) { + set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + last = ftell(ef); + il_log(LOG_DEBUG, " total bytes in file: %d\n", last); + + if(es->last_committed_ls < last) { + fclose(ef); + event_store_unlock(es); + il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n"); + return(0); + } else if( es->last_committed_ls > last) { + il_log(LOG_WARNING, " warning: event file seems to shrink!\n"); + /* XXX - in that case we can not continue because there may be + some undelivered events referring to that event store */ + fclose(ef); + event_store_unlock(es); + return(0); + } + + /* now we are sure that all events were sent and the event queues are empty */ + il_log(LOG_INFO, " removing event file %s\n", es->event_file_name); + + /* remove the event file */ + unlink(es->event_file_name); + unlink(es->control_file_name); + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + /* unlock the event_store even if it is going to be removed */ + event_store_unlock(es); + + /* close the event file (that unlocks it as well) */ + fclose(ef); + + /* indicate that it is safe to remove this event_store */ + return(1); +} + + +/* -------------------------------- + * event store management functions + * -------------------------------- + */ +struct event_store * +event_store_find(char *job_id_s) +{ + struct event_store_list *q, *p; + struct event_store *es; + + if(pthread_rwlock_wrlock(&store_list_lock)) { + abort(); + } + + es = NULL; + + q = NULL; + p = store_list; + + while(p) { + if(strcmp(p->es->job_id_s, job_id_s) == 0) { + es = p->es; + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(es); + } + + q = p; + p = p->next; + } + + es = event_store_create(job_id_s); + if(es == NULL) { + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p = malloc(sizeof(*p)); + if(p == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store"); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p->next = store_list; + store_list = p; + + p->es = es; + + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(es); +} + + +int +event_store_release(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->use_lock)) + abort(); + il_log(LOG_DEBUG, " released lock on %s\n", es->job_id_s); + return(0); +} + + +event_store_from_file(char *filename) +{ + struct event_store *es; + FILE *event_file; + char *job_id_s = NULL, *p; + il_http_message_t hmsg; + int ret; + + il_log(LOG_INFO, " attaching to event file: %s\n", filename); + + if(strstr(filename, "quarantine") != NULL) { + il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n"); + return(0); + } + + event_file = fopen(filename, "r"); + if(event_file == NULL) { + set_error(IL_SYS, errno, "event_store_from_file: error opening event file"); + return(-1); + } + ret = read_event_string(event_file, &hmsg); + fclose(event_file); + if(ret < 0) + return(0); + + /* get id aka dest */ + job_id_s = hmsg.host; + + il_log(LOG_DEBUG, " message dest: '%s'\n", job_id_s); + if(job_id_s == NULL) { + il_log(LOG_NOTICE, " skipping file, could not parse event\n"); + ret = 0; + goto out; + } + + es=event_store_find(job_id_s); + + if(es == NULL) { + ret = -1; + goto out; + } + + if((es->last_committed_ls == 0) && + (es->last_committed_bs == 0) && + (es->offset == 0)) { + ret = event_store_read_ctl(es); + } else + ret = 0; + + event_store_release(es); + +out: + if(hmsg.data) free(hmsg.data); + if(job_id_s) free(job_id_s); + return(ret); +} + + +int +event_store_init(char *prefix) +{ + if(file_prefix == NULL) { + file_prefix = strdup(prefix); + store_list = NULL; + } + + /* read directory and get a list of event files */ + { + int len; + + char *p, *dir; + DIR *event_dir; + struct dirent *entry; + + + /* get directory name */ + p = strrchr(file_prefix, '/'); + if(p == NULL) { + dir = strdup("."); + p = ""; + len = 0; + } else { + *p = '\0'; + dir = strdup(file_prefix); + *p++ = '/'; + len = strlen(p); + } + + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* skip all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') + continue; + + s = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(s == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name"); + return(-1); + } + + *s = '\0'; + strcat(s, dir); + strcat(s, "/"); + strcat(s, entry->d_name); + + if(event_store_from_file(s) < 0) { + free(dir); + free(s); + closedir(event_dir); + return(-1); + } + + free(s); + } + closedir(event_dir); + + /* one more pass - this time remove stale .ctl files */ + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* find all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') { + char *ef; + struct stat st; + + /* is there corresponding event file? */ + ef = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(ef == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name"); + return(-1); + } + + s[0] = 0; + *ef = '\0'; + strcat(ef, dir); + strcat(ef, "/"); + strcat(ef, entry->d_name); + s[0] = '.'; + + if(stat(ef, &st) == 0) { + /* something is there */ + /* XXX - it could be something else than event file, but do not bother now */ + } else { + /* could not stat file, remove ctl */ + strcat(ef, s); + il_log(LOG_DEBUG, " removing stale file %s\n", ef); + if(unlink(ef)) + il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno)); + + } + free(ef); + + } + } + closedir(event_dir); + free(dir); + } + + return(0); +} + + +int +event_store_recover_all() +{ + struct event_store_list *sl; + + + if(pthread_rwlock_rdlock(&store_list_lock)) + abort(); + + /* recover all event stores */ + sl = store_list; + while(sl != NULL) { + + /* recover this event store */ + /* no need to lock use_lock in event_store, the store_list_lock is in place */ + if(event_store_recover(sl->es) < 0) { + il_log(LOG_ERR, " error recovering event store %s:\n %s\n", sl->es->event_file_name, error_get_msg()); + clear_error(); + } + sl = sl->next; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + + +#if 0 +int +event_store_remove(struct event_store *es) +{ + struct event_store_list *p, **q; + + assert(es != NULL); + + switch(event_store_clean(es)) { + case 0: + il_log(LOG_DEBUG, " event store not removed, still used\n"); + return(0); + + case 1: + if(pthread_rwlock_wrlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error locking event store list"); + return(-1); + } + + p = store_list; + q = &store_list; + + while(p) { + if(p->es == es) { + (*q) = p->next; + event_store_free(es); + free(p); + break; + } + q = &(p->next); + p = p->next; + } + + if(pthread_rwlock_unlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error unlocking event store list"); + return(-1); + } + return(1); + + default: + return(-1); + } + /* not reached */ + return(0); +} +#endif + +int +event_store_cleanup() +{ + struct event_store_list *sl; + struct event_store_list *slnext; + struct event_store_list **prev; + + /* try to remove event files */ + + if(pthread_rwlock_wrlock(&store_list_lock)) + abort(); + + sl = store_list; + prev = &store_list; + + while(sl != NULL) { + int ret; + + slnext = sl->next; + + /* one event store at time */ + ret = pthread_rwlock_trywrlock(&sl->es->use_lock); + if(ret == EBUSY) { + il_log(LOG_DEBUG, " event_store %s is in use by another thread\n", + sl->es->job_id_s); + sl = slnext; + continue; + } else if (ret < 0) + abort(); + + switch(event_store_clean(sl->es)) { + + case 1: + /* remove this event store */ + (*prev) = slnext; + event_store_free(sl->es); + free(sl); + break; + + case -1: + il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n", + sl->es->job_id_s, sl->es->event_file_name, error_get_msg()); + /* event_store_release(sl->es); */ + clear_error(); + /* go on to the next */ + + default: + event_store_release(sl->es); + prev = &(sl->next); + break; + } + + sl = slnext; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + diff --git a/org.glite.lb.logger/src/http.c b/org.glite.lb.logger/src/http.c new file mode 100644 index 0000000..c9fb89b --- /dev/null +++ b/org.glite.lb.logger/src/http.c @@ -0,0 +1,197 @@ +#ident "$Header$" + +#include +#include + +#include "interlogd.h" + + +int +parse_request(const char *s, il_http_message_t *msg) +{ + if(!strncasecmp(s, "HTTP", 4)) { + msg->msg_type = IL_HTTP_REPLY; + } else if(!strncasecmp(s, "POST", 4)) { + msg->msg_type = IL_HTTP_POST; + } else if(!strncasecmp(s, "GET", 3)) { + msg->msg_type = IL_HTTP_GET; + } else { + msg->msg_type = IL_HTTP_OTHER; + } + if(msg->msg_type == IL_HTTP_REPLY) { + char *p = strchr(s, ' '); + + if(!p) goto parse_end; + p++; + msg->reply_code=atoi(p); + p = strchr(p, ' '); + if(!p) goto parse_end; + p++; + msg->reply_string = strdup(p); + + parse_end: + ; + } +} + + +int +parse_header(const char *s, il_http_message_t *msg) +{ + if(!strncasecmp(s, "Content-Length:", 15)) { + msg->content_length = atoi(s + 15); + } else if(!strncasecmp(s, "Host:", 5)) { + const char *p = s + 4; + while(*++p == ' '); /* skip spaces */ + msg->host = strdup(p); + } + return(0); +} + + +#define DEFAULT_CHUNK_SIZE 1024 + +// read what is available and parse what can be parsed +// returns the result of read operation of the underlying connection, +// ie. the number of bytes read or error code +int +receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_message_t *msg) +{ + static enum { NONE, IN_REQUEST, IN_HEADERS, IN_BODY } state = NONE; + int len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE; + char *buffer, *p, *s, *cr; + + memset(msg, 0, sizeof(*msg)); + // msg->data = NULL; + // msg->len = 0; + state = IN_REQUEST; + alen = 0; + buffer = NULL; + buffer_free = 0; + p = NULL; + s = NULL; + + do { + /* p - first empty position in buffer + alen - size of allocated buffer + len - number of bytes received in last read + s - points behind last scanned CRLF or at buffer start + buffer_free = alen - (p - buffer) + */ + + /* prepare at least chunk_size bytes for next data */ + if(buffer_free < min_buffer_free) { + char *n; + + alen += min_buffer_free; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(-1); + } + buffer_free += min_buffer_free; + p = n + (p - buffer); + s = n + (s - buffer); + buffer = n; + } + + if(buffer_free > 0) { + len = (*reader)(user_data, p, buffer_free); + if(len < 0) { + // error + free(buffer); + // set_error(IL_SYS, errno, "receive_http: error reading data"); + return -1; + } else if(len == 0) { + // EOF + free(buffer); + set_error(IL_PROTO, errno, "receive_http: error reading data - premature EOF"); + return -1; + } + buffer_free -= len; + p+= len; + } + + + switch(state) { + + // parse buffer, look for CRLFs + // s - start scan position + // p - start of current token + // cr - current CRLF position + + case IN_REQUEST: + if((s < p - 1) && + (cr = (char*)memchr(s, '\r', p - s - 1)) && + (cr[1] == '\n')) { + *cr = 0; + parse_request(s, msg); + *cr = '\r'; + // change state + state = IN_HEADERS; + // start new tokens (cr < p - 1 -> s < p + 1 <-> s <= p) + s = cr + 2; + } else { + break; + } + + case IN_HEADERS: + while((state != IN_BODY) && + (s < p - 1) && + (cr = (char*)memchr(s, '\r', p - s - 1)) && + (cr[1] == '\n')) { + if(s == cr) { /* do not consider request starting with CRLF */ + // found CRLFCRLF + state = IN_BODY; + } else { + *cr = 0; + parse_header(s, msg); + *cr = '\r'; + } + // next scan starts after CRLF + s = cr + 2; + } + if(state == IN_BODY) { + // we found body + // content-length should be set at the moment + if(msg->content_length > 0) { + int need_free = msg->content_length - (p - s); + char *n; + + alen += need_free - buffer_free + 1; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(-1); + } + buffer_free = need_free; + min_buffer_free = 0; + p = n + (p - buffer); + s = n + (s - buffer); + buffer = n; + } else { + // report error + free(buffer); + set_error(IL_PROTO, EINVAL, "receive_http: error reading data - no content length specified\n"); + return -1; + } + } + break; + + case IN_BODY: + if(buffer_free == 0) { + // finished reading + *p = 0; + state = NONE; + } + break; + } + } while(state != NONE); + + msg->data = buffer; + msg->len = p - buffer; + + return 0; +} diff --git a/org.glite.lb.logger/src/input_queue_socket_http.c b/org.glite.lb.logger/src/input_queue_socket_http.c new file mode 100644 index 0000000..939c45f --- /dev/null +++ b/org.glite.lb.logger/src/input_queue_socket_http.c @@ -0,0 +1,167 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "interlogd.h" + +static const int SOCK_QUEUE_MAX = 50; +extern char *socket_path; +extern char *file_prefix; + +static int sock; +static int accepted; + +static +int plain_reader(void *user_data, char *buffer, const int len) +{ + return (recv(*(int*)user_data, buffer, len, MSG_NOSIGNAL)); +} + + +int +input_queue_attach() +{ + struct sockaddr_un saddr; + + if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error creating socket"); + return(-1); + } + + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, socket_path); + + /* test for the presence of the socket and another instance + of interlogger listening */ + if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) { + if(errno == ECONNREFUSED) { + /* socket present, but no one at the other end; remove it */ + il_log(LOG_WARNING, " removing stale input socket %s\n", socket_path); + unlink(socket_path); + } + /* ignore other errors for now */ + } else { + /* connection was successful, so bail out - there is + another interlogger running */ + set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running"); + return(-1); + } + + if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error binding socket"); + return(-1); + } + + if (listen(sock, SOCK_QUEUE_MAX)) { + set_error(IL_SYS, errno, "input_queue_attach: error listening on socket"); + return -1; + } + + return(0); +} + + +void input_queue_detach() +{ + if (sock >= 0) + close(sock); + unlink(socket_path); +} + + + +/* + * Returns: -1 on error, 0 if no message available, message length otherwise + * + */ +#ifdef PERF_EVENTS_INLINE +int +input_queue_get(il_octet_string_t **buffer, long *offset, int timeout) +{ + static long o = 0; + int len; + char *jobid; + static il_octet_string_t my_buffer; + + assert(buffer != NULL); + + *buffer = &my_buffer; + + len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid); + my_buffer.len = len; + if(len) { + o += len; + *offset = o; + } else if (len == 0) { + sleep(timeout); + } + return(len); +} +#else +int +input_queue_get(il_octet_string_t **buffer, long *offset, int timeout) +{ + fd_set fds; + struct timeval tv; + int msg_len; + static il_http_message_t msg; + + assert(buffer != NULL); + + *buffer = (il_octet_string_t *)&msg; + + FD_ZERO(&fds); + FD_SET(sock, &fds); + + tv.tv_sec = timeout; + tv.tv_usec = 0; + + msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL); + switch(msg_len) { + + case 0: /* timeout */ + return(0); + + case -1: /* error */ + switch(errno) { + case EINTR: + il_log(LOG_DEBUG, " interrupted while waiting for event!\n"); + return(0); + + default: + set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); + return(-1); + } + default: + break; + } + + if((accepted=accept(sock, NULL, NULL)) < 0) { + set_error(IL_SYS, errno, "input_queue_get: error accepting connection"); + return(-1); + } + + msg_len = receive_http(&accepted, plain_reader, &msg); + + if(msg_len < 0) { + close(accepted); + if(error_get_maj() != IL_OK) + return -1; + else + return 0; + } + + close(accepted); + *offset = -1; + return(msg.len); +} +#endif + diff --git a/org.glite.lb.logger/src/queue_mgr_http.c b/org.glite.lb.logger/src/queue_mgr_http.c new file mode 100644 index 0000000..777e620 --- /dev/null +++ b/org.glite.lb.logger/src/queue_mgr_http.c @@ -0,0 +1,164 @@ +#ident "$Header$" + +#include +#include +#include + +#include "glite/jobid/cjobid.h" +#include "glite/lb/context.h" + +#include "interlogd.h" + +struct queue_list { + struct event_queue *queue; + char *dest; + struct queue_list *next; + time_t expires; +}; + +static struct event_queue *log_queue; +static struct queue_list *queues; + + +static +int +queue_list_create() +{ + queues = NULL; + + return(0); +} + + +static +int +queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev) +{ + struct queue_list *q, *p; + + assert(el != NULL); + + *el = NULL; + if(prev) + *prev = NULL; + + if(ql == NULL) + return(0); + + q = NULL; + p = ql; + + while(p) { + if(strcmp(p->dest, dest) == 0) { + *el = p; + if(prev) + *prev = q; + return(1); + } + + q = p; + p = p->next; + }; + + return(0); +} + + +static +int +queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) +{ + struct queue_list *el; + + assert(dest != NULL); + assert(eq != NULL); + assert(ql != NULL); + + el = malloc(sizeof(*el)); + if(el == NULL) { + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue"); + return(-1); + } + + el->dest = strdup(dest); + if(el->dest == NULL) { + free(el); + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue"); + return(-1); + } + el->queue = eq; + el->next = *ql; + *ql = el; + return 0; +} + + +struct event_queue * +queue_list_get(char *job_id_s) +{ + char *dest; + struct queue_list *q; + struct event_queue *eq; + dest = job_id_s; + + if(dest == NULL) + return(NULL); + + if(queue_list_find(queues, dest, &q, NULL)) { + return(q->queue); + } else { + eq = event_queue_create(dest); + if(eq) + queue_list_add(&queues, dest, eq); + return(eq); + } +} + + +int +queue_list_is_log(struct event_queue *eq) +{ + return(eq == queue_list_get(NULL)); +} + + +int +queue_list_init(char *ls) +{ + return(queue_list_create()); +} + + +static struct queue_list *current; + + +struct event_queue * +queue_list_first() +{ + current = queues; + return(current ? current->queue : NULL); +} + + +struct event_queue * +queue_list_next() +{ + current = current ? current->next : NULL; + return(current ? current->queue : NULL); +} + + +int +queue_list_remove_queue(struct event_queue *eq) +{ + assert(eq != NULL); + + free(eq); + return(1); +} + + + +/* Local Variables: */ +/* c-indentation-style: gnu */ +/* End: */ diff --git a/org.glite.lb.logger/src/send_event_http.c b/org.glite.lb.logger/src/send_event_http.c new file mode 100644 index 0000000..3c90562 --- /dev/null +++ b/org.glite.lb.logger/src/send_event_http.c @@ -0,0 +1,282 @@ +#ident "$Header$" + +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include +#include +#include + + +/* + * - L/B server protocol handling routines + */ + +#include "glite/jobid/cjobid.h" +#include "glite/lb/il_string.h" +#include "glite/lb/context.h" + +#include "interlogd.h" + +struct reader_data { + edg_wll_GssConnection *gss; + struct timeval *timeout; +}; + + +static +int +gss_reader(void *user_data, char *buffer, int max_len) +{ + int ret; + struct reader_data *data = (struct reader_data *)user_data; + edg_wll_GssStatus gss_stat; + + ret = edg_wll_gss_read(data->gss, buffer, max_len, data->timeout, &gss_stat); + if(ret < 0) { + char *gss_err = NULL; + + if(ret == EDG_WLL_GSS_ERROR_GSS) { + edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err); + set_error(IL_DGGSS, ret, gss_err); + free(gss_err); + } else + set_error(IL_DGGSS, ret, "get_reply"); + } + return(ret); +} + + +/* + * Read reply from server. + * Returns: -1 - error reading message, + * code > 0 - http status code from server + */ +static +int +get_reply(struct event_queue *eq, char **buf, int *code_min) +{ + int ret, code; + int len; + struct timeval tv; + struct reader_data data; + il_http_message_t msg; + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + data.gss = &eq->gss; + data.timeout = &tv; + len = receive_http(&data, gss_reader, &msg); + if(len < 0) { + set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply"); + return(-1); + } + if(msg.data) free(msg.data); + if(msg.reply_string) *buf = msg.reply_string; + *code_min = 0; /* XXX fill in flag for fault */ + return(msg.reply_code); +} + + + +/* + * Returns: 0 - not connected, timeout set, 1 - OK + */ +int +event_queue_connect(struct event_queue *eq) +{ + int ret; + struct timeval tv; + edg_wll_GssStatus gss_stat; + cred_handle_t *local_cred_handle; + + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + + if(eq->gss.context == NULL) { + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + + /* get pointer to the credentials */ + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + local_cred_handle = cred_handle; + local_cred_handle->counter++; + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + + il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port); + ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat); + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + /* check if we need to release the credentials */ + --local_cred_handle->counter; + if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) { + edg_wll_gss_release_cred(&local_cred_handle->creds, NULL); + free(local_cred_handle); + il_log(LOG_DEBUG, " freed credentials, not used anymore\n"); + } + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + + if(ret < 0) { + char *gss_err = NULL; + + if (ret == EDG_WLL_GSS_ERROR_GSS) + edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err); + set_error(IL_DGGSS, ret, + (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect"); + if (gss_err) free(gss_err); + eq->gss.context = NULL; + eq->timeout = TIMEOUT; + return(0); + } + } + +#ifdef LB_PERF + } +#endif + + return(1); +} + + +int +event_queue_close(struct event_queue *eq) +{ + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + + if(eq->gss.context != NULL) { + edg_wll_gss_close(&eq->gss, NULL); + eq->gss.context = NULL; + } +#ifdef LB_PERF + } +#endif + return(0); +} + + +/* + * Send all events from the queue. + * Returns: -1 - system error, 0 - not sent, 1 - queue empty + */ +int +event_queue_send(struct event_queue *eq) +{ + int events_sent = 0; + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + if(eq->gss.context == NULL) + return(0); +#ifdef LB_PERF + } +#endif + + /* feed the server with events */ + while (!event_queue_empty(eq)) { + struct server_msg *msg; + char *rep; + int ret, code, code_min; + size_t bytes_sent; + struct timeval tv; + edg_wll_GssStatus gss_stat; + + clear_error(); + + if(event_queue_get(eq, &msg) < 0) + return(-1); + + il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s); + +#ifdef LB_PERF + if(!nosend) { +#endif + /* XXX: ljocha -- does it make sense to send empty messages ? */ + if (msg->len) { + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); + if(ret < 0) { + if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) { + eq->timeout = 0; + } else { + il_log(LOG_ERR, "send_event: %s\n", error_get_msg()); + eq->timeout = TIMEOUT; + } + return(0); + } + if((code = get_reply(eq, &rep, &code_min)) < 0) { + /* could not get the reply properly, so try again later */ + if (events_sent>0) + eq->timeout = 1; + else { + eq->timeout = TIMEOUT; + il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg()); + } + return(0); + } + } + else { code = 200; code_min = 0; rep = strdup("not sending empty message"); } +#ifdef LB_PERF + } else { + glite_wll_perftest_consumeEventIlMsg(msg->msg+17); + code = 200; + rep = strdup("OK"); + } +#endif + + il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep); + free(rep); + + /* the reply is back here, decide what to do with message */ + /* HTTP error codes: + 1xx - informational (eg. 100 Continue) + 2xx - successful (eg. 200 OK) + 3xx - redirection (eg. 301 Moved Permanently) + 4xx - client error (eq. 400 Bad Request) + 5xx - server error (eq. 500 Internal Server Error) + */ + if(code >= 100 && code < 200) { + + /* non fatal errors (for us), try to deliver later */ + eq->timeout = TIMEOUT; + return(0); + } + + /* the message was consumed (successfully or not) */ + /* update the event pointer */ + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) + /* failure committing message, this is bad */ + return(-1); + + event_queue_remove(eq); + events_sent++; + } /* while */ + + return(1); + +} /* send_events */ + + +/* this is just not used */ +int +send_confirmation(long lllid, int code) +{ + return 0; +} diff --git a/org.glite.lb.logger/src/server_msg_http.c b/org.glite.lb.logger/src/server_msg_http.c new file mode 100644 index 0000000..8bd3623 --- /dev/null +++ b/org.glite.lb.logger/src/server_msg_http.c @@ -0,0 +1,128 @@ +#ident "$Header$" + +#include +#include +#include + +#include "interlogd.h" +#include "glite/lb/il_msg.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/context.h" + +static +int +create_msg(il_http_message_t *ev, char **buffer, long *receipt, time_t *expires) +{ + char *event = ev->data; + + *receipt = 0; + *expires = 0; + + *buffer = ev->data; + return ev->len;; +} + + +struct server_msg * +server_msg_create(il_octet_string_t *event, long offset) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message"); + return(NULL); + } + + if(server_msg_init(msg, event) < 0) { + server_msg_free(msg); + return(NULL); + } + msg->offset = offset; + + return(msg); +} + + +struct server_msg * +server_msg_copy(struct server_msg *src) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message"); + return(NULL); + } + + msg->msg = malloc(src->len); + if(msg->msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message"); + server_msg_free(msg); + return(NULL); + } + msg->len = src->len; + memcpy(msg->msg, src->msg, src->len); + + msg->job_id_s = strdup(src->job_id_s); + msg->ev_len = src->ev_len; + msg->es = src->es; + msg->receipt_to = src->receipt_to; + msg->offset = src->offset; +#if defined(IL_NOTIFICATIONS) + msg->dest_name = strdup(src->dest_name); + msg->dest_port = src->dest_port; + msg->dest = strdup(src->dest); +#endif + msg->expires = src->expires; + return(msg); +} + + +int +server_msg_init(struct server_msg *msg, il_octet_string_t *event) +{ + il_http_message_t *hmsg = (il_http_message_t *)event; + + assert(msg != NULL); + assert(event != NULL); + + memset(msg, 0, sizeof(*msg)); + + + msg->job_id_s = hmsg->host; + if(msg->job_id_s == NULL) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id"); + return -1; + } + msg->len = create_msg(hmsg, &msg->msg, &msg->receipt_to, &msg->expires); + if(msg->len < 0) + return -1; + /* set this to indicate new data owner */ + hmsg->data = NULL; + hmsg->host = NULL; + msg->ev_len = hmsg->len + 1; /* must add separator size too */ + return 0; + +} + + +int +server_msg_is_priority(struct server_msg *msg) +{ + assert(msg != NULL); + + return(msg->receipt_to != 0); +} + + +int +server_msg_free(struct server_msg *msg) +{ + assert(msg != NULL); + + if(msg->msg) free(msg->msg); + if(msg->job_id_s) free(msg->job_id_s); + free(msg); + return 0; +}