-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathCounters.pm
164 lines (157 loc) · 4.59 KB
/
Counters.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/perl -w
##
# Counters.pm
#
# Authors: Ben Langmead
# Date: February 14, 2010
#
# Counter handling has several complicating factors.  Chief among them:
# single-computer mode accesses counters in a very different way from
# Hadoop or Cloud modes.
#
# Purpose: gather all the counters and put them in the output directory.
#
#
package Counters;
use strict;
use warnings;
use Fcntl qw(:DEFAULT :flock); # for locking
use FindBin qw($Bin);
use lib $Bin;
use File::Path qw(mkpath);
use Tools;
use Util;
use AWS;
use Util;
use Carp;
##
# Given a directory with stderr output from a single-computer-mode
# stage ($dir), an output filename ($outfn), and a function to send
# warning and error messages to ($msg), parse all the counter updates
# into a counter hash and then append the hash to the file at $outfn.
#
# Every file matched by "$dir/*" is read (subdirectories are skipped).
# Names ending in .gz or .bz2 are decompressed through 'gzip -dc' or
# 'bzip2 -dc'.  Anything else is read as plain text.  Only lines of
# the form
#   reporter:counter:<group>,<counter>,<amount>
# are used, and amounts for the same <group>/<counter> pair are summed.
# A counter line that does not split into exactly three comma-separated
# fields is reported through $msg and skipped.
#
# Each resulting counter is appended to $outfn as one line:
#   pid=<pid of this process> TAB <group> TAB <counter> TAB <total>
#
# Dies if:
#   - $dir is not a directory;
#   - an input cannot be opened;
#   - a decompression command exits unsuccessfully;
#   - $outfn cannot be opened or written.
#
sub dumpLocalCounters($$$) {
    my ($dir, $outfn, $msg) = @_;
    -d $dir || die "No such input file or directory as \"$dir\"\n";
    my $prefix = "reporter:counter:";
    my %counters = ();
    for my $f (<$dir/*>) {
        # Only files carry counter output.  Reading a directory handle
        # would leave an I/O error that makes close() fail below.
        next if -d $f;
        my $inp;
        # Use explicit-mode / list-form opens.  The old
        # 'open INP, "..." || die' bound '||' to the string, not to
        # open(), so failures went unnoticed.  The shell form also broke
        # on filenames containing spaces or metacharacters.
        if($f =~ /\.gz$/) {
            open($inp, '-|', 'gzip', '-dc', $f)
                or die "Could not open pipe 'gzip -dc $f |'";
        } elsif($f =~ /\.bz2$/) {
            open($inp, '-|', 'bzip2', '-dc', $f)
                or die "Could not open pipe 'bzip2 -dc $f |'";
        } else {
            open($inp, '<', $f) or die "Could not open $f for reading\n";
        }
        while(my $line = <$inp>) {
            next unless $line =~ /^reporter:counter:/;
            chomp($line);
            $line = substr($line, length($prefix));
            my @us = split(/,/, $line);
            if(scalar(@us) != 3) {
                $msg->("Warning: Ill-formed counter updated line:\n$line");
                next; # don't fold undef/garbage fields into the totals
            }
            $counters{$us[0]}{$us[1]} += $us[2];
        }
        # For a pipe, close() is false when the child exits non-zero, and
        # $? then holds its status.  Checking close() rather than a bare $?
        # avoids tripping on a stale $? after closing a plain file.
        close($inp) or die "Bad exitlevel from input slurp: $?\n";
    }
    open(my $cnt, '>>', $outfn)
        or die "Could not open file '$outfn' for appending\n";
    for my $k1 (sort keys %counters) {
        for my $k2 (sort keys %{$counters{$k1}}) {
            print {$cnt} "pid=$$\t$k1\t$k2\t$counters{$k1}{$k2}\n";
        }
    }
    # Buffered write errors only surface at close().
    close($cnt) or die "Could not write counters to '$outfn'\n";
}
##
# Use the 'hadoop' script to (a) determine which jobs have finished, and
# (b) populate a hash with all the counter values those jobs report.
#
# $cnth      - hashref filled as $cnth->{<section>}{<counter>} = <value>
# $selectjob - optional coderef called with each job id.  Jobs for which
#              it returns false are skipped.  undef means "use every job".
# $msg       - coderef that receives warning and progress messages
# $verbose   - if true, report per-job and per-section progress via $msg
#
# Returns the number of counter values stored into $cnth.
#
# "Finished" jobs are those that 'hadoop job -list all' shows with 2 in
# the state column.  NOTE(review): presumably state 2 means SUCCEEDED;
# confirm for the Hadoop version in use.
#
# If we had information about job ids of previous jobs in this same job
# flow, we wouldn't have to scan this whole list.  NOTE(review): an
# earlier comment here was truncated ("the caller has to know the job id
# of the ."); presumably callers that know their job ids should pass
# $selectjob to restrict the scan.
#
# Dies if a Key=Value status line cannot be split into a key and value.
#
sub getHadoopCounters($$$$) {
    my ($cnth, $selectjob, $msg, $verbose) = @_;
    $msg->("In getHadoopCounters:");
    my $counters = 0; # overall
    my $hadoop = Tools::hadoop();
    # Get all finished jobs
    my $jstr = `$hadoop job -list all | awk '\$1 ~ /^job/ && \$2 == 2 {print \$1}'`;
    my @jobs = split(/[\n\r]+/, $jstr);
    $selectjob = sub {return 1} unless defined($selectjob);
    JOB: for my $job (@jobs) {
        if(!$selectjob->($job)) {
            $msg->(" Skipping job $job") if $verbose;
            # Actually skip it.  Previously the job was examined anyway.
            next JOB;
        }
        $msg->(" Examining job $job") if $verbose;
        my $sstr = `$hadoop job -status $job`;
        my @status = split(/[\n\r]+/, $sstr);
        my $seccounters = 0; # per section
        my $section = "";
        LINE: for my $line (@status) {
            next LINE if $line =~ /^\s*$/;        # skip blank lines
            next LINE if $line =~ /^Job:/;        # skip Job: lines
            next LINE if $line =~ /^file:/;       # skip file: lines
            next LINE if $line =~ /^tracking URL:/;
            # Check completion lines, then skip them so they are not
            # mistaken for counter-section headers below.
            if($line =~ /^map[(][)] completion: (.*)$/) {
                $1 eq "1.0" || $msg->("Warning: Incomplete mappers:\n\"$line\"\n");
                next LINE;
            }
            if($line =~ /^reduce[(][)] completion: (.*)$/) {
                $1 eq "1.0" || $msg->("Warning: Incomplete reducers:\n\"$line\"\n");
                next LINE;
            }
            next LINE if $line =~ /^Counters:/;
            my $trimmed = Util::trim($line);
            if($line =~ /[=]/) {
                # Key=Value line
                $section ne "" || $msg->("No label before line:\n\"$line\"\n");
                # Split at the first '=' only, so a value that contains
                # '=' (or is empty) no longer aborts the whole scan.
                my ($key, $value) = split(/=/, $trimmed, 2);
                defined($value) || die "Could not parse counter line:\n\"$line\"\n";
                $cnth->{$section}{$key} = $value;
                $counters++;
                $seccounters++;
            } else {
                # Any other non-skipped line names a new counter section.
                $msg->(" section had $seccounters counters") if $verbose && $section ne "";
                $section = $trimmed;
                $seccounters = 0;
                $msg->(" Found section: $section") if $verbose;
            }
        }
        $msg->(" section had $seccounters counters") if $verbose && $section ne "";
    }
    return $counters;
}
##
# Read counters previously saved to a file ($fn) into the '$counters'
# hashref.
#
# Each non-empty line must hold exactly three tab-separated fields:
#   <group> TAB <counter> TAB <value>
# It is stored as $counters->{<group>}{<counter>} = <value>.  A later
# line for the same pair overwrites an earlier one.  Line endings (\n or
# \r\n) are stripped, so stored values carry no trailing newline.
#
# $msg and $verbose are accepted but not used by this function.
#
# NOTE(review): dumpLocalCounters appends four fields per line
# ("pid=<pid>", group, counter, total), which this reader rejects.
# Confirm which producer writes the file passed in here.
#
# Dies if the file cannot be opened or a line is ill-formatted.
#
sub getLocalCounters($$$$) {
    my ($fn, $counters, $msg, $verbose) = @_;
    open(my $cnts, '<', $fn) or die "Could not open counter file '$fn'";
    while(my $line = <$cnts>) {
        # Strip the line ending; previously "\n" ended up in every value.
        $line =~ s/[\r\n]+\z//;
        next if $line eq ""; # tolerate blank (e.g. trailing) lines
        # A limit of -1 keeps trailing empty fields, so "a\tb\t" still
        # counts as three fields now that the newline is gone.
        my @s = split(/\t/, $line, -1);
        scalar(@s) == 3 || die "Ill-formatted counter line; must have 3 fields:\n$line\n";
        $counters->{$s[0]}{$s[1]} = $s[2];
    }
    close($cnts);
}
##
# Load counters produced by earlier stages into the $counters hashref.
#
# If no counter file is named ($cntfn undefined or the empty string),
# the values are pulled from Hadoop via getHadoopCounters, with no job
# filter.  Otherwise they are read from the named file via
# getLocalCounters.  $msg and $verbose are passed through unchanged.
#
# Returns whatever the chosen loader returns.
#
sub getCounters($$$$) {
    my ($cntfn, $counters, $msg, $verbose) = @_;
    my $haveFile = defined($cntfn) && $cntfn ne "";
    # A counter file was specified: read counters from it
    return Counters::getLocalCounters($cntfn, $counters, $msg, $verbose)
        if $haveFile;
    # No file given: ask Hadoop for the counters instead
    return Counters::getHadoopCounters($counters, undef, $msg, $verbose);
}
1;