
Effective Utilization of the Biowulf Cluster for Bioinformatics

David Hoover, Helix Systems, CIT/NIH
May 21, 2008

Biowulf:

* ~1800 nodes

* ~5000 CPUs

* 50-100 simultaneous users

* 5-10 nodes per user

* 160 CPUs max per user

* ~7 publications per month

Examples of Bioinformatics Tasks

BLAST Example:

* 1000 nucleotide sequences

* GenBank nt database

Database is ~7.5GB!

Protein Folding Example:

* Generate 100,000 folding models using Rosetta

* Cluster top 1% of results using R

* Screen top models with various structure quality tools

Huge number of output files, several applications to pipeline

Genome-Wide Association Example:

* 80,000 SNPs per genome

* Several hundred participants

* MCMC methods using R

Each R run requires >2GB of memory

Protein Threading Example:

* Threading a single sequence through tens of thousands of templates

* Generate models for best alignments

* Screen with structure quality tools

Tens of thousands of output files to parse; the application is buggy and prone to failure

Task Analysis

A task can contain multiple steps.

* Are the steps independent, dependent, or mixed?

* Do the steps require message passing?

* Are there files to be read/written?

* Where will these files be located?

* Are there any special I/O requirements?

* How long will the entire task need to run?

* How much memory will the task or the individual steps require?

* What happens when a step goes wrong or gives an error?

* Are there multiple tasks to be pipelined?

Independent vs. Dependent Steps

An independent step is a function that can be carried out in complete isolation. When multiple independent steps are run in parallel, this is known as embarrassingly parallel.

* When does a CPU need to know what other CPUs are doing?

* Are there any partition/consolidation steps?

Example: aligning a single sequence against the nt database can be an independent step, but parsing all the alignment outputs and consolidating the data into a single summary is a dependent step.
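For the independent part, a swarm command file with one alignment per line works well. A minimal sketch (file names are illustrative, and it assumes the split query files and a formatted nt database are already in place):

# blast.swarm -- one independent alignment per line, submitted with: swarm -f blast.swarm
blastall -p blastn -d nt -i seqs.001 -o seqs.001.out
blastall -p blastn -d nt -i seqs.002 -o seqs.002.out
blastall -p blastn -d nt -i seqs.003 -o seqs.003.out

The dependent consolidation step should be submitted only after all of these alignments have finished (see qsub -W depend later in this page).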

Message Passing

The problem of message passing arises when running dependent steps in parallel.

* When processing multiple inputs, how does one step know what has already been done by other steps?

Example: keep only the top 1% results, delete the rest.

Methods:

* Write to a single file (extremely bad -- file locking)

* Each step writes to its own file (also bad -- attribute caching)

* Client/server using sockets

* MySQL

* MPI

File Storage

File storage space is enormous, but always limited.

* How much disk space is required?

* Where can the files be stored?

Keep in mind:

* quotas (use quota or checkquota command)

* directory limits (~300,000 files/directory)

* capacity (local /scratch directories on some nodes are only 30GB)

* clearscratch command

* inode table limits
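Putting the points in this section together, a minimal sketch of a disk-conscious run (paths are illustrative; checkquota and clearscratch are the Biowulf commands mentioned above):

checkquota                            # how close are /home and /data to their quotas?
df -h /scratch                        # how much local scratch is left on this node?
mkdir -p /scratch/$USER/run1          # work in node-local scratch during the job
cd /scratch/$USER/run1
# ... run the computation here ...
cp results.tar.gz /data/$USER/        # copy only what must be kept back to /data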

I/O Requirements

Data can be input and output in many forms, depending on how the programs are written and your level of sophistication.

* Environment variables

* Commandline

* Piped input

* File(s)

* Database(s)

* Internet

Each of these methods has its own peculiar quirks:

* Environment variables are not shared across nodes, but they can be inherited through qsub -V or qsub -v variable=value

* Most of the bioinformatics flatfile databases are held in /spin, a high-performance file server

* Helix Systems maintains the MySQL server biobase.nih.gov which can be accessed from all of NIH

* The Biowulf cluster nodes have no internet connectivity

* The new helixdrive Samba service allows files written to /data and /home to be accessible via mapped network drives

I/O Speed

The speed of access is highly dependent on how the data is available:

* memory >> MySQL > local /scratch > /spin > /data & /home

Rather than open and close a file multiple times within a loop, read the contents into memory once. Copy large files to the node's local /scratch area at the beginning of the task and access them there.
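As a concrete sketch (paths and program names are illustrative), a batch script might stage a large database file to local /scratch once and point the program at the copy:

#!/bin/bash
# stage a large input to node-local /scratch once, then read it from there
cp /data/$USER/bigdb.fasta /scratch/
myprog -d /scratch/bigdb.fasta -i input.txt -o /data/$USER/output.txt
rm /scratch/bigdb.fasta               # clean up the local copy when done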

Failure

The failure of one or more steps could jeopardize the entire task.

* What can go wrong?

* What happens when something fails?

Example: when all the individual sequence alignments finish, consolidate the output into a single file, run another program to process the results from that file, then remove all the intermediate files. If the single file is corrupted by a single bad sequence alignment and the secondary program fails, you are left with nothing!
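One way to protect against this is to check each step's exit status and keep the intermediate files until the final result is known to be good. A minimal sketch (program names are hypothetical):

#!/bin/bash
cat align.*.out > all_alignments.txt           # consolidate the individual outputs
if process_results all_alignments.txt > summary.txt ; then
    rm align.*.out                             # only now is it safe to remove the intermediates
else
    echo "process_results failed -- keeping intermediate files" >&2
    exit 1
fi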

Pipelining Tasks

It is more efficient to pipeline tasks, rather than to wait for a task to finish, manually consolidate the results, then feed them into the next task.

* How do the tasks interact?

Since subsequent tasks depend on the previous task completing correctly, they are dependent.
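Under PBS this dependency can be declared at submission time, so the next task is held until the previous one finishes successfully. A sketch (script names are illustrative; see qsub -W below):

jobid=$(qsub -l nodes=10 task1.sh)             # qsub prints the jobid, e.g. 12345.biobos
qsub -l nodes=1 -W depend=afterok:$jobid task2.sh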

Good Practices

There are some general practices that one should always keep in mind when running on the Biowulf cluster.

Use Memory As Much As Possible (but not too much)

Instead of reading files multiple times, or using system commands to extract information, consider capturing the entire contents of a file into memory and accessing it there instead.

* How to monitor memory?

* How to diagnose memory problems?

Methods:

* free - top level free memory info

* top - general system usage

* ps - list specific information about processes

* vmstat - virtual memory (swapping) info

When a process requires more memory than is available, it swaps. This is incredibly wasteful, and dramatically slows down processing speed.
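Some concrete invocations of these tools (run them on the node where the job is executing):

free -m                            # total, used, and free memory in MB; watch the swap line
top -b -n 1 | head -20             # a single snapshot of the busiest processes
ps -o pid,rss,vsz,comm -u $USER    # resident (RSS) and virtual (VSZ) size of your processes, in KB
vmstat 5                           # the si/so columns show pages swapped in/out every 5 seconds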

Manage Files Responsibly

* Where to keep files?

* How to read/write large files?

* How to deal with large numbers of files?

Your /home quota is only 1GB -- smaller than a thumb drive! In general, use /scratch for files needed temporarily during execution and /data for general storage.

Reading large files (larger than the node's memory) can be very slow, as the system will need to swap. Consider the following alternatives (a short sketch follows this list):

* Break the file into smaller files prior to reading

* Compress the file with gzip, tar, or zip

* Use appropriate system commands to access (mv, grep, zgrep, cat, dd)

* Use while loop rather than foreach loop in perl scripts
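A minimal sketch of the first two alternatives above (file names are illustrative):

split -l 100000 huge.txt huge.part.     # break into 100,000-line pieces: huge.part.aa, huge.part.ab, ...
gzip huge.part.*                        # compress the pieces
zgrep -c '^>' huge.part.aa.gz           # read a compressed piece without uncompressing it on disk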

It is a very bad idea to keep more than 1000 files in a single directory. Store them in a structured directory tree:

[Figure: example of a structured directory tree]

Multiple CPUs

The power of the Biowulf cluster is the huge number of nodes available to single users. Take advantage of it!

* Serialize short steps, parallelize/swarm long steps

* Cluster semi-independent (linked) steps

Benchmarking

Try running the task briefly on a test node and see what happens.

* How much memory will the task require?

* How many CPUs can it use?

* How long will it take?

Depending on requirements, you may need to think about:

* Specifically requesting node properties, such as memory, speed, or availability

* Checkpointing in some way to insure against failure

Example: use qsub -l nodes=1:m4096 for high-memory R steps.

Methods: top, ps, or free.
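For example, one way to benchmark is to grab an interactive node and time a short test run (a sketch; /usr/bin/time -v is GNU time and reports peak memory use):

qsub -I -l nodes=1                       # log onto a free node interactively
/usr/bin/time -v ./myprog test_input     # check 'Elapsed (wall clock) time' and 'Maximum resident set size'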

Batch System

PBS

* qsub [options] script - for submitting jobs to PBS

* qstat - direct status on jobs in PBS

* freen - what nodes are free

* npj [jobid] - which nodes are allocated to a job

* jobload [options] - what is the load on the nodes allocated to a user or job

qsub

Simple options:

* -N name - set the job name

* -e [path/]filename - path and filename to write STDERR

* -o [path/]filename - path and filename to write STDOUT

* -j oe|eo|n - whether and how to merge STDOUT and STDERR

* -k e|o|eo|oe|n - whether and which STDOUT and STDERR streams to keep

* -m n|a|b|e - whether and how mail regarding the job will be sent

* -S path - sets the shell path to interpret input script

* -I - run as an interactive job

* -r y|n - declares whether the job is rerunnable

* -a [[[[CC]YY]MM]DD]hhmm[.SS] - date and time for execution start

Complex options:

* -l resource_list - specify resource list

* -V - declares all environment variables are to be inherited

* -v variable_list - specifies which variables are to be inherited

* -W additional_attributes - specifies additional attributes

qsub -l

qsub -l nodes=n[:property[:property...]], where n is the number of nodes and property is one of:

* o2800,o2200,o2000,p2800 - node type (single-core)

* m2048,m4096,m8192 - memory size

* gige,ib,myr2k - interconnect type

* dc,o2600 - node type (dual-core)

* x86-64,k8 - 64-bit only

* altix:ncpus=x,mem=ygb - firebolt

Examples:

* qsub -l nodes=1 myjob

* qsub -l nodes=1:x86-64 my64bitjob

* qsub -l nodes=1:m4096 bigjob.bat

* qsub -l nodes=1:altix:ncpus=4,mem=12gb verybigmem.bat

* swarm -l nodes=1:m4096 -f bigjobs

qsub -v

qsub -v variable=value[,variable=value...]

Examples:

* qsub -l nodes=8:o2800:gige -v np=16 blast.sh

* qsub -l nodes=4:o2000:myr2k -v np=8 miscjob

* qsub -l nodes=16:ib -v np=32 md.sh
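Inside the batch script, an inherited variable behaves like any other environment variable. A minimal sketch of what blast.sh might do with np (the program name is hypothetical; the mpirun form mirrors the multirun example later in this page):

#!/bin/bash
# blast.sh -- np was set on the command line with 'qsub -v np=16'
echo "starting $np processes on the nodes listed in $PBS_NODEFILE"
/usr/local/mpich/bin/mpirun -machinefile $PBS_NODEFILE -np $np /usr/local/bin/myparallelprog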

qsub -V

Example:

* qsub -I -V -l nodes=1

qsub -W

qsub -W attribute_name=value[,attribute_name=value...]

* depend=after|afterok|afternotok|afterany|before|beforeok|beforenotok|beforeany|on

* group_list=[group]

* block=true

* stagein=local_file@hostname:remote_file

* stageout=local_file@hostname:remote_file

* umask=NNNN

Examples:

* qsub -l nodes=10
-W depend=afterok:1001.biobos:1002.biobos:1003.biobos next.sh

* qsub -l nodes=1
-W stagein=/scratch/tmp.dat@biowulf:/home/user/input.dat,
stageout=/scratch/final.output@biowulf:/home/user/final.output
myScript

* swarm -l nodes=1:p2800 -W group_list=secgrp,umask=027
-f swarmcmds

Warning: Don't stage in files to /scratch and then run clearscratch!

qdel [-W delay #|force|suppress_mail=#] jobid [jobid ...]

Examples:

* qdel 1001 1002 1003 1004 1005

* qdel -W force 12345.biobos

* qdel -W delay 10 `qselect -u user -s Q`

qstat [options]

Examples:

* qstat - list everything

* qstat -u [user] - short format for [user]

* qstat -f [jobid] - long format for [jobid]

* qstat -n -1 -u [user] - include nodes assigned, all on one line

swarm [options] -f commandfile

* -f - swarm command file

* -n # - number of processes per node

* -b # - number of sequential commands to run per CPU

* qsub options - can include most qsub options

Examples:

* swarm -b 20 -f my1000commands.txt

* swarm -V -W depend=afterok:10001:10002:10003:10004:10005:10006 -f next500cmds.sh

* swarm -a 11111111.11 -f script

swarmdel [options] jobid|jobname

* -n - do a test run, don't actually delete anything

* -d - same as -n

* -i - interactive mode

* -q - quiet mode

* -s - silent mode

* -f - forceful; wait until job is gone from the queue

* -t # - number of seconds to wait (forceful mode only)

* -E|H|Q|R|T|W|S - delete swarm jobs in E|H|Q|R|T|W|S state

Examples:

* swarmdel -f -t 5 12345.biobos

* swarmdel -q swarm1n12345

* swarmdel -Q -H -W 54321

multirun

multirun assigns a set of nodes to a single job and retains all of the nodes until every process has finished. Each process runs the command selected by its $MP_CHILD index in the wrapper script:

#!/bin/csh
#
# this file is run6.sh
#
switch ($MP_CHILD)
  case 0:
    /usr/local/bin/myprog arg0
    breaksw
  case 1:
    /usr/local/bin/myprog arg1
    breaksw
  case 2:
    /usr/local/bin/myprog arg2
    breaksw
  case 3:
    /usr/local/bin/myprog arg3
    breaksw
  case 4:
    /usr/local/bin/myprog arg4
    breaksw
  case 5:
    /usr/local/bin/myprog arg5
    breaksw
endsw

#!/bin/csh
#
# this file is myjob.sh
#
set path=(/usr/local/mpich/bin $path)
mpirun -machinefile $PBS_NODEFILE -np 6 \
    /usr/local/bin/multirun -m /home/me/run6.sh
qsub -l nodes=3 myjob.sh

Scripts

Perl scripts are the glue that holds the core executables and system commands of a job together. Writing good code is very important for accomplishing complex tasks.

Here are some snippets of useful code for quick and simple tasks.

This script takes a file and breaks it up into $num files with an equal number of lines; any leftover lines are written to the $num-th file. It uses the input line number variable ($.) to keep track of the last line read.

$inputfile  = $ARGV[0];
$outputname = $ARGV[1];
$num        = $ARGV[2];
chomp($total_lines = `wc -l $inputfile 2>&1 | cut -d" " -f1`);
if ( $total_lines > $num ) {
    $num_of_files = $num;
}
else {
    $num_of_files = $total_lines;
}
$lines_per_file = int $total_lines/$num_of_files;
$lines_leftover = ($total_lines % $num_of_files);
print "NUMBER OF FILES: $num_of_files\n";
print "NUMBER OF LINES: $total_lines\n";
print "LINES PER FILE: $lines_per_file\n";
print "LINES LEFTOVER: $lines_leftover\n";
open INPUT,"<$inputfile";
$next = 1;
$filenum = 1;
open OUTPUT,">$outputname.$filenum";
while (<INPUT>) {
    if ($. == $next && $filenum <= $num_of_files) {
        close OUTPUT;
        open OUTPUT,">$outputname.$filenum";
        $filenum++;
        $next = (($filenum*($lines_per_file))-$lines_per_file)+1;
    }
    print OUTPUT $_;
}
close OUTPUT;

This script takes a FASTA-formatted file and writes the records into smaller files, $num records per file. This is done easily by changing the input record separator ($/) so that each read returns one FASTA record, delimited by the > at the beginning of the next record. It also uses the input line number variable ($.), which now counts records, to keep track of how many records have been read.

$inputfile  = $ARGV[0];
$outputname = $ARGV[1];
$num        = $ARGV[2];
open INPUT,"<$inputfile";
$/ = "\n>";                      # read one FASTA record at a time
while (<INPUT>) {
    s/>$//;                      # strip the '>' belonging to the next record
    s/^/>/ if ($. > 1);          # restore the leading '>' on records after the first
    if (!(($. - 1) % $num)) {    # start a new output file every $num records
        close OUTPUT;
        open OUTPUT,">$outputname.$.";
    }
    print OUTPUT;
}
close OUTPUT;

This script opens a file and reads through the file one megabyte at a time.

open FILE, "<$ARGV[0]";
while (read(FILE,$buffer,1048576)) {
    $pointer = tell(FILE);    # What is the current file position in bytes?
    # do something with $buffer
}
close FILE;

Reading from zcat pipe

open FH,"zcat filename |";
while (<FH>) {
    # do something
}
close FH;

Classic file slurping

$holdTerminator = $/;
undef $/;
open (FILE, "<myFile");
$content = <FILE>;
close FILE;
$/ = $holdTerminator;

The perl module File::Temp is used to create temporary files safely. It makes sure that no other processes have opened the file, and allows for automatic deletion after the script exits.

use File::Temp qw/tempfile tempdir/;

# In the simplest case, create a filehandle and a filename:
($fh,$fn) = tempfile();        # open a temporary file in /tmp
print {$fh} "Hello world!\n";  # write to it
undef $fh;                     # close the filehandle
unlink $fn;                    # remove the file

# Create a temporary directory, using a template name 'tmpXXXX' (the X's will
# be replaced with random characters), delete the directory and its contents
# after the script exits, and create the directory under '/data/user':
$dir = tempdir('tmpXXXX',CLEANUP=>1,DIR=>'/data/user/');

# Create a filehandle and filename using a template name 'tmpXXXX', remove
# the file on exit, add the suffix '.dat' to the end of the name, and create
# the file under $dir:
($fh,$fn) = tempfile('tmpXXXX',UNLINK=>1,SUFFIX=>'.dat',DIR=>$dir);

This subroutine recursively creates a directory tree with 1000 terminal subdirectories (10 x 10 x 10). They can be accessed as base/0/0/0, base/0/0/1, base/0/0/2, ..., base/9/9/9.

MakeDirs("base", 10, 3);

sub MakeDirs {
    my($base, $num_dirs, $depth) = @_;
    mkdir $base;
    return if ($depth == 0);
    my $dir_name = "0";
    for (my $x = 0; $x < $num_dirs; $x++) {
        MakeDirs("$base/$dir_name", $num_dirs, $depth - 1);
        $dir_name++;
    }
}

This script demonstrates the perl module Parallel::ForkManager. It spawns at most $max_procs processes at a time and waits for them all to finish. It allows subroutines to be run before, during, and after the child processes are run.

use strict;
use Parallel::ForkManager;

my $max_procs = 4;
my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
# Array to resolve PID's back to child specific information

my $pm = new Parallel::ForkManager($max_procs);

$pm->run_on_finish(
    # Run this subroutine after every forked process finishes,
    # so we can get its exit code
    sub {
        my ($pid, $exit_code, $ident) = @_;
        print "** $ident just got out of the pool ".
              "with PID $pid and exit code: $exit_code\n";
    }
);
$pm->run_on_start(
    # Run this subroutine before every forked process
    sub {
        my ($pid,$ident) = @_;
        print "** $ident started, pid: $pid\n";
    }
);
$pm->run_on_wait(
    # Run this subroutine roughly every 2 seconds while waiting
    sub {
        my $time = scalar(localtime());
        print "** Waiting... current time: $time\n";
    },
    2.0
);

foreach my $child ( 0 .. $#names ) {
    my $pid = $pm->start($names[$child]) and next;

    # This code is the child process
    print "This is $names[$child], Child number $child\n";
    sleep ( 2 * $child );
    print "$names[$child], Child $child is about to get out...\n";
    sleep 1;
    $pm->finish($child);    # pass an exit code to finish
}

print "Waiting for Children...\n";
$pm->wait_all_children;
print "Everybody is out of the pool!\n";

The perl module IO::Socket handles message passing over sockets. This first script starts a server listening on an unused port. When another process connects to that port, the server echoes each line back to the client (and prints it locally) until 'end' is received on a single line. It then runs the accumulated command and returns the results to the client.

use IO::Socket;

my $host = 'biowulf.nih.gov';
my $port = 12345;
my $local;
while (!$local) {                             # find an unused port
    print "trying port $port\n";
    $local = IO::Socket::INET->new(
        Proto     => 'tcp',
        LocalHost => $host,
        LocalPort => $port,
        Reuse     => 1
    );
    if (!$local) { $port++; }
}
$local->listen();                             # listen
$local->autoflush(1);                         # send responses immediately
print "At your service. Waiting...\n";

my $addr;                                     # client handle
while ($addr = $local->accept() ) {           # receive a request
    print "Connected from: ", $addr->peerhost();   # display client info
    print " Port: ", $addr->peerport(), "\n";
    my $command;                              # variable to accumulate the command
    while (<$addr>) {                         # read all messages from the client
        last if m/^end/gi;                    # if the message is 'end', exit the loop
        print "Received: $_";                 # print the received message
        print $addr $_;                       # send it back to the client to verify
        $command .= $_;                       # add the message to the command
    }
    chomp;                                    # remove the newline from the last message
    if (m/^end/gi) {                          # only run the command if 'end' was received;
                                              # otherwise the client terminated abruptly and
                                              # the server would encounter an error when it
                                              # sends the result back
        chomp $command;
        chomp(my $result = `$command 2>&1`);  # run the command
        foreach my $send (split /\n/,$result) {
            print $addr "$send\n";            # send the result back line by line
        }
        print $addr "end\n";
        print "Result: $command\n";           # display the command that was run
    }
    print "Closed connection\n";              # inform that the connection to the client is closed
    close $addr;                              # close the client
    print "At your service. Waiting...\n";    # wait for the next request
}

The second script starts a client that attempts to connect to the port on the server host. If successful, commands can be typed interactively and sent to the host. When the client sends 'end' on a single line, the accumulated commands are executed on the host and the results are sent back to the client, where they are displayed. The client then exits.

use IO::Socket;

$remote = IO::Socket::INET->new(
    Proto    => 'tcp',                   # protocol
    PeerAddr => 'biowulf.nih.gov',       # address of the server
    PeerPort => "12345",                 # port of the server (must match the port
                                         # the server ended up listening on)
    Reuse    => 1,
) or die "$!";

print "Connected to ", $remote->peerhost,    # info message
      " on port: ", $remote->peerport, "\n";
$remote->autoflush(1);                       # send immediately

while (<>) {                                 # get input from STDIN
    print $remote $_;                        # send it to the server
    last if m/^end/gi;                       # if 'end' then exit the loop
    my $line = <$remote>;                    # receive the echo from the server
    if ($line ne $_) {                       # if it is not the same as the input
        print "Error in sending output\n";   # report an error
        exit;                                # and terminate
    }
}
while (<$remote>) {                          # receive the result from the server
    last if m/^end/gi;                       # if 'end' then exit the loop
    print $_;                                # print the result
}
print "End of client\n";                     # end of client
close $remote;                               # close the socket

This script demonstrates the perl module DBI, which provides access to the MySQL server from perl.

use strict;
use DBI();

my %MYSQL = (
    db       => "hg18",
    login    => "user",
    password => "L1gB!8Nz",
    host     => "mysql",
);

eval {
    $MYSQL{dbh} = DBI->connect(
        "DBI:mysql:database=$MYSQL{db};host=$MYSQL{host}",
        $MYSQL{login}, $MYSQL{password},
        {'RaiseError' => 1})
};
if ($@) {
    print "$@\n";
    exit;
}
print "Connected to $MYSQL{db} on $MYSQL{host} as $MYSQL{login}\n";

my $sth = $MYSQL{dbh}->prepare("SHOW TABLES");
$sth->execute();
while (my(@a) = $sth->fetchrow_array()) {
    printf ("%s\n",(@a));
}
$sth->finish();
$MYSQL{dbh}->disconnect();
print "Disconnected\n";

On occasion it is preferable to send yourself or others alerts about a batch job. This script demonstrates the Mail::Mailer perl module.

use Mail::Mailer;

$mailer = Mail::Mailer->new("sendmail");
$mailer->open({
    From    => 'user@host.nih.gov',
    To      => 'user@host.nih.gov',
    Subject => 'Test Email',
}) or die "Can't open: $!\n";
print $mailer 'This is the text';
$mailer->close();