Effective Utilization of the Biowulf Cluster for Bioinformatics
David Hoover, Helix Systems, CIT/NIH
May 21, 2008
Biowulf:
* ~1800 nodes
* ~5000 CPUs
* 50-100 simultaneous users
* 5-10 nodes each user
* 160 CPUs max per user
* ~7 publications per month
Examples of Bioinformatics Tasks
BLAST Example:
* 1000 nucleotide sequences
* GenBank nt database
Database is ~7.5GB!
Protein Folding Example:
* Generate 100,000 folding models using Rosetta
* Cluster top 1% of results using R
* Screen top models with various structure quality tools
Huge number of output files, several applications to pipeline
Genome-Wide Association Example:
* 80,000 SNPs per genome
* Several hundred participants
* MCMC methods using R
Each R run requires >2GB of memory
Protein Threading Example:
* Threading a single sequence through tens of thousands of templates
* Generate models for best alignments
* Screen with structure quality tools
Tens of thousands of output files to parse, and a buggy application prone to failure
Task Analysis
A task can contain multiple steps.
* Are the steps independent, dependent, or mixed?
* Do the steps require message passing?
* Are there files to be read/written?
* Where will these files be located?
* Are there any special I/O requirements?
* How long will the entire task need to run?
* How much memory will the task or the individual steps require?
* What happens when a step goes wrong or gives an error?
* Are there multiple tasks to be pipelined?
Independent vs. Dependent Steps
An independent step is a unit of work that can be carried out in complete isolation. When multiple independent steps are run in parallel, this is known as embarrassingly parallel.
* When does a CPU need to know what other CPUs are doing?
* Are there any partition/consolidation steps?
Example: aligning a single sequence against the nt database can be an independent step, but parsing all the alignment outputs and consolidating the data into a single summary is a dependent step.
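In shell, the partition and consolidation steps around the independent alignments can be sketched like this (the filenames and piece size are illustrative, and a simple copy stands in for the actual alignment step):

```shell
#!/bin/bash
set -e
cd "$(mktemp -d)"              # throwaway working directory for this sketch
seq 1000 > queries.txt         # stand-in for a file of 1000 query sequences

# Partition: split into pieces of 250 lines -> piece.aa .. piece.ad
split -l 250 queries.txt piece.

# Independent steps: each piece would be aligned on its own CPU;
# here a copy stands in for the alignment
for p in piece.*; do
    cp "$p" "$p.out"
done

# Dependent step: consolidate the per-piece outputs into one summary
cat piece.*.out > summary.out
```

On the cluster, the loop body would instead be a swarm of alignment runs, one per piece.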
Message Passing
The problem of message passing arises when running dependent steps in parallel.
* In instances of processing multiple inputs, how does one step know what has already been done by other steps?
Example: keep only the top 1% results, delete the rest.
Methods:
* Write to a single file (extremely bad -- file locking)
* Each step writes to its own file (also bad -- attribute caching)
* Client/server using sockets
* MySQL
* MPI
File Storage
File storage space is enormous, but always limited.
* How much disk space is required?
* Where can the files be stored?
Keep in mind:
* quotas (use quota or checkquota command)
* directory limits (~300,000 files/directory)
* capacity (local /scratch directories on some nodes are only 30GB)
* clearscratch command
* inode table limits
I/O Requirements
Data can be input and output in many forms, depending on how the programs are written and your level of sophistication.
* Environment variables
* Commandline
* Piped input
* File(s)
* Database(s)
* Internet
Each of these methods has its own peculiar quirks:
* Environment variables are not shared across nodes, but they can be inherited through qsub -V or qsub -v variable=value
* Most of the bioinformatics flatfile databases are held in /spin, a high-performance file server
* Helix Systems maintains the MySQL server biobase.nih.gov which can be accessed from all of NIH
* The Biowulf cluster nodes have no internet connectivity
* The new helixdrive Samba service allows files written to /data and /home to be accessible via mapped network drives
I/O Speed
The speed of access is highly dependent on how the data is available:
* memory >> MySQL > local /scratch > /spin > /data & /home
Rather than open and close a file multiple times within a loop, read the contents into memory once. For large files, copy them at the beginning of the task to the local /scratch area first and access them there.
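A sketch of that staging pattern (a mktemp directory stands in for the node's local /scratch so the example runs anywhere, and a grep stands in for the task's repeated reads):

```shell
#!/bin/bash
set -e
SCRATCH=$(mktemp -d)             # on Biowulf this would be /scratch
BIGFILE=$(mktemp)                # stand-in for a large file on /data
seq 100000 > "$BIGFILE"

# Stage in once, at the beginning of the task
cp "$BIGFILE" "$SCRATCH/input.dat"

# All subsequent reads hit the fast local copy, not the file server
N=$(grep -c 999 "$SCRATCH/input.dat")
echo "lines matching: $N"

rm -rf "$SCRATCH" "$BIGFILE"     # clean up (clearscratch on Biowulf)
```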
Failure
The failure of one or more steps could jeopardize the entire task.
* What can go wrong?
* What happens when something fails?
Example: when all the individual sequence alignments finish, consolidate the output into a single file, run another program to process the results from that file, then remove all the intermediate files. If the single file is corrupted by a single bad sequence alignment and the secondary program fails, you are left with nothing!
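One defensive pattern in shell: check the exit status of the downstream step, and delete the intermediate files only after it has verifiably succeeded (the filenames and the grep stand-in for the secondary program are hypothetical):

```shell
#!/bin/bash
cd "$(mktemp -d)"                         # throwaway directory for this sketch
printf 'result A\nresult B\n' > part1.out # stand-ins for per-step outputs
printf 'result C\n'           > part2.out

# Consolidate the intermediate outputs into a single file
cat part1.out part2.out > combined.out

# Stand-in for the secondary processing program
if grep -c result combined.out > final.txt; then
    # Remove intermediates only once the downstream step succeeded
    rm part1.out part2.out
else
    echo "processing failed -- keeping intermediates for a rerun" >&2
fi
```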
Pipelining Tasks
It is more efficient to pipeline tasks, rather than to wait for a task to finish, manually consolidate the results, then feed them into the next task.
* How do the tasks interact?
Since subsequent tasks depend on the previous task completing correctly, they are dependent.
Good Practices
There are some general practices that one should always keep in mind when running on the Biowulf cluster.
Use Memory As Much As Possible (but not too much)
Instead of reading files multiple times, or using system commands to extract information, consider capturing the entire contents of a file into memory and accessing it there instead.
* How to monitor memory?
* How to diagnose memory problems?
Methods:
* free - top level free memory info
* top - general system usage
* ps - list specific information about processes
* vmstat - virtual memory (swapping) info
When a process requires more memory than is available, it swaps. This is incredibly wasteful, and dramatically slows down processing speed.
Manage Files Responsibly
* Where to keep files?
* How to read/write large files?
* How to deal with large numbers of files?
Your /home quota is only 1GB, less than a thumb drive! In general, use /scratch for files temporarily needed during execution and /data for general storage.
Reading large files (> node memory) can be very slow, as the system will need to swap. Consider the following alternatives:
* Break the file into smaller files prior to reading
* Compress the file with gzip, tar, or zip
* Use appropriate system commands to access (mv, grep, zgrep, cat, dd)
* Use a while loop rather than a foreach loop in perl scripts (while reads one line at a time; foreach slurps the entire file into a list)
It is a very bad idea to keep more than 1000 files in a single directory. Store them in a structured directory tree:
![directory tree image](directoryTree.png)
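In shell, a simple arithmetic bucketing scheme builds such a tree: each file's index determines its nested subdirectory, so no directory ever holds more than a few entries (the layout and file counts here are illustrative):

```shell
#!/bin/bash
set -e
cd "$(mktemp -d)"
# Spread files 0..999 over base/<i/100>/<i/10 mod 10>/ so each
# leaf directory holds only 10 files instead of 1000 in one place
for i in $(seq 0 999); do
    d="base/$(( i / 100 ))/$(( i / 10 % 10 ))"
    mkdir -p "$d"
    touch "$d/file$i"
done
```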
Multiple CPUs
The power of the Biowulf cluster is the huge number of nodes available to single users. Take advantage of it!
* Serialize short steps, parallelize/swarm long steps
* Cluster semi-independent (linked) steps
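For swarming long steps, the input is simply a file with one command per line. A hypothetical command file (the blastall invocations and filenames are illustrative), submitted with swarm -f cmdfile:

```
# cmdfile: one independent long-running step per line
blastall -p blastn -d nt -i piece.aa -o piece.aa.out
blastall -p blastn -d nt -i piece.ab -o piece.ab.out
blastall -p blastn -d nt -i piece.ac -o piece.ac.out
```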
Benchmarking
Try running the task briefly on a test node and see what happens.
* How much memory will the task require?
* How many CPUs can it use?
* How long will it take?
Depending on requirements, you may need to think about:
* Specifically requesting node properties, such as memory, speed, or availability
* Checkpointing in some way to insure against failure
Example: use qsub -l nodes=1:m4096 for high-memory R steps.
Methods: top, ps, or free.
PBS
* qsub [options] script - for submitting jobs to PBS
* qstat - direct status on jobs in PBS
* freen - what nodes are free
* npj [jobid] - what nodes allocated to a job
* jobload [options] - what is the load on the nodes allocated to a user or job
qsub
Simple options:
* -N name - set the job name
* -e [path/]filename - path and filename to write STDERR
* -o [path/]filename - path and filename to write STDOUT
* -j oe|eo|n - how and if to merge STDOUT and STDERR
* -k e|o|eo|oe|n - whether and which STDOUT and STDERR streams to keep
* -m n|a|b|e - whether and how mail regarding the job will be sent
* -S path - sets the shell path to interpret input script
* -I - run as an interactive job
* -r y|n - declares whether the job is rerunnable
* -a [[[[CC]YY]MM]DD]hhmm[.SS] - date and time for execution start
Complex options:
* -l resource_list - specify resource list
* -V - declares all environment variables are to be inherited
* -v variable_list - specifies which variables are to be inherited
* -W additional_attributes - specifies additional attributes
qsub -l
qsub -l nodes=n[:property[:property...]] where n = number of nodes and prop is:
* o2800,o2200,o2000,p2800 - node type (single-core)
* m2048,m4096,m8192 - memory size
* gige,ib,myr2k - interconnect type
* dc,o2600 - node type (dual-core)
* x86-64,k8 - 64-bit only
* altix:ncpus=x,mem=ygb - firebolt
Examples:
* qsub -l nodes=1 myjob
* qsub -l nodes=1:x86-64 my64bitjob
* qsub -l nodes=1:m4096 bigjob.bat
* qsub -l nodes=1:altix:ncpus=4,mem=12gb verybigmem.bat
* swarm -l nodes=1:m4096 -f bigjobs
qsub -v
qsub -v variable=value[,variable=value...]
Examples:
* qsub -l nodes=8:o2800:gige -v np=16 blast.sh
* qsub -l nodes=4:o2000:myr2k -v np=8 miscjob
* qsub -l nodes=16:ib -v np=32 md.sh
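Inside the batch script, a variable passed with -v appears as an environment variable. A hypothetical blast.sh along those lines (the mpiBLAST invocation and filenames are illustrative):

```
#!/bin/bash
# blast.sh -- $np was set by 'qsub -v np=16'; $PBS_NODEFILE lists
# the nodes PBS allocated to this job
mpirun -machinefile $PBS_NODEFILE -np $np \
    mpiblast -p blastn -d nt -i queries.fas -o results.txt
```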
qsub -V
Example:
* qsub -I -V -l nodes=1
qsub -W
qsub -W attribute_name=value[,attribute_name=value...]
* depend=after|afterok|afternotok|afterany|before|beforeok|beforenotok|beforeany|on
* group_list=[group]
* block=true
* stagein=local_file@hostname:remote_file
* stageout=local_file@hostname:remote_file
* umask=NNNN
Examples:
* qsub -l nodes=10 -W depend=afterok:1001.biobos:1002.biobos:1003.biobos next.sh
* qsub -l nodes=1 -W stagein=/scratch/tmp.dat@biowulf:/home/user/input.dat,stageout=/scratch/final.output@biowulf:/home/user/final.output myScript
* swarm -l nodes=1:p2800 -W group_list=secgrp,umask=027 -f swarmcmds
Warning: Don't stage in files to /scratch and then run clearscratch!
qdel [-W delay #|force|suppress_mail=#] jobid [jobid ...]
Examples:
* qdel 1001 1002 1003 1004 1005
* qdel -W force 12345.biobos
* qdel -W delay 10 `qselect -u user -s Q`
qstat [options]
Examples:
* qstat - list everything
* qstat -u [user] - short format for [user]
* qstat -f [jobid] - long format for [jobid]
* qstat -n -1 -u [user] - include nodes assigned, all on one line
swarm [options] -f commandfile
* -f - swarm command file
* -n # - number of processes per node
* -b # - number of sequential commands to run per CPU
* qsub options - can include most qsub options
Examples:
* swarm -b 20 -f my1000commands.txt
* swarm -V -W depend=afterok:10001,10002,10003,10004,10005,10006 -f next500cmds.sh
* swarm -a 11111111.11 -f script
swarmdel [options] jobid|jobname
* -n - do a test run, don't actually delete anything
* -d - same as -n
* -i - interactive mode
* -q - quiet mode
* -s - silent mode
* -f - forceful; wait until job is gone from the queue
* -t # - number of seconds to wait (forceful mode only)
* -E|H|Q|R|T|W|S - delete swarm jobs in E|H|Q|R|T|W|S state
Examples:
* swarmdel -f -t 5 12345.biobos
* swarmdel -q swarm1n12345
* swarmdel -Q -H -W 54321
multirun
Assigns a set of nodes to a single job, and retains all nodes until all processes are finished.
#!/bin/csh -f
#
# this file is run6.sh (csh syntax; multirun sets $MP_CHILD to
# the process number, here 0-5)
#
switch ($MP_CHILD)
case 0:
/usr/local/bin/myprog arg0
breaksw
case 1:
/usr/local/bin/myprog arg1
breaksw
case 2:
/usr/local/bin/myprog arg2
breaksw
case 3:
/usr/local/bin/myprog arg3
breaksw
case 4:
/usr/local/bin/myprog arg4
breaksw
case 5:
/usr/local/bin/myprog arg5
breaksw
endsw
#!/bin/csh -f
#
# this file is myjob.sh (csh syntax)
#
set path=(/usr/local/mpich/bin $path)
mpirun -machinefile $PBS_NODEFILE -np 6 \
/usr/local/bin/multirun -m /home/me/run6.sh
qsub -l nodes=3 myjob.sh
Scripts
Perl scripts are the glue that hold the core executables and system commands of a job together. Writing good code is very important for accomplishing complex tasks.
Here are some snippets of useful code for quick and simple tasks.
This script takes a file and breaks it up into $num files of equal
number of lines. The leftover lines are written to the $num-th file.
This script uses the input line number variable ($.) to keep track of which line number has been last read.
$inputfile = $ARGV[0];
$outputname = $ARGV[1];
$num = $ARGV[2];
chomp($total_lines = `wc -l $inputfile 2>&1 | cut -d" " -f1`);
if ( $total_lines > $num ) { $num_of_files = $num; }
else { $num_of_files = $total_lines; }
$lines_per_file = int $total_lines/$num_of_files;
$lines_leftover = ($total_lines % $num_of_files);
print "NUMBER OF FILES: $num_of_files\n";
print "NUMBER OF LINES: $total_lines\n";
print "LINES PER FILE: $lines_per_file\n";
print "LINES LEFTOVER: $lines_leftover\n";
open INPUT,"<$inputfile";
$next = 1;
$filenum = 1;
open OUTPUT,">$outputname.$filenum";
while (<INPUT>) {
if ($. == $next && $filenum <= $num_of_files) {
close OUTPUT;
open OUTPUT,">$outputname.$filenum";
$filenum++;
$next = (($filenum*($lines_per_file))-$lines_per_file)+1;
}
print OUTPUT $_;
}
close OUTPUT;
This script takes a FASTA-formatted file and writes the records into
smaller files $num records per file. This is done easily by modifying the
input record separator ($/) to look for the initial > at the beginning
of each FASTA record. It also uses the input line number variable ($.)
to keep track of how many records have been read.
$inputfile = $ARGV[0];
$outputname = $ARGV[1];
$num = $ARGV[2];
open INPUT,"<$inputfile" or die "Cannot open $inputfile: $!";
$/="\n>";
while (<INPUT>) {
s/>$//;
s/^/>/ if ($.>1);
if (!(($.-1)%$num)) { # start a new output file every $num records
close OUTPUT;
open OUTPUT,">$outputname.$.";
}
print OUTPUT;
}
close OUTPUT;
This script opens a file and reads through the file one megabyte at a time.
open FILE, "<$ARGV[0]";
while (read(FILE,$buffer,1048576)) {
$pointer = tell(FILE); # What is the current file position in bytes?
# do something with $buffer
}
close FILE;
Reading from zcat pipe
open FH,"zcat filename |";
while (<FH>) {
# do something
}
close FH;
Classic file slurping
$holdTerminator=$/;
undef $/;
open (FILE, "<myFile");
$content = <FILE>;
close FILE;
$/=$holdTerminator;
The perl module File::Temp is used to create temporary files safely. It makes sure that no other processes have opened the file, and allows for automatic deletion after the script exits.
use File::Temp qw/tempfile tempdir/;
# In the simplest case, create a filehandle and a filename:
($fh,$fn) = tempfile(); # open a temporary file in /tmp
print {$fh} "Hello world!\n"; # write to it
undef $fh; # close the filehandle
unlink $fn; # remove the file
# Create a temporary directory, using a template name 'tmpXXXX' (the X's will
# be replaced with random characters), delete the directory and its contents
# after the script exits, and create the directory under '/data/user':
$dir = tempdir('tmpXXXX',CLEANUP=>1,DIR=>'/data/user/');
# Create a filehandle and filename using a template name 'tmpXXXX', remove
# the file on exit, add the suffix '.dat' to the end of the name, and create
# the file under $dir:
($fh,$fn) = tempfile('tmpXXXX',UNLINK=>1,SUFFIX=>'.dat',DIR=>$dir);
This subroutine recursively creates a directory tree of 1000 terminal subdirectories (10x10x10). They can be accessed as base/0/0/0, base/0/0/1, base/0/0/2, ..., base/9/9/9.
MakeDirs("base", 10, 3);
sub MakeDirs
{
my($base, $num_dirs, $depth) = @_;
mkdir $base;
return if ($depth == 0);
my $dir_name = "0";
for (my $x = 0; $x < $num_dirs; $x++)
{
MakeDirs("$base/$dir_name", $num_dirs, $depth - 1);
$dir_name++;
}
}
This script demonstrates the perl module Parallel::ForkManager. It spawns at most $max_procs processes at a time and waits for them all to finish. It allows subroutines to be run before, during, and after the child processes are run.
use strict;
use Parallel::ForkManager;
my $max_procs = 4;
my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
my $pm = new Parallel::ForkManager($max_procs);
$pm->run_on_finish(
# Run this subroutine after every forked process
# Setup a callback for when a child finishes up so we can
# get it's exit code
sub { my ($pid, $exit_code, $ident) = @_;
print "** $ident just got out of the pool ".
"with PID $pid and exit code: $exit_code\n";
}
);
$pm->run_on_start(
# Run this subroutine before every forked process
sub { my ($pid,$ident)=@_;
print "** $ident started, pid: $pid\n";
}
);
$pm->run_on_wait(
# Run this subroutine about every 2 seconds while waiting
sub {
my $time = scalar(localtime());
print "** Waiting... current time: $time\n"
},
2.0
);
foreach my $child ( 0 .. $#names ) {
my $pid = $pm->start($names[$child]) and next;
# This code is the child process
print "This is $names[$child], Child number $child\n";
sleep ( 2 * $child );
print "$names[$child], Child $child is about to get out...\n";
sleep 1;
$pm->finish($child); # pass an exit code to finish
}
print "Waiting for Children...\n";
$pm->wait_all_children;
print "Everybody is out of the pool!\n";
The perl module IO::Socket is for dealing with message passing. This first script starts a server listening on an unused port. If another process connects to that port, the server echoes the content locally until 'end' on a single line is received. It then runs the command and returns the results back to the client.
use IO::Socket;
my $host = 'biowulf.nih.gov';
my $port = 12345;
my $local;
while (!$local) {
print "trying port $port\n";
$local = IO::Socket::INET->new( Proto => 'tcp',
LocalHost => $host,
LocalPort => $port,
Reuse => 1 );
if (!$local) { $port++; }
}
$local->listen(); # listen
$local->autoflush(1); # To send response immediately
print "At your service. Waiting...\n";
my $addr; # Client handle
while ($addr = $local->accept() ) { # receive a request
print "Connected from: ", $addr->peerhost(); # Display messages
print " Port: ", $addr->peerport(), "\n";
my $command; # variable for Result
while (<$addr>) { # Read all messages from client
last if m/^end/gi; # if message is 'end'
# then exit loop
print "Received: $_"; # Print received message
print $addr $_; # Send received message back
# to verify
$command .= $_; # Add value to result
}
chomp; # Remove the trailing newline
if (m/^end/gi) { # You need this. Otherwise if
# the client terminates abruptly
# The server will encounter an
# error when it sends the result back
# and terminate
chomp $command;
chomp(my $result = `$command 2>&1`); # Format result message
foreach my $send (split /\n/,$result) {
print $addr "$send\n"; # send the result message
}
print $addr "end\n";
print "Result: $command\n"; # Display sent message
}
print "Closed connection\n"; # Inform that connection
# to client is closed
close $addr; # close client
print "At your service. Waiting...\n"; # Wait again for next request
}
The second script starts a client that attempts to connect to the port on the server host. If successful, commands can be typed interactively and sent to the host to execute. When the client sends 'end' on a single line, the commands sent to the host are executed on the host, and the results are sent back to the client where they are displayed. The client then exits.
use IO::Socket;
$remote = IO::Socket::INET->new( Proto => 'tcp', # protocol
PeerAddr=> 'biowulf.nih.gov', # Address of server
PeerPort=> "54321", # port of server
Reuse => 1, ) or die "$!";
print "Connected to ", $remote->peerhost, # Info message
" on port: ", $remote->peerport, "\n";
$remote->autoflush(1); # Send immediately
while (<>) { # Get input from STDIN
print $remote $_; # Send to Server
last if m/^end/gi; # If 'end' then exit loop
my $line = <$remote>; # Receive echo from server
if ($line ne $_) { # If not the same as input
print "Error in sending output\n"; # error
exit; # Terminate
}
}
while (<$remote>) { # Receive result from server
last if m/^end/gi; # If 'end' then exit loop
print $_; # Print the result
}
print "End of client\n"; # End of client
close $remote; # Close socket
This script demonstrates the perl module DBI. This allows access to the MySQL server via perl.
use strict;
use DBI();
my %MYSQL = (
db => "hg18",
login => "user",
password => "L1gB!8Nz",
host => "mysql",
);
eval { $MYSQL{dbh} = DBI->connect(
"DBI:mysql:database=$MYSQL{db};host=$MYSQL{host}",
$MYSQL{login},
$MYSQL{password},
{'RaiseError' => 1})
};
if ($@) {
print "$@\n";
exit;
}
print "Connected to $MYSQL{db} on $MYSQL{host} as $MYSQL{login}\n";
my $sth = $MYSQL{dbh}->prepare("SHOW TABLES");
$sth->execute();
while (my(@a) = $sth->fetchrow_array()) {
printf ("%s\n",(@a));
}
$sth->finish();
$MYSQL{dbh}->disconnect();
print "Disconnected\n";
On occasion it is preferable to send yourself or others alerts about a batch job. This script demonstrates the Mail::Mailer perl module.
use Mail::Mailer;
$mailer = Mail::Mailer->new("sendmail");
$mailer->open({
From => 'user@host.nih.gov',
To => 'user@host.nih.gov',
Subject => 'Test Email',
})
or die "Can't open: $!\n";
print $mailer 'This is the text';
$mailer->close();