
bashreduce: A Bare-Bones MapReduce

Harness the power of distributed computing using everyday Unix command-line tools and a clever little bash script.

In late 2004, Google surprised the world of computing with the release of the paper MapReduce: Simplified Data Processing on Large Clusters. That paper ushered in a new model for data processing across clusters of machines that had the benefit of being simple to understand and incredibly flexible. Once you adopt a MapReduce way of thinking, dozens of previously difficult or long-running tasks suddenly start to seem approachable, provided you have sufficient hardware.

If you’ve managed to somehow miss most of the MapReduce revolution, Wikipedia describes it pretty well:

MapReduce is a framework for computing certain kinds of distributable problems using a large number of computers (nodes), collectively referred to as a cluster. Computational processing can occur on data stored either in a filesystem (unstructured) or within a database (structured).

“Map” step: The master node takes the input, chops it up into smaller sub-problems, and distributes those to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure.

The worker node processes that smaller problem, and passes the answer back to its master node.

“Reduce” step: The master node then takes the answers to all the sub-problems and combines them in a way to get the output – the answer to the problem it was originally trying to solve.
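If that still sounds abstract, the same shape shows up in an ordinary single-machine shell pipeline (input.txt here is just a placeholder file): the tr stage plays the mapper by emitting one word per line, sort does the grouping, and uniq -c acts as the reducer.

$ tr -s ' ' '\n' < input.txt | sort | uniq -c | sort -rn | head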

In fact, the MapReduce model has proven so useful that the Apache Hadoop project (an Open Source implementation of the infrastructure described in the Google paper) has become very popular in the last few years. Yahoo, which employs numerous Hadoop committers, recently hosted its annual Hadoop Summit, which attracted over 500 users and developers.

Amazon.com offers Hadoop as the Elastic MapReduce service on their EC2 platform. And startup Cloudera (founded by former Yahoo, Facebook, and Google employees) has begun to offer training, support, and consulting services around Hadoop and their own packaged Hadoop distribution. (Yahoo has since published their internal version on Github too.)

Until recently, there hasn't been an easy way, aside from possibly Amazon's offerings, to try out MapReduce, especially on your own hardware.

bash for the win!

Earlier this year, Erik Frey of last.fm announced bashreduce on the last.fm blog.

More than just a toy project, bashreduce lets us address a common scenario around these parts: we have a few analysis machines lying around, and we have data from various systems that are not in Hadoop. Rather than go through the rigmarole of sending it to our Hadoop cluster and writing yet another one-off Java or Dumbo program, we instead fire off a one-liner bashreduce using tools we already know in our reducer: sort, awk, grep, join, and so on.

It sounds almost comical, but it really does make a lot of sense. Like most of the Unix shell tools, bash is nearly everywhere. So why not build up just enough of a bash script to facilitate basic MapReduce-style processing for periodic or one-off jobs? It's really quite handy.

bashreduce is new enough that it’s not packaged up for popular distributions yet, but you can pull a copy from github easily enough:

$ git clone git://github.com/erikfrey/bashreduce.git
Initialized empty Git repository in /home/jzawodn/cvs/bashreduce/.git/
remote: Counting objects: 136, done.
remote: Compressing objects: 100% (136/136), done.
remote: Total 136 (delta 49), reused 0 (delta 0)
Receiving objects: 100% (136/136), 21.17 KiB, done.
Resolving deltas: 100% (49/49), done.

And let's build the optional performance-boosting utilities it comes with:

$ cd bashreduce/brutils
$ make
cc -O3 -Wall   -c -o brp.o brp.c
cc -o brp brp.o
cc -O3 -Wall   -c -o brm.o brm.c
cc -o brm brm.o

$ sudo make install
install -c brp /usr/local/bin
install -c brm /usr/local/bin

Let’s give it a run to see the command-line help:

$ ./br -h
Usage: br: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]
bashreduce.  Map an input file to many hosts, sort/reduce, merge
  -m: hosts to use, can repeat hosts for multiple cores
      default hosts from /etc/br.hosts
  -c: column to partition, default = 1 (1-based)
  -r: reduce function, default = identity
  -i: input file, default = stdin
  -o: output file, default = stdout
  -t: tmp dir to use, default = /tmp
  -S: memory to use for sort, default = 256M
  -h: this help message

As you can see, br needs a few arguments, and possibly a config file, before it's useful. First, you need to specify the list of hosts (nodes) to distribute data to and run workers on. You can either pass them as a quoted -m argument, like "host1 host2 host3", or list them in /etc/br.hosts, one host per line. If you have multi-core hosts, you can list each host more than once to take advantage of the additional CPU cores.
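For example, you could create a hypothetical /etc/br.hosts for a small cluster like this (the hostnames are placeholders; host1 is listed twice to use two of its cores):

$ printf '%s\n' host1 host1 host2 host3 | sudo tee /etc/br.hosts
host1
host1
host2
host3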

You’ll need password-less ssh access to all the hosts, but you’re already running keychain anyway, right? Good.
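If you haven't set that up yet, one common way to get key-based access to each worker looks roughly like this (again, the hostnames are placeholders):

$ ssh-keygen -t rsa
$ for h in host1 host2 host3; do ssh-copy-id "$h"; done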

Examples

Let’s start with a useless but instructive example:

$ br -m "localhost localhost" < /etc/passwd > /tmp/sorted

That will take your /etc/passwd file, chop it into two pieces, sort each piece, and then merge the sorted results. Nobody needs a sorted /etc/passwd file, but if you had a much larger file in need of sorting or, better still, a more CPU-intensive bit of processing, this approach would make some sense. The point is that you've just distributed the work between both CPU cores on your machine without having to do much extra work yourself.
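The same invocation scales up to something actually worth distributing. Here's a sketch using the -i and -o flags from the help output above, with made-up paths and hosts:

$ ./br -m "host1 host1 host2 host3" -i /data/huge_unsorted.log -o /data/sorted.log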

Suppose you have a multi-field, whitespace-delimited log file and want to extract a single column, count the occurrences of each value in that column, and see the results.

$ cut -d' ' -f6 /var/log/messages | ./br -m "host1 host2 host3 host4" -r "uniq -c"
    330 MARK
    120 --
      2 syslogd
      5 1.5.0#2ubuntu6:
      1 rsnapshot[20552]:
      1 rsnapshot[3030]:
      6 /usr/bin/rsnapshot

The choice of /var/log/messages here is primarily motivated by the fact that you’re likely to have it on your system. A multi-gigabyte Apache log or application server log would lend itself to this type of processing.

These examples are fairly trivial but serve to show you how to get started. The real power comes when you’re using your own code instead of a uniq command.
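For example, here's a sketch (not from the original article) that sums Apache response bytes per status code. The field numbers assume the common log format, and sum_by_key.awk is a hypothetical one-liner ({ sum[$1] += $2 } END { for (k in sum) print k, sum[k] }) that has to exist on every worker host, since the reducer runs remotely:

$ cut -d' ' -f9,10 access.log | ./br -m "host1 host2 host3 host4" -r "awk -f /usr/local/bin/sum_by_key.awk"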

bashreduce Enhancements

Since its initial release, developer Richard Crowley has extended bashreduce, adding several useful features:

  • the ability to pass a filename to each worker process (rather than streaming the actual file data over an nc pipe), which assumes each machine has a local copy of the data or access to a shared filesystem and greatly reduces the network bandwidth required
  • support for processing a directory full of files rather than a single file; any of the files may be compressed with gzip, and bashreduce will detect that and transparently handle the decompression
  • a -M option that lets you specify your own merge program instead of the default (sort -m); see the sketch after this list
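Based purely on that description (I haven't verified the fork's exact flag syntax), using a custom merge program might look something like:

$ ./br -m "host1 host2" -i /data/counts.tsv -o /data/merged.tsv -M "sort -m -k2,2n"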

You can find his copy on github too. He’s actively using it for some data analysis tasks at OpenDNS.

Whether you use Erik’s original bashreduce or Richard’s fork, you end up with the ability to extend the basic Unix philosophy of standard tools speaking to each other on stdin/stdout to many hosts all doing work in parallel. Not bad for a little bash scripting, huh?

Comments on "bashreduce: A Bare-Bones MapReduce"

stevenworr

There's a huge amount of cleanup that can be done here. I question whether the -m option works. It is supposed to allow multiple hosts, but it looks like it only gets one. No? If you really want it to work you have to either allow multiple -m options or you have to have the multiple hosts separated by commas.
e.g., -m h1,h2,h3
and then break the list up.
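Breaking up such a list is a one-liner, something like:

IFS=',' read -r -a host_list <<< "$hosts"
printf '%s\n' "${host_list[@]}"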

Instead of using basename all over, I just set prog globally using

prog="${0##*/}"

-z $hosts will be a syntax error if it's null and there are no quotes around it.

There was a comment at the end about killing a negative. That works because the signal is sent to the process group.

There's lots more to be done, but this is sort of an initial tickle.

#!/bin/bash
# bashreduce: mapreduce in bash
# erik@fawx.com

usage()
{
  cat <<EOF 1>&2
Usage: $prog [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]
$prog -h for help.
EOF
  exit 2
}

showhelp()
{
  cat <<EOF
Usage: $prog [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]
bashreduce.  Map an input file to many hosts, sort/reduce, merge
  -m: hosts to use, can repeat hosts for multiple cores
      default hosts from /etc/br.hosts
  -c: column to partition, default = 1 (1-based)
  -r: reduce function, default = identity
  -i: input file, default = stdin
  -o: output file, default = stdout
  -t: tmp dir to use, default = /tmp
  -S: memory to use for sort, default = 256M
  -h: this help message
EOF
  exit 2
}

prog="${0##*/}"
hosts=
mapcolumn=1
reduce=
input=
output=
tmp_dir=/tmp
sort_mem=256M
set -x
while getopts 'm:c:r:i:o:t:S:h' name
do
  case $name in
    m) hosts=$OPTARG;;
    c) mapcolumn=$OPTARG;;
    r) reduce=$OPTARG;;
    i) input=$OPTARG;;
    o) output=$OPTARG;;
    t) tmp_dir=$OPTARG;;
    S) sort_mem=$OPTARG;;
    h) showhelp;;
    [?]) usage;;
  esac
done

if [[ -z "$hosts" ]]
then
  if [[ ! -e /etc/br.hosts ]]
  then
    echo "$prog: must specify hosts with -m or provide /etc/br.hosts"
    usage
  fi
  hosts=$(< /etc/br.hosts)
fi

# if we have a reduce, add the pipe explicitly
[ -n "$reduce" ] && reduce="| $reduce 2>/dev/null"

# okay let's get started! first we need a name for our job
jobid=$(uuidgen)
jobpath=$tmp_dir/br_job_$jobid
nodepath=$tmp_dir/br_node_$jobid
mkdir -p $jobpath/{in,out}

# now, for each host, set up in and out fifos (and a netcat for each),
# and ssh to each host to set up workers listening on netcat

port_in=8192
port_out=$(($port_in + 1))
host_idx=0
out_files=

for host in $hosts
do
  # our named pipes
  mkfifo $jobpath/{in,out}/$host_idx
  # lets get the pid of our listener
  ssh -n $host "mkdir -p $nodepath"
  set -- $(ssh -n $host "nc -l -p $port_out >$nodepath/in_$host_idx 2>/dev/null </dev/null & jobs -l")
  pid=$2
  ssh $host -n "tail -s0.1 -f --pid=$pid $nodepath/in_$host_idx 2>/dev/null </dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp_dir -k$mapcolumn,$mapcolumn 2>/dev/null $reduce | nc -q0 -l -p $port_in >&/dev/null &"
  # our local forwarders
  nc $host $port_in >$jobpath/in/$host_idx &
  nc -q0 $host $port_out <$jobpath/out/$host_idx &
  # our vars
  out_files="$out_files $jobpath/out/$host_idx"
  port_in=$(($port_in + 2))
  port_out=$(($port_in + 1))
  host_idx=$(($host_idx + 1))
done

# okay, time to map
if which brp >/dev/null
then
  eval "${input:+pv $input |} brp - $(($mapcolumn - 1)) $out_files"
else
  # use awk if we don't have brp
  # we're taking advantage of a special property that awk leaves
  # its file handles open until it's done
  # i think this is universal
  # we're also sending a zero length string to all the handles at
  # the end, in case some pipe got no love
  eval "${input:+pv $input |} awk '{
      srand(\$$mapcolumn);
      print \$0 >> \"$jobpath/out/\"int(rand() * $host_idx);
    }
    END {
      for (i = 0; i != $host_idx; ++i)
        printf \"\" >> \"$jobpath/out/\"i;
    }'"
fi

# save it somewhere
if which brm >/dev/null
then
  eval "brm - $(($mapcolumn - 1)) \`find $jobpath/in/ -type p | xargs\` ${output:+| pv >$output}"
else
  # use sort -m if we don't have brm
  # sort -m creates tmp files if too many input files are specified
  # brm doesn't do this
  eval "sort -k$mapcolumn,$mapcolumn -m $jobpath/in/* ${output:+| pv >$output}"
fi

# finally, clean up after ourselves
rm -rf $jobpath
for host in $hosts
do
  ssh $host "rm -rf $nodepath"
done

# TODO: is there a safe way to kill subprocesses upon fail?
# this seems to work: /bin/kill -- -$$
# It works because the neg val is used to specify the process group.

solonari

The multi-host capability is simply a documentation issue. As you can see in the example utilizing two local cores, the -m switch is passed a single parameter containing a whitespace-delimited list of target hosts.

