From 53b7f301af334b2157eb0744b732ad94586f5034 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 7 Sep 2006 14:44:19 +0000 Subject: [PATCH] First shot of provenance challenge examples. --- org.glite.jp.index/examples/pch06/pch.pm | 205 ++++++++++++++++++++++++++++ org.glite.jp.index/examples/pch06/query1.pl | 102 ++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 org.glite.jp.index/examples/pch06/pch.pm create mode 100644 org.glite.jp.index/examples/pch06/query1.pl diff --git a/org.glite.jp.index/examples/pch06/pch.pm b/org.glite.jp.index/examples/pch06/pch.pm new file mode 100644 index 0000000..2e054ab --- /dev/null +++ b/org.glite.jp.index/examples/pch06/pch.pm @@ -0,0 +1,205 @@ +# +# Job Provenance queries wrapper (Primary and Index queries) +# +# $debug - trace calls +# $err - error status from last query +# + +package pch; + +use strict; +use warnings; +use XML::Twig; +#use Data::Dumper; + +our $lbattr='http://egee.cesnet.cz/en/Schema/LB/Attributes'; +our $jpsys='http://egee.cesnet.cz/en/Schema/JP/System'; +our $jpwf='http://egee.cesnet.cz/en/Schema/JP/Workflow'; +our $jplbtag='http://egee.cesnet.cz/en/WSDL/jp-lbtag'; + +our $debug = 0; +our $err = 0; + +my $jpis_client_program = "./glite-jpis-client"; +my $jpps_client_program = "./glite-jp-primary-test"; +my @default_is_attributes = ( + "http://egee.cesnet.cz/en/Schema/JP/System:owner", + "http://egee.cesnet.cz/en/Schema/JP/System:jobId", + "http://egee.cesnet.cz/en/Schema/LB/Attributes:finalStatus", + "http://egee.cesnet.cz/en/Schema/LB/Attributes:user", + "http://egee.cesnet.cz/en/WSDL/jp-lbtag:IPAW_PROGRAM", + "http://egee.cesnet.cz/en/Schema/JP/Workflow:ancestor" +); +my @isquery = ( +' + +', +' +' +); + + +my @jobs; + + +# +# query to Job Provenance Index Server +# +sub isquery { + my ($server, $queries, $attributes) = @_; + my ($s, @jobs); + my $args = ''; + my @attributes; + my $fh; + + $err = 0; + if ($attributes) { @attributes = @$attributes; } + else { @attributes = @default_is_attributes; } + + $s = $isquery[0]; + foreach my $query (@$queries) { + my @query = @$query; + my $i = 1; + $s .= "\n"; + $s .= "\t$query[0]\n"; + while ($i <= $#query) { + my @record = @{$query[$i]}; + $s .= "\t\n"; + $s .= "\t\t$record[0]\n"; + $s .= "\t\t$record[1]\n"; + $s .= "\t\t$record[2]\n" if ($record[2]); + $s .= "\t\n"; + $i++; + } + $s .= "\n"; + } + + foreach my $attribute (@attributes) { + $s .= "$attribute\n"; + } + $s .= $isquery[1]; + + $args .= "-i $server " if ($server); + $args .= '-q -'; + + if ($debug) { + print STDERR "calling 'echo '$s' | $jpis_client_program $args |'\n"; + } + if (!open($fh, "echo '$s' | $jpis_client_program $args |")) { + print STDERR "Can't execute '$jpis_client_program $args'\n"; + $err = 1; + return (); + } + @jobs = parse_is($fh); +# print STDERR <$fh>; print STDERR "\n"; + close $fh; + if ($?) { + print STDERR "Error returned from $jpis_client_program $args\n"; + $err = 1; + return (); + } + + return @jobs; +} + + +sub parse_is { + my ($fh) = @_; + my $twig; + + @jobs = (); + + $twig = new XML::Twig(TwigHandlers => { jobs => \&jobs_handler }); + if (!$twig->safe_parse($fh)) { $err = 1; return (); } + else { return @jobs; } +} + + +sub jobs_handler { + my($twig, $xmljobs)= @_; + my (%attributes, $xmljobid, $xmlattribute, %job); + %attributes = (); + + $xmljobid = $xmljobs->first_child('jobid'); + die "No jobid on '".$xmljobs->text."'" if (!$xmljobid); + $job{jobid} = $xmljobid->text; + + $xmlattribute = $xmljobs->first_child('attributes'); + while ($xmlattribute) { + my @values = (); + my ($xmlname, $xmlvalue, %attribute); + %attribute = (); + + $xmlname = $xmlattribute->first_child('name'); + die "No name on '".$xmlattribute->text."'" if (!$xmlname); + $xmlvalue = $xmlattribute->first_child('value'); + while ($xmlvalue) { + push @values, $xmlvalue->text; + $xmlvalue = $xmlvalue->next_sibling('value'); + } + $attribute{value} = \@values; + $attribute{timestamp} = $xmlattribute->first_child('timestamp')->text; + $xmlattribute = $xmlattribute->next_sibling('attributes'); + + $attributes{$xmlname->text} = \%attribute; + } + $job{attributes} = \%attributes; + + push @jobs, \%job; +} + +# +# query to Job Provenance Primary Storage +# ==> array of string +# +sub psquery { + my ($server, $jobid, $attribute) = @_; + my $args = ''; + my @attrs = (); + my $fh; + + $err = 0; + $args .= "-s $server " if ($server); + $args .= "GetJobAttr $jobid $attribute"; + if ($debug) { + print STDERR "calling '$jpps_client_program $args |'\n"; + } + if (!open($fh, "$jpps_client_program $args |")) { + print STDERR "Can't execute '$jpps_client_program $args'\n"; + $err = 1; + return (); + } + @attrs = parse_ps($fh); + close $fh; + if ($?) { + print STDERR "Error returned from $jpps_client_program $args\n"; + $err = 1; + return (); + } + + return @attrs; +} + + +sub parse_ps { + my ($fh) = @_; + my @attrs = (); + my $attr; + + while (<$fh>) { + chomp; + next if (!$_); + next if (/^OK$/); + next if (/^Attribute values:$/); +# print STDERR "$_\n"; + $attr = $_; + $attr =~ s/\t*//; + $attr =~ s/\t.*//; + push @attrs, $attr; + } + + return @attrs; +} + + +1; diff --git a/org.glite.jp.index/examples/pch06/query1.pl b/org.glite.jp.index/examples/pch06/query1.pl new file mode 100644 index 0000000..a2c30a8 --- /dev/null +++ b/org.glite.jp.index/examples/pch06/query1.pl @@ -0,0 +1,102 @@ +#! /usr/bin/perl + +# +# first query implementation +# call: +# ./query1.pl OUTPUT_FILE_NAME 2>/dev/null +# + +use strict; +use pch; +use Data::Dumper; + +my $ps='https://skurut1.cesnet.cz:8901'; +my $is='https://skurut1.cesnet.cz:8902'; + +my @according_jobs = (); # sequencially jobid list +my %according_jobs = (); # hash jobid list +my $according_count = 0; +my $output; + + +if ($#ARGV + 1 != 1) { + print STDERR "Usage: $0 OUTPUT_FILE\n"; + exit 1 +} +$output = $ARGV[0]; + +# debug calls +$pch::debug = 1; +my $debug = 0; + +# +# find out processes with given output +# +my @jobs = pch::isquery($is, [ + ["$pch::jplbtag:IPAW_OUTPUT", ['EQUAL', "$output"]], +], ["$pch::jpsys:jobId", "$pch::jpwf:ancestor"]); +print Dumper(@jobs) if ($debug); +die "...so exit on error" if ($pch::err); + +# +# initial set from index server +# +foreach my $job (@jobs) { + my %job = %$job; + my %attributes = %{$job{attributes}}; + + if (!exists $according_jobs{$job{jobid}}) { + push @according_jobs, $job{jobid}; + $according_jobs{$job{jobid}} = 1; + } +} +undef @jobs; + + +# +# collect all jobids (tree browsing) +# +# better implementation will be: using children attribute on LB:parent +# +$according_count = 0; +foreach my $jobid (@according_jobs) { + my @attrs; + + print "Handling $jobid (position $according_count)\n" if ($debug); + @attrs = pch::psquery($ps, $jobid, "$pch::jpwf:ancestor"); + + for my $anc_jobid (@attrs) { + print "Considered: $anc_jobid\n" if ($debug); + if (!exists $according_jobs{$anc_jobid}) { + $according_jobs{$anc_jobid} = 1; + push @according_jobs, $anc_jobid; + print "Added $anc_jobid to $#according_jobs\n" if ($debug); + } + else { + print "Already existing $anc_jobid\n" if ($debug); + } + } + $according_count++; +} + + +# +# queries on result set +# +print "Results\n"; +print "=======\n"; +print "\n"; +foreach my $jobid (keys %according_jobs) { + print "jobid $jobid:\n"; + + # query & output all desired atributes + foreach my $attr ("$pch::jplbtag:IPAW_STAGE", "$pch::jplbtag:IPAW_PROGRAM", "$pch::jplbtag:IPAW_PARAM", "$pch::jplbtag:IPAW_INPUT", "$pch::jplbtag:IPAW_OUTPUT", "$pch::lbattr:CE", "$pch::lbattr:lastStatusHistory") { + my @attrs; + my $attr_name = $attr; $attr_name =~ s/.*://; + + @attrs = pch::psquery($ps, $jobid, $attr); + print " attr $attr_name: "; print join(", ", @attrs); print "\n"; + } + + print "\n"; +} -- 1.8.2.3