#!/usr/bin/perl

use strict;

=head1 NAME

runpipeline

=head1 SYNOPSIS

  runpipeline [options] [param=value [...]] pipeline-spec.json

  PIPELINE_param=value [...] runpipeline [options] pipeline-spec.json

Execute the specified pipeline and report results.

Read pipeline specification from pipeline-spec.json.

Apply parameters given on command line or PIPELINE_* environment
variables.

Find existing jobs that match the pipeline step specifications, or
start new ones if none are found.

Wait for the jobs to finish.

Print periodic status messages in JSON format (same as input file,
augmented with status/output for each step where applicable).

=head1 OPTIONS

  --callback-url http://server:port/path

            URL where pipeline status should be POSTed periodically

  --callback-url -

            Print pipeline status on stdout periodically

  --callback-url /path/file

            File to write pipeline status to periodically
            (/path/file.tmp.* will be used to achieve atomic writes)

  --callback-param foo=bar

            Additional parameters to send to callback URL

  --callback-timeout 10

            Timeout callback (seconds)

  --callback-type application/x-www-form-urlencoded

            Content type for callback (default multipart/form-data)

  --debug

            Print diagnostic messages on stderr

  --help

            Print usage message

  --max-nodes 10

            Maximum number of nodes to request for each new job
            (default 2)

  --no-keep-going

            Abandon the pipeline if any step fails

  --no-new-jobs

            Do not start any new jobs

  --no-reuse

            Always run new jobs, even if jobs already exist that match
            the pipeline requirements

  --no-wait

            Do not wait for jobs to finish, exit asap

  --secret-key keyid

            Encrypt/decrypt all data using the specified GPG key

  --tty-status

            Print status summary to STDERR even if not a tty

  --label string

            Display the specified string in each row of the status
            display, and include it as the value for the "label" key
            of each pipeline in the JSON data passed to callback
            handlers

=cut

use Getopt::Long;
use JSON;                       # apt-get install libjson-perl
use Pod::Usage;
use Warehouse;                  # apt-get install libwarehouse-perl
                                # or see http://factories.freelogy.org
use Fcntl;
use LWP::UserAgent;             # used by the http(s) callback
use HTTP::Request::Common;      # exports POST for the http(s) callback

# NOTE(review): perlipc spells this signal "CHLD"; "CHILD" is presumably
# not a recognized %SIG key, so this assignment may currently be a no-op.
# Confirm before renaming it -- actually ignoring SIGCHLD changes how
# child exit statuses are reaped.
$SIG{"CHILD"} = "IGNORE";       # otherwise GnuPG::Interface wrecks everything

my $callbackurl;
my %callbackparam = ( "json" => "{}" );
my $callbacktimeout;
my $callbacktype = "form-data";
my $debugflag;
my $helpflag;
my $maxnodes_default = 2;
my $nokeepgoingflag;
my $nonewjobsflag;
my $noreuseflag;
my $nowaitflag;
my $secretkey;
my $ttyflag = -t STDERR;
my $label;

$main::Options = { "maxnodes" => undef };

GetOptions (
    "callback-url=s"     => \$callbackurl,
    "callback-param=s"   => \%callbackparam,
    "callback-timeout=i" => \$callbacktimeout,
    "callback-type=s"    => \$callbacktype,
    "debug"              => \$debugflag,
    "help"               => \$helpflag,
    "max-nodes=i"        => \$main::Options->{"maxnodes"},
    "no-keep-going"      => \$nokeepgoingflag,
    "no-new-jobs"        => \$nonewjobsflag,
    "no-reuse"           => \$noreuseflag,
    "no-wait"            => \$nowaitflag,
    "secret-key=s"       => \$secretkey,
    "tty-status"         => \$ttyflag,
    "label=s"            => \$label,
    )
    or die "Failed to parse command line options.  Try: $0 --help\n";

if ($helpflag) {
    pod2usage(1);
}

# Leading "name=value" arguments are pipeline parameters; PIPELINE_name
# environment variables supply defaults for names not given that way.
my $supplied_param = {};
while (@ARGV && $ARGV[0] =~ /^(.*?)=(.*)/s) {
    $supplied_param->{$1} = $2;
    shift @ARGV;
}
while (my ($key, $value) = each %ENV) {
    if ($key =~ s/^PIPELINE_// && !exists $supplied_param->{$key}) {
        $supplied_param->{$key} = $value;
    }
}

my $whc = Warehouse->new;

# With --secret-key: import the controller's public key, store the
# exported secret key as a block encrypted to controller + secret key,
# and arrange for subsequent warehouse operations to be signed.
my $gpg_keys;
if ($secretkey) {
    my $configurl = $whc->get_config ("configurl");
    $configurl =~ /^(.*?\/\/([^:\/]+).*\/)/
        or die "disgrok configurl";
    my $controller_key_url = $1."server_key.txt";
    my $controller_hostname = $2;

    my $import = `wget -O- -q '$controller_key_url' | gpg --import 2>&1`;
    $import =~ /^gpg: key \S+: [^<]*<(root\@\Q$controller_hostname\E)>/m
        or die "disgrok gpg import";
    my $controller_key_id = $1;
    print STDERR "Imported key for $controller_key_id\n"
        if $debugflag;

    local $ENV{ENCRYPT_TO} = "$controller_key_id,$secretkey";
    my $cryptwhc = Warehouse->new;
    my $secretexport = `gpg --export-secret-key '$secretkey'`
        or die "could not export secret key $secretkey";
    $gpg_keys = $cryptwhc->store_block ($secretexport);
    print STDERR "Stored encrypted secret key $secretkey in $gpg_keys\n"
        if $debugflag;

    $ENV{SIGN_AS} = $secretkey;
    delete $ENV{NOSIGN};
}

$JSON::UnMapping = 1;

# Slurp the whole pipeline specification (file argument or stdin).
local $/ = undef;
my $json = <>;
my $pipe = jsonToObj ($json);
$pipe->{"label"} = $label if defined $label;

my $failed = 0;
my $more_todo = 1;

# Iteration 0 is a dry run (look for existing jobs only), iteration 1
# may start new jobs, and later iterations poll every 10 seconds until
# no step reports more work (or --no-wait ends the loop early).
for (my $iteration = 0;
     $iteration == 0 || ($iteration == 1 && !$nonewjobsflag) || !$nowaitflag;
     $iteration++)
{
    last if !$more_todo;

    if ($iteration > 1) {
        printf STDERR "Sleeping 10 seconds starting at %s\n", scalar localtime
            if $debugflag;
        sleep 10;
        print STDERR "Woke up\n"
            if $debugflag;
    }

    $more_todo = 0;
    for my $stepno (0..$#{$pipe->{"steps"}}) {
        if (do_step ($pipe, $stepno, $iteration == 0)) {
            $more_todo = 1;
        }
        elsif ($pipe->{"steps"}->[$stepno]->{"failed"} && $nokeepgoingflag) {
            # --no-keep-going: a failed step stops further job creation.
            $nonewjobsflag = 1;
        }
    }

    _send_callback ($pipe);

    if ($ttyflag) {
        print_status_summary ($pipe);
    }
}


# Deliver the current pipeline status to the --callback-url target:
# an http(s) POST, "-" for stdout, or an absolute path for an atomically
# replaced local file.  Failures are reported on STDERR, never fatal.
sub _send_callback
{
    my $pipe = shift;

    if ($callbackurl =~ m{^https?://}) {
        my $ua = LWP::UserAgent->new;
        $ua->timeout ($callbacktimeout) if defined $callbacktimeout;

        # A callback param whose value is "{}" is a placeholder for the
        # pipeline JSON itself.
        my @postdata;
        for (keys %callbackparam) {
            push @postdata, ($_,
                             $callbackparam{$_} eq "{}"
                             ? objToJson ($pipe)
                             : $callbackparam{$_});
        }
        my $response = $ua->request (POST ($callbackurl,
                                           Content_Type => $callbacktype,
                                           Content => \@postdata));
        if (!$response->is_success) {
            printf STDERR "Callback request failed: %s\n", $response->status_line
                if $debugflag;
        }
    }
    elsif ($callbackurl eq "-") {
        print objToJson ($pipe, { "pretty" => 1 });
        print "\n";
    }
    elsif ($callbackurl =~ /^\//) {
        _write_status_file ($pipe);
    }
    elsif (defined $callbackurl) {
        printf STDERR "Callback URL \"%s\" not understood;\ntry \"http://host/uri\" or \"-\" or \"/path/to/local/file\"\n", $callbackurl
            if $debugflag;
    }
}


# Write the pretty-printed pipeline JSON to "$callbackurl.tmp.<pid>" and
# rename it over $callbackurl, so readers never see a partial file.
# The temporary file is removed if any step after its creation fails.
sub _write_status_file
{
    my $pipe = shift;
    my $tmpfile = "$callbackurl.tmp.$$";

    my $status_fh;
    if (!sysopen $status_fh, $tmpfile, O_CREAT|O_EXCL|O_WRONLY) {
        print STDERR "Error creating $tmpfile: $!\n";
        return;
    }
    if (!print {$status_fh} objToJson ($pipe, { "pretty" => 1 })) {
        print STDERR "Error writing $tmpfile: $!\n";
        close $status_fh;
        unlink $tmpfile;
        return;
    }
    if (!close $status_fh) {
        # Buffered write errors surface at close time.
        print STDERR "Error writing $tmpfile: $!\n";
        unlink $tmpfile;
        return;
    }
    if (!rename $tmpfile, $callbackurl) {
        print STDERR "Error renaming $tmpfile to $callbackurl: $!\n";
        unlink $tmpfile;
    }
    return;
}


# Try to make progress on a step.  Return true if there is any
# point in checking up on this step again.
#
# Arguments: the pipeline hashref, the index of the step within
# $pipe->{"steps"}, and a dry-run flag (true => do not start a new job
# unless --no-new-jobs makes that moot).  Status is recorded on the step
# itself ("errors", "warnings", "waitingfor", "warehousejob",
# "progress", "complete", "failed", "output_data_locator").
sub do_step
{
    my ($pipe, $stepno, $dryrunflag) = @_;

    my $step = $pipe->{"steps"}->[$stepno];

    # If this step is already deemed to be complete or failed, there
    # is no point in reinspecting
    return undef if $step->{"failed"} || $step->{"complete"};

    delete @{$step}{qw(errors warnings waitingfor)};

    # A job id may be pinned from the command line as "<stepno>/warehousejob".
    if (!defined $step->{"warehousejob"} &&
        exists $supplied_param->{$stepno."/warehousejob"})
    {
        $step->{"warehousejob"} = {
            "id" => $supplied_param->{$stepno."/warehousejob"},
        };
    }

    my $step_job;
    if ($step->{"warehousejob"}) {
        # The job is already known: just refresh its status.
        my $joblist = $whc->job_list
            ("id_min" => $step->{"warehousejob"}->{"id"},
             "id_max" => $step->{"warehousejob"}->{"id"});
        if (!$joblist || !($step_job = $joblist->[0])) {
            push @{$step->{"warnings"}}, "Error looking up status of job " . $step->{"warehousejob"}->{"id"} . " at " . scalar localtime;
            return 1;
        }
    }
    else {
        my $mrparam = _resolve_step_params ($pipe, $stepno, $step);

        if ($step->{"errors"}) {
            $step->{"failed"} = 1;
            return undef;
        }
        return undef if $step->{"waitingfor"};

        # If we get this far, all of our inputs are available and our
        # dependencies are satisfied.

        my $sysparam = {
            "max_steps_per_node" => $pipe->{"max_steps_per_node"},
            "max_nodes" => $main::Options->{"maxnodes"} || $pipe->{"max_nodes"} || $maxnodes_default,
            "min_revision" => $pipe->{"min_revision"},
            "max_revision" => $pipe->{"max_revision"},
        };
        my $mrjobspec = {
            "mrfunction" => $step->{"function"},
        };

        # Sort resolved params into the job input, (deprecated) system
        # parameter overrides, and ordinary knobs.
        my @knobs;
        while (my ($param, $value) = each %$mrparam) {
            if ($param eq "INPUT") {
                $mrjobspec->{"inputkey"} = $value;
            }
            elsif (exists $sysparam->{$param}) {
                warn "param->{\"$param\"} is deprecated, use step->{\"$param\"} instead\n" if $debugflag;
                $sysparam->{$param} = $value;
            }
            else {
                push @knobs, "$param=$value";
            }
        }

        # Step-level settings override pipeline-level system parameters.
        for my $name (keys %$sysparam) {
            $sysparam->{$name} = $step->{$name} if exists $step->{$name};
        }

        push @knobs, "GPG_KEYS=$gpg_keys" if $gpg_keys;

        # Escape backslashes and newlines so knobs can be newline-joined.
        for my $knob (@knobs) {
            $knob =~ s/\\/\\\\/g;
            $knob =~ s/\n/\\n/g;
        }
        $mrjobspec->{"knobs"} = join ("\n", sort @knobs);

        $step_job = find_job ($mrjobspec, $sysparam, $stepno, $step);
        if (!defined $step_job) {
            # Error looking up existing jobs
            return 1;
        }
        elsif ($step_job eq "None") {
            return 1 if $dryrunflag && !$nonewjobsflag;

            $mrjobspec->{"photons"} = 1;
            # todo: command line parameter?
            $mrjobspec->{"nodes"} = 47;
            #print STDERR "maxnodes -> ".$main::Options->{"maxnodes"}."\n";
            #print STDERR "sysparam maxnodes -> ".$sysparam->{"maxnodes"}."\n";
            $mrjobspec->{"revision"} = $sysparam->{"max_revision"}
                if defined $sysparam->{"max_revision"};
            $mrjobspec->{"nodes"} = $sysparam->{"max_nodes"}
                if (defined $sysparam->{"max_nodes"}
                    && $mrjobspec->{"nodes"} > $sysparam->{"max_nodes"});
            $mrjobspec->{"stepspernode"} = $sysparam->{"max_steps_per_node"}
                if defined $sysparam->{"max_steps_per_node"};

            $step_job = start_job ($mrjobspec);
            return !$nonewjobsflag if $step_job eq "None";

            if (!$step_job) {
                push @{$step->{"warnings"}}, "Error starting job";
                return 1;
            }
        }
    }

    $step->{"warehousejob"} = {
        map { $_ => $step_job->{$_} } qw(id nodes metakey revision starttime)
    };
    printf STDERR ("Using job %d for step %d\n", $step->{"warehousejob"}->{"id"}, $stepno)
        if $debugflag;

    if ($step_job->{"success"}) {
        $step->{"warehousejob"}->{"elapsed"} = $step_job->{"finishtime_s"} - $step_job->{"starttime_s"};
        $step->{"warehousejob"}->{"finishtime"} = $step_job->{"finishtime"};
        $step->{"output_data_locator"} = $step_job->{"outputkey"};
        # Strip "+hint" suffixes from each comma-separated locator.
        $step->{"output_data_locator"} =~ s/\+[^,]*//g;
        $step->{"complete"} = 1;
        delete $step->{"progress"};
        return undef;
    }
    elsif ($step_job->{"finishtime"}) {
        # Finished without success.
        $step->{"warehousejob"}->{"finishtime"} = $step_job->{"finishtime"};
        $step->{"failed"} = 1;
        delete $step->{"progress"};
        return undef;
    }
    else {
        $step->{"progress"} = sprintf ("%d+%d/%d",
                                       $step_job->{"steps_done"},
                                       $step_job->{"steps_running"},
                                       $step_job->{"steps_done"} + $step_job->{"steps_running"} + $step_job->{"steps_todo"});
        $step->{"warehousejob"}->{"elapsed"} = time - $step_job->{"starttime_s"} if $step_job->{"starttime_s"};
        return !$nowaitflag;
    }
}


# Work out the value of every parameter of $step and return them as a
# hashref keyed by parameter name.  Problems are recorded on the step:
# "errors" for anything that prevents the step from ever running, and
# "waitingfor" for referenced steps that have not completed yet.  The
# caller must check both before using the returned hashref.
sub _resolve_step_params
{
    my ($pipe, $stepno, $step) = @_;

    my $mrparam = {};

  PARAM:
    for my $param (@{$step->{"params"}}) {
        my $value;
        if (exists $param->{"from_step"} || exists $param->{"from_step_name"}) {
            # The value comes from another step, possibly in another
            # pipeline listed under "pipelines_referred_to".
            my $from_pipe = $pipe;
            if (exists $param->{"from_pipeline_id"}) {
                undef $from_pipe;
                foreach (@{$pipe->{"pipelines_referred_to"}}) {
                    if ($_->{"id"} eq $param->{"from_pipeline_id"}) {
                        $from_pipe = $_;
                        last;
                    }
                }
                if (!defined $from_pipe) {
                    push @{$step->{"errors"}}, "Cannot find pipeline with id \"".$param->{"from_pipeline_id"}."\" in input";
                    next PARAM;
                }
            }

            # Translate a step name into a step index (cached on the param).
            if (!defined $param->{"from_step"}) {
                my $n = -1;
                foreach (@{$from_pipe->{"steps"}}) {
                    ++$n;
                    if ($_->{"name"} eq $param->{"from_step_name"}) {
                        $param->{"from_step"} = $n;
                        last;
                    }
                }
                if (!defined $param->{"from_step"}) {
                    push @{$step->{"errors"}}, "Cannot find step with name \"".$param->{"from_step_name"}."\" in pipeline with id \"".$from_pipe->{"id"}."\"";
                    next PARAM;
                }
            }

            my $from_step = $param->{"from_step"} + 0;
            my $source_step = $from_pipe->{"steps"}->[$from_step];
            if (exists $param->{"from_param_name"}) {
                # Copy an input parameter of the referenced step.
                my $found = 0;
                for (@{$source_step->{"params"}}) {
                    if ($_->{"name"} eq $param->{"from_param_name"}) {
                        for my $k (qw(value data_locator)) {
                            $param->{$k} = $_->{$k} if exists $_->{$k};
                        }
                        $value = $param->{"value"} || $param->{"data_locator"};
                        $found = 1;
                        last;
                    }
                }
                if (!$found) {
                    push @{$step->{"errors"}}, "Cannot find param with name \"".$param->{"from_param_name"}."\" in step $from_step of pipeline with id \"".$from_pipe->{"id"}."\"";
                }
            }
            elsif ($source_step->{"complete"}) {
                # Use the output of the referenced step.
                $value = $source_step->{"output_data_locator"};
                $param->{"value"} = $value;
            }
            else {
                if ($source_step->{"failed"} || $source_step->{"errors"}) {
                    push @{$step->{"errors"}}, sprintf ("Cannot run because step %d of pipeline id %d failed", $from_step, $from_pipe->{"id"});
                }
                else {
                    push @{$step->{"waitingfor"}}, $from_step;
                }
            }
        }
        else {
            # Literal parameter: the first of these keys present in the
            # spec determines where the value is stored.
            my $kind;
            for my $k (qw(value hash data_locator)) {
                if (exists $param->{$k}) {
                    $kind = $k;
                    last;
                }
            }
            if (!defined $kind) {
                push @{$step->{"errors"}}, "Unsupported parameter type for \"" . $param->{"name"} . "\"";
                next PARAM;
            }
            warn "\"hash\" param type is deprecated.\n"
                if $kind eq "hash" && $debugflag;

            # An undefined value in the spec is filled in from
            # "<stepno>/<name>" and then "<name>" supplied parameters.
            $value = $param->{$kind};
            $value = $supplied_param->{$stepno."/".$param->{"name"}} if !defined $value;
            $value = $supplied_param->{$param->{"name"}} if !defined $value;
            $param->{$kind} = $value;

            if (!defined $value) {
                if ($param->{"optional"}) {
                    next PARAM;
                }
                push @{$step->{"errors"}}, "Missing value for \"".$param->{"name"}."\" param";
                $mrparam = undef;
            }
        }

        if (defined $value && $value ne "" && !length $value) {
            die "JSON->perl mapping problem -- setting JSON::UnMapping failed?"
        }
        if (defined $value) {
            if (defined $param->{"subpath"}) {
                $value .= $param->{"subpath"};
            }
            $mrparam->{$param->{"name"}} = $value;
        }
    }

    return $mrparam;
}


# Look for an existing job matching $mrjobspec (function, input, knobs)
# within the revision limits in $sysparam.  Returns the job hashref, the
# string "None" if nothing suitable exists (or --no-reuse is in effect),
# or undef -- with a warning recorded on $step -- if the job list could
# not be retrieved.  Jobs that finished unsuccessfully are skipped, and
# a successful candidate is never replaced by an unfinished one.
sub find_job
{
    return "None" if $noreuseflag;

    my ($mrjobspec, $sysparam, $stepno, $step) = @_;

    my $joblist = $whc->job_list (%$mrjobspec);

    if (!$joblist) {
        push @{$step->{"warnings"}}, "Error retrieving job list";
        return undef;
    }

    printf STDERR ("Controller returned %d candidate%s for step %d (%s)\n",
                   $#$joblist + 1,
                   $#$joblist == 0 ? "" : "s",
                   $stepno,
                   $step->{"function"})
        if $debugflag;

    my $quoted_jobspec = { %$mrjobspec };

    my $step_job = "None";
  J:
    for my $j (@$joblist) {
        for my $k (qw(mrfunction inputkey knobs)) {
            my $want = $quoted_jobspec->{$k};
            my $got = $j->{$k};
            if ($k eq "inputkey") {
                # Compare input locators with "+hint" suffixes removed.
                $want =~ s/\+[^,\/]+//g;
                $got =~ s/\+[^,\/]+//g;
            }
            if ($want ne $got) {
                printf STDERR ("%d has wrong %s (%s)\n", $j->{"id"}, $k, $j->{$k})
                    if $debugflag;
                next J;
            }
        }
        next J if $sysparam->{"min_revision"} && $j->{"revision"} < $sysparam->{"min_revision"};
        next J if $sysparam->{"max_revision"} && $j->{"revision"} > $sysparam->{"max_revision"};
        next J if !$j->{"success"} && (length $j->{"success"} || length $j->{"finishtime_s"});
        next J if ref $step_job && $step_job->{"success"} && !$j->{"success"};
        printf STDERR "Job %d is the best yet\n", $j->{"id"}
            if $debugflag;
        $step_job = $j;
    }

    if ($step_job eq "None") {
        # Retry with any "+hint" suffix stripped from the GPG_KEYS knob.
        my $shorter_knobs = $mrjobspec->{knobs};
        if ($shorter_knobs =~ s/^(GPG_KEYS=[0-9a-f]{32}\S*)\+\S+$/$1/m) {
            return find_job ({ %$mrjobspec, "knobs" => $shorter_knobs },
                             $sysparam, $stepno, $step);
        }
    }
    return $step_job;
}


# Submit a new job described by $mrjobspec.  Returns the string "None"
# when --no-new-jobs is in effect, undef if submission failed, or a job
# hashref (just { id => ... } if the new job cannot be looked up yet).
sub start_job
{
    my $mrjobspec = shift;

    print STDERR ("starting new job:\n",
                  map { " $_:".$mrjobspec->{$_}."\n" } keys %$mrjobspec)
        if $debugflag;

    if ($nonewjobsflag) {
        return "None";
    }

    my $id = $whc->job_new (%$mrjobspec);
    return undef if !$id;
    printf STDERR ("started job %d\n", $id) if $debugflag;

    # job_list can fail (other callers guard against a false return),
    # so never dereference its result unchecked.
    my $joblist = $whc->job_list ("id_min" => $id, "id_max" => $id);
    my $newjob = $joblist ? $joblist->[0] : undef;
    if (!$newjob) {
        $newjob = { "id" => $id };
    }
    return $newjob;
}


# Print a timestamp and one status row per step on STDERR: optional
# pipeline label, step number, step name, job id (or "-"), and status.
sub print_status_summary
{
    my $pipe = shift;

    printf STDERR "\n%s\n", scalar localtime;

    my $stepno = -1;
    for my $step (@{$pipe->{"steps"}}) {
        ++$stepno;
        my $j = $step->{"warehousejob"};

        # Later assignments take precedence over earlier ones.
        my $status;
        $status = "queued"
            if $j && $j->{"id"};
        $status = sprintf ("%s after %d s", $step->{"progress"}, $j->{"elapsed"})
            if $j && $j->{"elapsed"};
        $status = $step->{"output_data_locator"}
            if $step->{"complete"};
        $status = "failed: " . join ("; ",
                                     $step->{"errors"} ? @{$step->{"errors"}}
                                     : $step->{"warnings"} ? @{$step->{"warnings"}}
                                     : ())
            if $step->{"failed"};

        my $jobno = "-";
        $jobno = $j->{"id"} if $j;

        printf STDERR "%-8s ", $pipe->{"label"} if exists $pipe->{"label"};
        printf STDERR "%2d %-12.12s %6s %s\n", $stepno, $step->{"name"}, $jobno, $status;
    }
}