-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathGet.pm
499 lines (478 loc) · 15.2 KB
/
Get.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
#!/usr/bin/perl -w
##
# Author: Ben Langmead
# Date: 2/14/2010
#
# Routines for getting and expanding jars and other files from S3, HDFS,
# HTTP/FTP servers and the local filesystem.
#
package Get;
use strict;
use warnings;
use Fcntl qw(:DEFAULT :flock); # for locking
use FindBin qw($Bin);
use lib $Bin;
use File::Path qw(mkpath);
use File::Basename;
use Tools;
use AWS;
use Util;
use Carp;
##
# Parse a URL, extracting the protocol and the type of program that
# will be needed to download it.
#
# Takes a URL or a local path.  Returns the two-element list
# ($proto, $type), where $proto is the lower-cased text before the first
# ':' ("" for local paths) and $type is one of "s3", "wget", "hdfs" or
# "local".  Croaks if a local path is neither a plain file nor a
# directory.
#
sub parse_url($) {
    my ($ref) = @_;
    # Everything before the first ':' is the candidate protocol.  An
    # empty input yields no fields at all, so default to "".
    my ($proto) = split(/:/, $ref);
    $proto = defined($proto) ? lc $proto : "";
    # The whole protocol must match, not just a substring: a local path
    # that merely contains "s3" (e.g. "data/s3_dump/reads.fq") must not
    # be classified as an S3 URL.
    if($proto =~ /^s3n?$/) {
        return ($proto, "s3");
    }
    if($proto eq "ftp" || $proto eq "http" || $proto eq "https") {
        return ($proto, "wget");
    }
    if($proto eq "hdfs") {
        return ($proto, "hdfs");
    }
    # Anything else is treated as a path on the local filesystem.
    (-f $ref || -d $ref) || croak("URL referring to local file $ref doesn't exist or cannot be read\n");
    return ("", "local");
}
##
# Prepare an s3 URL for use with s3cmd.
#
# The scheme is normalized to lower-case "s3:" (s3cmd doesn't like
# "s3n:"), and an embedded "ACCESS_ID:SECRET_KEY@" prefix is removed when
# it matches the keys held in $AWS::accessKey / $AWS::secretKey.
# Returns the cleaned-up path.
#
sub s3cmdify($$) {
    my ($path, $env) = @_;
    # Case-insensitive so that mixed-case schemes such as "S3n:" are
    # normalized as well as "S3N:", "S3:" and "s3n:".
    $path =~ s/^s3n?:/s3:/i;
    # Note: this is a good way to strip out the access ID and secret
    # key ID.  It's better than using a regular expression because it's
    # hard to think of an expression that correctly handles slashes in
    # the secret key ID (which is possible).
    AWS::ensureKeys($Tools::hadoop, $Tools::hadoop_arg, $env);
    my ($id, $secret) = ($AWS::accessKey, $AWS::secretKey);
    # With empty keys the search string would be just ":", which matches
    # the scheme separator and would mangle the URL; skip in that case.
    if(defined($id) && defined($secret) && $id ne "" && $secret ne "") {
        my $ec2key = $id.":".$secret;
        my $idx = index($path, $ec2key);
        if($idx != -1) {
            # Remove ID:secret and the @ on the end
            substr($path, $idx, length($ec2key)+1) = "";
        }
    }
    return $path;
}
##
# Run s3cmd with the given argument string.
#
# $args is appended verbatim to the s3cmd invocation obtained from
# Tools::s3cmd($env).  Returns the list ($?, $output).  Croaks if the
# command leaves a non-zero $?.
#
sub do_s3cmd($$) {
    my ($args, $env) = @_;
    my $command = Tools::s3cmd($env)." ".$args;
    print STDERR "Get.pm:do_s3cmd: $command\n";
    my $output = Util::backtickAndWait($command, "s3cmd");
    if($? != 0) {
        croak("Exitlevel from \"$command\" was $?\n");
    }
    return ($?, $output);
}
##
# Fetch one S3 object into $dest_dir/$base with "s3cmd get", retrying
# on failure.
#
#   $file     - S3 URL of the object
#   $base     - file name to give the local copy
#   $dest_dir - local directory to download into (created if missing)
#   $counters - ref to array that counter updates are pushed onto
#   $retries  - number of additional attempts allowed after a failure
#   $env      - environment, passed through to s3cmdify/do_s3cmd
#
# Returns 0 on success, or the non-zero status of the last attempt once
# all retries are used up.  Croaks if s3cmd reports success but the
# destination file doesn't exist.
#
sub do_s3_get($$$$$$) {
    my ($file, $base, $dest_dir, $counters, $retries, $env) = @_;
    $file = s3cmdify($file, $env);
    mkpath($dest_dir);
    my $target = "$dest_dir/$base";
    my $cmd = "rm -f $target >&2";
    print STDERR "Get.pm:do_s3_get: $cmd\n";
    system($cmd);
    my $ret;
    while($retries >= 0) {
        # do_s3cmd croaks on a non-zero exit.  Trap that here; otherwise
        # the first failure would abort and the retries never happen.
        my $ok = eval {
            ($ret) = do_s3cmd("get --force $file $target >&2", $env);
            1;
        };
        unless($ok) {
            print STDERR "Get.pm:do_s3_get: s3cmd get failed: $@";
            $ret = $? ? $? : 1;
        }
        if($ret == 0) {
            (-f $target) || croak("Did not create $target - wrong URL?\n");
            push @{$counters}, "Fetcher,s3cmd return $ret,1";
            push @{$counters}, "Fetcher,Bytes obtained with s3cmd get,".(-s $target);
            push @{$counters}, "Fetcher,Files obtained with s3cmd get,1";
            return $ret;
        }
        push @{$counters}, "Fetcher,s3cmd return $ret,1";
        # Remove only the partial download before retrying; the dot
        # files next to it include the lock file the caller may hold.
        system("rm -f $target");
        $retries--;
    }
    return $ret;
}
##
# Upload a local file into an S3 directory with "s3cmd put".
#
# The basename of $file is kept; $dest is treated as a directory and a
# trailing slash is added when missing.  Upload counters are pushed
# onto @$counters.  There is no explicit return; the value of the final
# push is what callers receive.
#
sub do_s3_put($$$$) {
    my ($file, $dest, $counters, $env) = @_;
    my $target_dir = s3cmdify($dest, $env);
    $target_dir = $target_dir."/" if $target_dir !~ m{/$};
    my $name = fileparse($file);
    my ($ret, $out) = do_s3cmd("put $file $target_dir$name >&2", $env);
    push @{$counters},
        "Fetcher,Bytes uploaded with s3cmd put,".(-s "$file"),
        "Fetcher,Files uploaded with s3cmd put,1";
}
##
# Copy a file out of HDFS into $dest_dir/$base with "hadoop dfs -get".
#
# Croaks if $base or $dest_dir is undefined.  Pushes counter updates
# onto @$counters and returns the status from Util::runAndWait.
#
sub do_hdfs_get($$$$) {
    my ($file, $base, $dest_dir, $counters) = @_;
    croak("Must define base\n") unless defined($base);
    croak("Must define dest_dir\n") unless defined($dest_dir);
    $file =~ s/^HDFS:/hdfs:/;
    my $hadoop = Tools::hadoop();
    mkpath($dest_dir);
    my $local_copy = "$dest_dir/$base";
    my $get_cmd = "$hadoop dfs -get $file $local_copy >&2";
    print STDERR "Get.pm:do_hdfs_get: $get_cmd\n";
    my $status = Util::runAndWait($get_cmd, "hadoop dfs -get");
    print STDERR "Get.pm:returned $status\n";
    push @{$counters},
        "Fetcher,hadoop dfs -get return $status,1",
        "Fetcher,Bytes obtained with hadoop dfs -get,".(-s $local_copy),
        "Fetcher,Files obtained with hadoop dfs -get,1";
    return $status;
}
##
# Put a local file into an HDFS directory.
#
# $dest is treated as a directory (a trailing slash is added when
# missing) and the basename of $file is kept.  The directory is created
# first; the status of that mkdir is ignored.  Pushes counter updates
# onto @$counters and returns the status of the put.
#
sub do_hdfs_put($$$) {
    my ($file, $dest, $counters) = @_;
    $dest =~ s/^HDFS:/hdfs:/;
    $dest = $dest."/" if $dest !~ m{/$};
    my $name = fileparse($file);
    my $hadoop = Tools::hadoop();
    # Make sure the target directory is there
    Util::runAndWait("$hadoop dfs -mkdir $dest >&2", "$hadoop dfs -mkdir");
    # Upload
    my $put_cmd = "$hadoop dfs -put $file $dest$name >&2";
    print STDERR "Get.pm:do_hdfs_put: $put_cmd\n";
    my $status = Util::runAndWait($put_cmd, "$hadoop dfs -put");
    # Record what happened
    push @{$counters},
        "Fetcher,hadoop dfs -put return $status,1",
        "Fetcher,Bytes uploaded with hadoop dfs -put,".(-s $file),
        "Fetcher,Files uploaded with hadoop dfs -put,1";
    return $status;
}
##
# Copy a local file to $dest_dir/$base with cp.
#
# Creates $dest_dir if needed, pushes counter updates onto @$counters
# and returns the status from Util::run.
#
sub do_local($$$$) {
    my ($file, $base, $dest_dir, $counters) = @_;
    mkpath($dest_dir);
    my $copy = "$dest_dir/$base";
    my $cp_cmd = "cp $file $copy >&2";
    print STDERR "Get.pm:do_local: $cp_cmd\n";
    my $status = Util::run($cp_cmd);
    push @{$counters},
        "Fetcher,cp return $status,1",
        "Fetcher,Bytes obtained with cp,".(-s $copy),
        "Fetcher,Files obtained with cp,1";
    return $status;
}
##
# Workaround for the situation where the change of FTP dir is
# forbidden, but fetching the file itself is permitted (this seems to
# happen e.g. on the NCBI 1000genomes server sometimes).
#
# For an ftp:// URL whose path has more than one component, the slashes
# inside the path are rewritten as %2f.  Every other URL is returned
# unchanged: for HTTP an encoded slash names a different resource, so
# rewriting it would break ordinary downloads.
#
sub fix_wget_url($) {
    my $url = shift;
    return $url unless $url =~ m{^ftp://}i;
    # "ftp://host/a/b" splits into ("ftp:", "", "host", "a", "b"); the
    # first three fields are the scheme and host, the rest is the path.
    my @parts = split(/\//, $url);
    return $url if $#parts <= 3;
    my $prefix = join("/", @parts[0..2])."/";
    return $prefix.join("%2f", @parts[3..$#parts]);
}
##
# Get a file over http or ftp using wget.
#
# The URL is first passed through fix_wget_url.  The download is
# written to $dest_dir/$base (the directory is created if needed),
# counter updates are pushed onto @$counters and the status from
# Util::run is returned.
#
sub do_wget($$$$) {
    my ($file, $base, $dest_dir, $counters) = @_;
    my $fixed_url = fix_wget_url($file);
    my $wget = Tools::wget();
    mkpath($dest_dir);
    my $output_file = "$dest_dir/$base";
    my $wget_cmd = "$wget $fixed_url -O $output_file >&2";
    print STDERR "Get.pm:do_wget: $wget_cmd\n";
    my $status = Util::run($wget_cmd);
    push @{$counters},
        "Fetcher,wget return $status,1",
        "Fetcher,Bytes obtained with wget,".(-s $output_file),
        "Fetcher,Files obtained with wget,1";
    return $status;
}
##
# List the entries of a directory given as an S3 URL, a local path, or
# anything else (which is handed to "hadoop dfs -ls").
#
# Returns a list with one element per listing line, taken from the last
# whitespace-separated field of that line.  For local directories each
# name is prefixed with the directory; for type "hdfs", absolute paths
# are prefixed with "hdfs://".
#
sub lsDir($$) {
    my ($dir, $env) = @_;
    print STDERR "Get.pm:lsDir: About to parse URL $dir\n";
    my ($proto, $type) = parse_url($dir);
    my @entries = ();
    if($type eq "local") {
        print STDERR "Get.pm:lsDir: About to handle local\n";
        my $listing = Util::backtickRun("ls -1 $dir");
        my @names = split(/[\r\n]+/, $listing);
        $dir =~ s/\/$//;
        push @entries, map { "$dir/$_" } @names;
    } elsif($type eq "s3") {
        print STDERR "Get.pm:lsDir: About to handle S3\n";
        $dir = s3cmdify($dir, $env);
        $dir .= "/" unless $dir =~ /\/$/;
        my ($ret, $listing) = do_s3cmd("ls $dir", $env);
        for my $line (split(/[\r\n]+/, $listing)) {
            # Skip s3cmd's "Bucket ..." header line
            next if $line =~ /^Bucket/;
            my @cols = split(/[\s]+/, $line);
            push @entries, $cols[-1];
        }
    } else {
        print STDERR "Get.pm:lsDir: About to handle HDFS\n";
        my $hadoop = Tools::hadoop();
        my $listing = `$hadoop dfs -ls $dir`;
        for my $line (split(/[\r\n]+/, $listing)) {
            # Skip hadoop's "Found N items" header line
            next if $line =~ /^Found/;
            my @cols = split(/[\s]+/, $line);
            my $entry = $cols[-1];
            if($entry =~ /^\// && $type eq "hdfs") {
                $entry = "hdfs://".$entry;
            }
            push @entries, $entry;
        }
    }
    return @entries;
}
##
# Ensure all of the files in the source directory have been copied into
# dest_dir.
#
# A marker file "$dest_dir/.dir.<dir with / and : replaced by _>" records
# that the directory was already fetched; when it exists nothing is
# done.  Otherwise every entry reported by lsDir is fetched with
# ensureFetched and the marker is then created.
#
sub ensureDirFetched($$$$) {
    my ($dir, $dest_dir, $counters, $env) = @_;
    # Lower-case an upper-case scheme prefix
    for my $scheme (qw(S3N S3 HDFS)) {
        my $lower = lc $scheme;
        $dir =~ s/^$scheme/$lower/;
    }
    (my $flat_name = $dir) =~ s/[\/:]/_/g;
    mkpath($dest_dir);
    my $marker = "$dest_dir/.dir.$flat_name";
    unless(-f $marker) {
        $dir = $dir."/" if $dir !~ m{/$};
        my @files = lsDir($dir, $env);
        print STDERR "Get.pm:ensureDirFetched: About to be fetched: $_\n" for @files;
        for my $remote (@files) {
            print STDERR "ensureDirFetched: Fetching directory file $remote\n";
            ensureFetched($remote, $dest_dir, $counters, undef, undef, $env);
        }
        Util::run("touch $marker");
    }
}
##
# Do not return until the given file has been obtained and the "done"
# flag file has been installed.
#
# The first process to take an exclusive lock on "$dest_dir/.$base.lock"
# downloads the file, extracts it if it is a .jar, .tar.gz/.tgz or
# .tar.bz2, and then creates "$dest_dir/.$base.done".  Processes that
# fail to get the lock poll for the done file every 3 seconds.
# NOTE(review): that wait has no timeout, so waiters appear to block
# forever if the lock holder fails before creating the done file.
#
# If the thing being decompressed is an R installation, we do a little
# ad-hoc fixup to ensure it likes the new directory it's in.
#
# Croaks if the lock file can't be opened, if the download or the
# extraction returns non-zero, if the downloaded file is missing, or if
# the done file is absent on exit.
#
sub ensureFetched($$$$$$) {
    my (
        $file,      # Path/URL of file to get
        $dest_dir,  # Directory to copy it to and/or extract it in
        $counters,  # Ref to array to store counter updates in
        $doRfixup,  # If defined, the subdirectory of $dest_dir that
                    # holds the extracted R; its bin/R script is
                    # rewritten so R_HOME_DIR points at the new home
        $lockSub,   # A parameterless subroutine to call if and
                    # when we get the lock
        $env) = @_; # environment
    print STDERR "Get.pm:ensureFetched: called on \"$file\"\n";
    $file =~ s/^S3N/s3n/;
    $file =~ s/^S3/s3/;
    $file =~ s/^HDFS/hdfs/;
    my $base = fileparse($file);
    print STDERR "Get.pm:ensureFetched: base name \"$base\"\n";
    mkpath($dest_dir);
    my $done_file = "$dest_dir/.$base.done";
    my $lock_file = "$dest_dir/.$base.lock";
    print STDERR "ls -al $dest_dir/*$base* $dest_dir/.*$base*\n";
    print STDERR `ls -al $dest_dir/*$base* $dest_dir/.*$base*\n`;
    my ($proto, $type) = parse_url($file);
    print STDERR "Pid $$: Checking for done file $done_file\n";
    if(! -f $done_file) {
        print STDERR "Pid $$: Done file $done_file was NOT present\n";
        #
        # Use perl portable file locking to prevent race conditions when
        # there are multiple mappers per machine.
        #
        system("touch $lock_file");
        print STDERR "Pid $$: Attempting to obtain lock...\n";
        # A lexical handle, so that a nested call (e.g. from $lockSub)
        # can't reopen a shared global handle and drop this lock.
        open(my $lock_fh, "<", $lock_file) or croak("Can't open lock file \"$lock_file\": $!");
        if(flock($lock_fh, LOCK_EX | LOCK_NB)) {
            # Got the lock; it's up to me to download and explode the jar file
            print STDERR "Pid $$: got the lock; downloading file...\n";
            print STDERR "Pid $$: file name: $base\n";
            my $cmd = "rm -f $dest_dir/$base >&2";
            print STDERR "$cmd\n";
            system($cmd);
            my $ret;
            print STDERR "Pid $$: downloading file...\n";
            if($type eq "s3") {
                $ret = do_s3_get($file, $base, $dest_dir, $counters, 3, $env);
            } elsif($type eq "hdfs") {
                $ret = do_hdfs_get($file, $base, $dest_dir, $counters);
            } elsif($type eq "wget") {
                # parse_url reports ftp and http URLs as type "wget"
                $ret = do_wget($file, $base, $dest_dir, $counters);
            } else {
                $type eq "local" || croak("Bad type: $type\n");
                $ret = do_local($file, $base, $dest_dir, $counters);
            }
            print STDERR "ls -al $dest_dir/$base\n";
            print STDERR `ls -al $dest_dir/$base`;
            if($ret != 0) {
                system("rm -f $dest_dir/$base* $dest_dir/.$base*");
                flock($lock_fh, LOCK_UN);
                close($lock_fh);
                print STDERR "Return value from download task was $ret\n";
                croak("Return value from download task was $ret\n");
            }
            if(! -f "$dest_dir/$base") {
                flock($lock_fh, LOCK_UN);
                close($lock_fh);
                print STDERR "Return value from download task was $ret but the file $dest_dir/$base doesn't exist\n";
                croak("Return value from download task was $ret but the file $dest_dir/$base doesn't exist\n");
            }
            # Extract known archive types; for anything else $ret keeps
            # the (zero) download status.
            if($base =~ /\.jar$/) {
                print STDERR "Pid $$: extract jar\n";
                #prefer unzip to jar
                my $jar_exe = Tools::unzip();
                my $jar_arguments = "";
                if($jar_exe eq ""){
                    print STDERR "Could not find unzip, falling back to jar";
                    $jar_exe = Tools::jar();
                    $jar_arguments = "xf";
                }
                $cmd = "cd $dest_dir && $jar_exe $jar_arguments $base >&2";
                print STDERR "$cmd\n";
                $ret = Util::runAndWait($cmd, "$jar_exe $jar_arguments");
            } elsif($base =~ /\.tar\.gz$/ || $base =~ /\.tgz$/) {
                $cmd = "cd $dest_dir && tar zxf $base >&2";
                print STDERR "$cmd\n";
                $ret = Util::runAndWait($cmd, "tar zxf");
            } elsif($base =~ /\.tar\.bz2$/) {
                $cmd = "cd $dest_dir && tar jxf $base >&2";
                print STDERR "$cmd\n";
                $ret = Util::runAndWait($cmd, "tar jxf");
            }
            print STDERR "ls -al $dest_dir/$base\n";
            print STDERR `ls -al $dest_dir/$base`;
            if($ret != 0) {
                system("rm -rf $dest_dir/$base* $dest_dir/.$base*");
                flock($lock_fh, LOCK_UN);
                close($lock_fh);
                croak("Return value from extract task was $ret\n");
            }
            my $size = -s "$dest_dir/$base";
            push @{$counters}, "Fetcher,File and size $base and $size,1";
            push @{$counters}, "Fetcher,Bytes obtained,$size";
            push @{$counters}, "Fetcher,Files obtained,1";
            if(defined($doRfixup)) {
                # This is a silly fixup we have to do if we want R and Rscript
                # to run in their new home.
                my $r_script = "$dest_dir/$doRfixup/bin/R";
                print STDERR "Setting RHOME = \"$dest_dir/$doRfixup\"\n";
                print STDERR "Writing new \"$r_script\" script\n";
                open(my $r_in, "<", $r_script) ||
                    croak("Could not open '$r_script' for reading");
                open(my $r_out, ">", "$r_script.new") ||
                    croak("Could not open '$r_script.new' for writing");
                while(my $line = <$r_in>) {
                    if($line =~ /^R_HOME_DIR=/) {
                        print STDERR "Modifying R_HOME_DIR\n";
                        print $r_out "R_HOME_DIR=$dest_dir/$doRfixup\n";
                    } else {
                        print $r_out $line;
                    }
                }
                close($r_in);
                close($r_out);
                system("mv $r_script.new $r_script");
                system("chmod a+x $r_script");
                push @{$counters}, "Fetcher,R path fixups performed,1";
            }
            # Call user-supplied function
            if(defined($lockSub)) { $lockSub->(); }
            system("touch $done_file");
        } else {
            print STDERR "Pid $$: didn't get the lock; waiting for master to finish\n";
            my $sleeps = 0;
            while(! -f $done_file) {
                sleep(3);
                # Report progress every 10th sleep (every 30 seconds)
                if((++$sleeps % 10) == 0) {
                    my $secs = $sleeps * 3;
                    print STDERR "Pid $$: still waiting (it's been $secs seconds)\n";
                }
            }
            print STDERR "Pid $$: master finished; continuing\n";
        }
        close($lock_fh);
    } else {
        print STDERR "Pid $$: done file $done_file was there already; continuing\n";
    }
    (-f $done_file) || croak("Pid $$: about to exit ensureFetched, but done file $done_file doesn't exist\n");
}
##
# Check if a local, hdfs or s3 (or other Hadoop-supported fs) file or
# directory exists.
#
# Local paths are checked with "stat"; everything else with
# "hadoop fs -stat" after a leading "hdfs://" is removed.  Returns true
# when the command's status from Util::run is zero.
#
sub fs_exists {
    my $path = shift;
    my $rc;
    # Output is discarded with the POSIX form ">/dev/null 2>&1".  The
    # ">& /dev/null" shorthand is a bash/csh extension that fails under
    # a plain /bin/sh such as dash, which would make every path look
    # missing.
    if(Util::is_local($path)) {
        $rc = Util::run("stat $path >/dev/null 2>&1");
    } else {
        my $hadoop = Tools::hadoop();
        $path =~ s/^hdfs:\/\///i;
        $rc = Util::run("($hadoop fs -stat $path) >/dev/null 2>&1");
    }
    return !$rc;
}
##
# Put a file into a local, hdfs or s3 (or other Hadoop-supported fs)
# path.
#
# $src must be a path to a file
#
# $dst must be a path to a directory; it can't specify the destination
# filename - the basename from $src is preserved
#
# An existing destination file is removed first.  Dies if the final
# copy/put returns a non-zero status.
#
sub fs_put {
    my ($src, $dst) = @_;
    my $name = fileparse($src);
    $dst = $dst."/" if $dst !~ m{/$};
    my $fulldst = $dst.$name;

    # Clear out any previous copy of the destination file
    if(fs_exists($fulldst)) {
        print STDERR "WARNING: replacing old $dst from hdfs\n";
        if(Util::is_local($fulldst)) {
            Util::run("rm -rf $fulldst >&2");
        } else {
            my $hadoop = Tools::hadoop();
            my $subcmd = ($fulldst =~ /^hdfs:/i) ? "dfs" : "fs";
            Util::run("$hadoop $subcmd -rmr $fulldst >&2");
        }
    }

    my $rc;
    if(Util::is_local($src) && Util::is_local($dst)) {
        mkpath($dst);
        $rc = Util::run("cp $src $dst >&2");
    } else {
        my $hadoop = Tools::hadoop();
        my $to_hdfs = ($dst =~ /^hdfs:/i);
        my $subcmd = $to_hdfs ? "dfs" : "fs";
        # NOTE(review): for hdfs: destinations the put target has the
        # "hdfs://" prefix stripped, while the mkdir above it does not.
        my $target = $fulldst;
        $target =~ s/^hdfs:\/\///i if $to_hdfs;
        Util::run("$hadoop $subcmd -mkdir $dst >&2");
        $rc = Util::run("$hadoop $subcmd -put $src $target >&2");
    }
    die "Can't load $src to $dst ($rc)\n" if $rc;
}
##
# Remove a file in a local, hdfs or s3 (or other Hadoop-supported fs)
# path.
#
# Local paths are removed with "rm -rf"; everything else with
# "hadoop fs -rmr" after a leading "hdfs://" is stripped.  Returns the
# status from Util::run.
#
sub fs_remove {
    my ($path) = @_;
    my $remove_cmd;
    if(Util::is_local($path)) {
        $remove_cmd = "rm -rf $path >&2";
    } else {
        my $hadoop = Tools::hadoop();
        (my $stripped = $path) =~ s/^hdfs:\/\///i;
        $remove_cmd = "$hadoop fs -rmr $stripped >&2";
    }
    my $status = Util::run($remove_cmd);
    return $status;
}
1;