Sunday, July 22, 2012

Hadoop for Business and Security Intelligence - BI and SI with the help of POB - A Hadoop Tutorial

To generate comprehensive near-real-time reports for Business Intelligence (BI) and Security Intelligence (SI) you need to collect data from different sources, mash it up and aggregate it. As the term suggests, "Big Data" refers to the huge amount of data generated by the many systems in a company. While that is the reality of most companies, few actually analyze their big data (server logs, for example), even though it carries valuable information that can be used to make decisions on both the business side and the security side.

Security in particular is such an important aspect of Business (and so often underestimated) that I prefer to give it very special treatment.

Intrusion Detection is one of those areas where analyzing different logs from switches, routers, operating systems and applications could produce a reliable alert, with few false positives, by the time an intruder gains access to important resources.

Hadoop allows us to achieve this important goal (SI) the same way it allows us to expand our portfolio of BI solutions. In particular, the Hadoop Streaming API allows your Plain Old Bash (POB) scripts to run in a cluster, analyzing not only a local log but logs from any appliance in your Data Center (DC).

Not only can you use bash scripts to define Map and Reduce functionality, you can also use POB recipes to automate the installation and configuration of Hadoop clusters.

Here is my attempt to bring Hadoop to System and Database Administrators, Software Engineers and Developers, Security experts and Executives. You need no Java knowledge, just bash knowledge, which I assume is mandatory for any Linux operator. Hadoop is officially supported on Linux for production systems, and bash scripting is a fundamental skill for mastering Linux.

What is Hadoop?

Hadoop is a distributed computing platform that includes a distributed file system and a parallel programming model, which together allow the load of long-running tasks to be distributed across inexpensive nodes in a cluster. Both data storage and data processing are distributed, so if you need to process more data or provide more complex processing you just add more cluster nodes. The same applies if you want to reduce the time taken to produce results. Distributed computing is of course not just about speed but also about fault tolerance, and Hadoop addresses that as well: if one node goes down, the results of computations will still be available.

What is Hadoop not?

Transactional systems are not well suited to Hadoop; analytics systems are. Do not think of Hadoop as a database but rather as a platform that allows you to extract valuable information in a NoSQL way: mapping the data to key/value pairs and then applying reduce functions (the MapReduce pattern).

Hadoop Topology

Social issues aside, Hadoop uses a Master/Slave approach. As you might have figured, the Master commands what the Slaves do: it coordinates the work and assigns jobs to Slaves. If a Slave dies while executing some logic, Hadoop re-assigns the job to a different Slave node. A Master runs two daemons (NameNode and JobTracker) while a Slave runs another two (DataNode and TaskTracker).

You can start with just one machine that processes your data (using standalone or pseudo-distributed mode) and, as data or computation complexity grows, add more nodes to split the work. Of course a single machine provides no fault tolerance or redundancy, and most likely the performance will be the same as, if not worse than, just running scripts (bash+find+awk+grep+cut+sort+uniq for example) from the command line, so it would be impractical to say the least. On the other hand, if you are considering Hadoop it is because your current data processing is taking too much time and you want it to scale horizontally.

While the NameNode and JobTracker services (Master) can run on the same machine, you will find that as you need more nodes to process bigger data the two services need to be separated. You also need at least two machines as DataNodes/TaskTrackers (Slaves) to have failover and redundancy, so a typical Hadoop production installation starts with a minimum of three or four separate machines. Each DataNode machine is also a TaskTracker machine, which ensures that map and reduce tasks run against the local file system holding the data. Of course, with only two nodes to store data and perform the jobs you end up with a replication factor of 2 (dfs.replication). Three-way replication is more reliable and is the de facto standard used by big players like Google in their GFS. That brings your minimum to four or five machines.
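
The replication factor is a plain configuration property. As a rough sketch only, assuming a Hadoop 1.x layout where HDFS settings live in $HADOOP_HOME/conf/hdfs-site.xml, the hypothetical helper render_hdfs_site below prints a file that sets dfs.replication. The helper name and its default of 3 are my assumptions for illustration, not part of the recipes.

```shell
#!/usr/bin/env bash
# Sketch: print an hdfs-site.xml that sets the replication factor.
# render_hdfs_site is a hypothetical helper. Review its output before
# redirecting it to $HADOOP_HOME/conf/hdfs-site.xml.

render_hdfs_site() {
  # First argument is the replication factor; 3 is assumed when omitted.
  local replication="${1:-3}"
  cat <<EOF
<?xml version="1.0"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>${replication}</value>
  </property>
</configuration>
EOF
}

# Two DataNodes only, so two copies of each block at most.
render_hdfs_site 2
```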

If the DN/TT machines are VMs, it is recommended to have at least the first dfs.replication machines on different hosts. The same applies to storage: their disks should be on different hardware. In fact Hadoop works better with Just a Bunch Of Disks (JBOD) than with RAID or SAN storage. Remember, the Hadoop concept is to use commodity machines, meaning machines shipped with commonly available components (not necessarily unreliable but definitely cheaper than high-end hardware). If those are VMs on the same host, you will probably end up with a limitation that you could avoid by distributing them across different physical machines.

VMware claims that its newly open-sourced project, Serengeti, resolves the single point of failure problems inherent to Hadoop (read below), and at the same time claims that Hadoop deployed on ESXi VMs has performance comparable to that of native (physical) deployments. Local disks rather than a traditional SAN will be needed in any case, so I guess the discussions on the topic will continue for a while. One thing is for sure: the competition is fierce, and Hadoop appliances, optimized servers and custom-built rackmount servers configured as Hadoop appliances will be considered by CTOs as well.

Take a look at this image gallery for an idea of how big Hadoop clusters can be. Clearly configuration plays an important role here, and tweaking without documenting would be a big mistake; but if you configure from POB with Remote-IT you get both the documentation and the automated configuration at the same time.

Backup considerations

So far Hadoop has a single point of failure: the NameNode. If the NameNode goes down you will need to try to restart it, or create a new one from its latest image, which will hopefully still be available on the crashed NameNode or will otherwise need to be pulled from the "Checkpoint Node" or from a "Backup Node".

Rebalancing maintenance tasks could be needed depending on how uniformly data is placed across the DataNodes. Rack awareness is important: all DataNodes involved in a job should ideally be located in the same rack to limit network traffic, while replicas should live in separate racks for proper redundancy, so it is important to tell Hadoop which rack each node is in. There might be safemode conditions that should be understood and that could require manual action. Tools like fsck for Hadoop will need to be run to check the health of the cluster, followed by manual action if required. Upgrade and rollback are important procedures that must be considered up front to avoid losing data. This means you will probably have 5 initial machines (or 6 if you decide to go with three-way replication, as I already mentioned).
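
Telling Hadoop which rack a node lives in is itself a bash job. The sketch below assumes the Hadoop 1.x mechanism, where the topology.script.file.name property (in core-site.xml) points to an executable that receives host names or IP addresses as arguments and prints one rack path per argument. The host-to-rack mapping and the function name resolve_rack are hypothetical, so adapt them to your own Data Center.

```shell
#!/usr/bin/env bash
# Sketch of a rack-awareness script. Hadoop calls such a script with one or
# more host names or IP addresses and expects one rack path per argument.
# The host-to-rack mapping below is hypothetical.

resolve_rack() {
  local node
  for node in "$@"; do
    case "$node" in
      hadoop-slave1|10.0.1.*) echo "/rack1" ;;
      hadoop-slave2|10.0.2.*) echo "/rack2" ;;
      # Anything unknown falls back to a single default rack.
      *)                      echo "/default-rack" ;;
    esac
  done
}

# When saved as a script, answer for whatever arguments Hadoop passes in.
resolve_rack "$@"
```

Calling resolve_rack with hadoop-slave1 and hadoop-slave2 prints /rack1 and /rack2 on separate lines, which is what allows replicas to be spread across racks.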

Some details of Hadoop

Hadoop offers a programming paradigm to resolve tasks faster through a Distributed File System (HDFS) and MapReduce jobs.

HDFS uses a NameNode (the Master node, which stores the file metadata) to find block locations (dir, mkdir) and DataNodes (Slave nodes that store the real data blocks) to write and retrieve the data from the specific blocks (cat, get, put…).

The MapReduce pattern allows a data processing problem to be divided into a mapping step, where just a key and a value are pulled from each record of the data set, and a second step which reduces the result (where data gets aggregated to provide a total value, like an average, through grouping for example). It uses a JobTracker (Master node in charge of scheduling, rescheduling and keeping track of all map reduce jobs running in all nodes of the cluster) and TaskTrackers (Slaves running the map reduce jobs and reporting progress back to the JobTracker).
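
The pattern can be tried out without any cluster. The sketch below is only a local illustration: sort stands in for the shuffle that Hadoop performs between the two steps, and the function names (map_step, reduce_step, simulate_mapreduce) as well as the sample records are made up for this example.

```shell
#!/usr/bin/env bash
# Local illustration of the MapReduce pattern; no Hadoop involved.
# map_step, reduce_step and simulate_mapreduce are hypothetical names.

# Map: pull a key (first field) and a value (second field) out of each record.
map_step() {
  awk '{ print $1 "\t" $2 }'
}

# Reduce: aggregate all values seen for each key into an average.
# The trailing sort only makes the output order predictable.
reduce_step() {
  awk -F '\t' '
    { sum[$1] += $2; count[$1]++ }
    END { for (key in sum) printf "%s\t%.1f\n", key, sum[key] / count[key] }
  ' | sort
}

# sort plays the role of the shuffle that groups identical keys together.
simulate_mapreduce() {
  map_step | sort | reduce_step
}

# Made-up records: server name, response time, ignored extra field.
printf 'web1 200 extra\nweb2 100 extra\nweb1 400 extra\n' | simulate_mapreduce
```

Piping a mapper into sort and then into a reducer like this is also a cheap way to test scripts before submitting them to a real cluster.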

An HDFS master node runs a NameNode daemon and controls slaves (configured in $HADOOP_HOME/conf/slaves, which holds one line per slave node) running DataNode daemons. In addition, a MapReduce master node runs a JobTracker daemon and controls slaves (configured in $HADOOP_HOME/conf/slaves as well) running TaskTracker daemons.

Some important concepts that support the work of HDFS and MapReduce are:
  1. Combiner: Optionally, if some aggregation can be done on a Map result in the same node, a combiner frees network resources because less data gets transferred to the Reduce nodes. Clearly, if you are just computing a total as a summation, you can sum up within the Map step and save network resources later when applying the Reduce step.
  2. Distributed Data Flow Engine: Supports the execution of the MapReduce algorithm across different machines through the use of input splits (dividing the input data into chunks).
  3. Partition: Splits the reduce work across different Reducers, each in charge of applying the reduce algorithm to a particular set of keys.
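
To make the combiner idea concrete, here is a small local sketch. It assumes tab-separated key and value lines and a reduce step that is a plain sum. The helper sum_by_key and the two "nodes" with their sample map output are hypothetical and exist only for illustration.

```shell
#!/usr/bin/env bash
# Sketch of why a combiner helps when the reduce step is a plain sum.
# sum_by_key is a hypothetical helper used both as combiner and as reducer.

# Sum the numeric value (field 2) for every key (field 1), tab separated.
sum_by_key() {
  awk -F '\t' '
    { total[$1] += $2 }
    END { for (key in total) printf "%s\t%d\n", key, total[key] }
  ' | sort
}

# Made-up map output produced on two different nodes.
node1_map_output() { printf 'hadoop\t1\nhadoop\t1\nroot\t1\n'; }
node2_map_output() { printf 'hadoop\t1\nattacker\t1\nattacker\t1\n'; }

# Without a combiner every map record (6 lines) travels to the reducer.
without_combiner() {
  { node1_map_output; node2_map_output; } | sum_by_key
}

# With a combiner each node pre-aggregates locally, so only 4 lines are sent.
with_combiner() {
  { node1_map_output | sum_by_key; node2_map_output | sum_by_key; } | sum_by_key
}

with_combiner
```

Both functions return the same totals because a sum can be computed in stages. An average cannot be combined this naively; the combiner would have to forward sums and counts instead.
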
A typical Hadoop project involves two main concerns: a person wearing the software engineer hat must write two functions (a Map and a Reduce function), and another wearing the systems engineer hat must provide a production environment optimized for handling the totality of the data within certain time constraints.

Typically Hadoop nodes have 4 to 12 drives per machine in a non-RAID configuration, and the disks themselves are commodity hardware; there is no need for top-notch high-performance disks.

A sample problem

Imagine you have to combine the authorization logs coming from different servers and answer two questions:
  1. How many failed login attempts do we have per user and type (wrong password or wrong username, for example)?
  2. On what exact day, hour and minute did we have the most bad-user-login requests (a potential unknown-user brute force attack)?
To solve this problem we will need to learn how to install hadoop, how to write map and reduce jobs, how to run a hadoop job and how to retrieve the results.

Installing Hadoop

The Hadoop documentation is pretty straightforward (always look for the documentation of the specific version of Hadoop you are trying to install). We should try to automate the installation using POB recipes, though.

I have shared a project to aid on automatically installing hadoop in standalone, pseudo-distributed and fully distributed modes. You just need to look into and to use custom configuration. I have tried to make it as generic as possible so it can be used by the community. I have open sourced it with the hope that others will make it better while forking of course.


By default you get a Standalone Operation Mode configuration (for development purposes), but for production you will need a Fully Distributed Mode configuration. A Pseudo Distributed Mode configuration is an alternative in the middle: while it is not suitable for production, it simulates a real production setup.


Use Standalone mode to rapidly test MapReduce jobs; there is nothing to configure. As per the original documentation, here is how you can test Hadoop by running a MapReduce example that searches for a regex in several XML files:
mkdir -p ~/hadoop-tests/input
cd ~/hadoop-tests/
cp $HADOOP_HOME/conf/*.xml input
hadoop jar $HADOOP_HOME/hadoop-examples*.jar grep input output 'dfs[a-z.]+' 
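
For comparison, roughly the same answer can be obtained locally with Plain Old Bash. The sketch below is my approximation of what the bundled grep example computes (each matched string with its number of occurrences, most frequent first). count_matches is a hypothetical helper and not part of Hadoop, and the exact output layout of the Hadoop job may differ.

```shell
#!/usr/bin/env bash
# Local approximation of the bundled grep example: every match of the
# regular expression together with the number of times it occurs.
# count_matches is a hypothetical helper, not part of Hadoop.

count_matches() {
  local regex="$1"
  shift
  # -o prints only the matching part, -h hides file names, -E enables '+'.
  # The final sort puts the highest counts first, then orders by match text.
  grep -ohE "$regex" "$@" | sort | uniq -c | sort -k1,1nr -k2
}

# Usage (assumes the input directory created above exists):
#   count_matches 'dfs[a-z.]+' input/*.xml
```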

Pseudo Distributed

As a precondition we need to authorize our public key on our own host. If "ssh localhost" does not log in without prompting for a password, install the ssh public key using the recipe below the hadoop directory. It should be included anyway in the host recipe for the box where you are configuring Hadoop in this mode.

After installation you should be able to access NameNode from http://localhost:50070/ and JobTracker from http://localhost:50030/

To run the same example we ran for the Standalone installation, we need to have the input folder in HDFS (instead of the local file system):
hadoop fs -mkdir input
hadoop fs -put $HADOOP_HOME/conf/*.xml input
hadoop jar $HADOOP_HOME/hadoop-examples*.jar grep input output 'dfs[a-z.]+' 
The result will be exactly the same as the one you got from the standalone installation. You can now inspect the input (/user/hadoop/input) and output (/user/hadoop/output) directories using the "Browse the filesystem" link at http://localhost:50070 or from the command line:
hadoop fs -cat output/part-00000
Hadoop uses 4 daemons that you can start separately using the commands below:
start namenode
start jobtracker
start tasktracker
start datanode
Or as we saw in the recipe you can just start them all:
To stop them just use 'stop' instead of 'start'.
Troubleshooting dev environment
I found myself running into a lot of issues related to a corrupted file system. Look at the page http://localhost:50070/dfshealth.jsp and make sure you have "DFS Remaining"; if you do not, try freeing HDFS space or just recreate the file system. As this is a development environment, that option is reasonable of course. Note that I had to use the approach below because I found hadoop java processes still running even after running the stop all command:
ps -ef|grep hadoop|grep -v grep|awk '{print $2}'|xargs kill
rm -fr /tmp/hadoop-`whoami`/
hadoop namenode -format 

Fully Distributed

There is a lot that can be tweaked in configuration files but the pob-recipes project I have shared is the bare minimum to run NameNode+JobTracker (Master) in one machine and DataNode+TaskTracker (Workers) in a couple of other machines. My you will notice uses 3 Ubuntu VMs (hadoop-master, hadoop-slave1 and hadoop-slave2) but you can easily separate hadoop-master into hadoop-namenode and hadoop-jobtracker to get the 4 minimum production machines I was referring to. The only reason for me choosing just one master machine is my current notebook memory resources.

Make no mistake, developing a Hadoop job is not as big a deal as setting up your cluster correctly. The cluster administrator must keep an eye on hints, tips and tricks, but must also monitor the cluster to make sure it performs in the best possible way and that the NameNode is backed up correctly (and the restore tested, of course). We must ensure there is no single point of failure and that, in the case of an undesired event, the cluster can be back up and running ASAP.

The hadoop POB recipes should allow you to configure your details in hadoop/ (1 hadoop master and two hadoop slaves by default). You just need to run the slaves and master recipes, in that order, and you should get the responses below (among a bunch of other output). For the master:
11756 JobTracker
8834 NameNode
For the slaves:
6333 DataNode
9712 TaskTracker
Clearly the cluster is running with the four expected daemons. Running a job in your cluster is as easy as already explained in the pseudo distributed section. Run the job on the master while inspecting the log file on both slaves, for example:
tail -f /usr/local/hadoop/logs/hadoop-hadoop-datanode-hadoop-slave2.log

Pointing your browser to http://hadoop-master:50070 should show you two Live Nodes and the amount of disk available (DFS Remaining).

Here are the default web Administrative/Informative URLs to get information about your hadoop cluster:
http://hadoop-master:50070/ – NameNode (HDFS Information)
http://hadoop-master:50030/ – JobTracker (Map Reduce Administration)
http://hadoop-slave1:50060/ – TaskTracker (Task Tracker status)

Problem Solution

Our authentication auditing exercise can certainly be solved with some unix power tools run from the command line, as I show below. However, the power of Hadoop allows the tasks to run faster for big files coming from different servers, and HDFS redundancy ensures the data is kept safe. So, from the command line:
root@hadoop-master:~# cat /var/log/auth.log | grep "Failed password" | awk '{if($9 == "invalid") print $1"-"$2"-"substr($3,1,5)" "$11" "$9; else print $1"-"$2"-"substr($3,1,5)" "$9" WrongPassword"}'
Jul-17-15:42 hadoop WrongPassword
Jul-17-15:45 hadoop WrongPassword
Jul-17-15:46 hadoop WrongPassword
Jul-17-17:14 hadoop WrongPassword
Jul-17-17:32 hadoop WrongPassword
Jul-17-17:33 hadoop WrongPassword
Jul-17-17:47 hadoop WrongPassword
Jul-18-03:57 hadoop WrongPassword
Jul-19-19:43 nestor invalid
Jul-19-19:44 root WrongPassword
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Hadoop by default uses a tab character (\t) as the delimiter between key and value. We will cheat for this exercise and instead have the mapper generate just lines of fields separated by spaces, hence Hadoop will see each whole line as a key with a null value.
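
For reference, a mapper that does not cheat would emit an explicit key, a tab and a value. The sketch below is a variant I have not run on the cluster: map_failures_kv is a hypothetical name, the key is user plus failure type and the value is always 1. The sample sshd lines are made up but follow the usual syslog layout that the awk field numbers in this post rely on.

```shell
#!/usr/bin/env bash
# Variant of the mapper that emits an explicit key, a tab and a value instead
# of a space separated line. map_failures_kv is a hypothetical name.
# Key: user-failureType, value: 1 (one failed attempt).

map_failures_kv() {
  grep "Failed password" | awk '{
    if ($9 == "invalid") print $11 "-invalid\t1";
    else                 print $9 "-WrongPassword\t1";
  }'
}

# Made-up sshd lines in the usual syslog layout.
sample_auth_log() {
  cat <<'EOF'
Jul 19 19:44:10 hadoop-master sshd[2001]: Failed password for root from 10.0.0.5 port 4022 ssh2
Jul 19 20:29:01 hadoop-master sshd[2002]: Failed password for invalid user attacker from 10.0.0.9 port 4023 ssh2
Jul 19 20:30:00 hadoop-master sshd[2003]: Accepted password for hadoop from 10.0.0.5 port 4024 ssh2
EOF
}

sample_auth_log | map_failures_kv
```

With this layout a reducer only has to sum the second column per key, and all records for the same user and failure type share one key.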

Let us then put in a shell script our map code to extract the fields we are interested in:
root@hadoop-master:~# vi
grep "Failed password" | awk '{if($9 == "invalid") print $1"-"$2"-"substr($3,1,5)" "$11" "$9; else print $1"-"$2"-"substr($3,1,5)" "$9" WrongPassword"}'
root@hadoop-master:~# chmod +x 
root@hadoop-master:~# cat /var/log/auth.log | ./
root@hadoop-master:~# cat /var/log/auth.log | ./mapper.shtime, user failure
Jul-17-15:42 hadoop WrongPassword
Jul-17-15:45 hadoop WrongPassword
Jul-17-15:46 hadoop WrongPassword
Jul-17-17:14 hadoop WrongPassword
Jul-17-17:32 hadoop WrongPassword
Jul-17-17:33 hadoop WrongPassword
Jul-17-17:47 hadoop WrongPassword
Jul-18-03:57 hadoop WrongPassword
Jul-19-19:43 nestor invalid
Jul-19-19:44 root WrongPassword
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Jul-19-20:29 attacker invalid
Now with a reduce function we can obtain the number of failed attempts per user and failure:
root@hadoop-master:~# cat /var/log/auth.log | ./ | awk 'BEGIN{print "UserFailure count"} {userFailures[$2"-"$3]++}END{for (userFailure in userFailures) printf("%s %i\n", userFailure, userFailures[userFailure])}'
UserFailure count
root-WrongPassword 1
hadoop-WrongPassword 8
attacker-invalid 4
nestor-invalid 1
root@hadoop-master:~# vi
awk 'BEGIN{print "UserFailure count"} NR!=1 {userFailures[$2"-"$3]++}END{for (userFailure in userFailures) printf("%s %i\n", userFailure, userFailures[userFailure])}'
root@hadoop-master:~# chmod +x 
root@hadoop-master:~# cat /var/log/auth.log | ./ | ./ 
UserFailure count
root-WrongPassword 1
hadoop-WrongPassword 8
attacker-invalid 4
nestor-invalid 1
Our second exercise can be solved with another reduce example (to detect during which period of time we had the most attack attempts):
root@hadoop-master:~# cat /var/log/auth.log | ./ | awk '{if($3 == "invalid") invalids[$1]++}END{for (invalid in invalids) print invalids[invalid]" "invalid}'| sort -n -r | head -1
4 Jul-19-20:29
root@hadoop-master:~# vi
awk 'NR!=1{if($3 == "invalid") invalids[$1]++}END{for (invalid in invalids) print invalids[invalid]" "invalid}'| sort -n -r | head -1
root@hadoop-master:~# chmod +x 
root@hadoop-master:~# cat /var/log/auth.log | ./ | ./ 
4 Jul-19-20:29
Now let's run both map and reduce jobs in a hadoop cluster. The first thing we need to do is copy the file into HDFS:
hadoop fs -put auth.log input/authLog/auth.log
Then we need to delete any results in case we are running our jobs again:
hadoop fs -rmr output/authLog
Finally we run our jobs:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming* -file ./ -mapper "" -file ./ -reducer "" -input input/authLog/auth.log -output output/authLog/failedAttemptsPerUserAndFailure
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming* -file ./ -mapper "" -file ./ -reducer "" -input input/authLog/auth.log -output output/authLog/maxAttacksInAMinute
We can get the results as explained before from command line or web interface.

We could sync our log files from different servers and copy them into HDFS later on, then run the jobs to obtain a result across all of our servers but there should be a more direct way.

What about putting the files in HDFS directly from remote machines? To query across multiple server logs this would be ideal. In bash this is easy indeed. Using ssh you can pipe any stream into a command:
echo "Hello" | ssh hadoop@hadoop-master 'while read data; do echo $data from `hostname`; done'
Then use the hadoop fs put command to push the log file from the remote server into HDFS through hadoop-master. You will first need to authorize the server key for a particular user, using the POB recipe I previously posted, like:
ssh-keygen -t rsa -N '' -f /root/.ssh/id_rsa
common/tools/ root hadoop hadoop-master /root/.ssh/ /root/.ssh/id_rsa
Then push logs from that server like in:
cat /var/log/auth.log | ssh hadoop@hadoop-master "hadoop fs -rm input/authLogs/`hostname``date "+%Y%m%d"`; hadoop fs -put - input/authLogs/`hostname``date "+%Y%m%d"`" 
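
The quoting in that one-liner is easy to get wrong, so here is a dry-run sketch that only builds and prints the remote command without touching ssh or HDFS. The helper names hdfs_log_target and remote_put_command are hypothetical. The naming scheme (host name followed by the date as YYYYMMDD under input/authLogs) is the one used above.

```shell
#!/usr/bin/env bash
# Dry-run sketch: build the HDFS target path and the remote command that the
# one-liner above sends over ssh. Nothing is executed remotely here.
# hdfs_log_target and remote_put_command are hypothetical helper names.

# Target file name: source host name followed by the date as YYYYMMDD.
hdfs_log_target() {
  local host="$1" day="$2"
  printf 'input/authLogs/%s%s\n' "$host" "$day"
}

# Command string to run on the master: remove an old copy, then read stdin.
remote_put_command() {
  local target
  target=$(hdfs_log_target "$1" "$2")
  printf 'hadoop fs -rm %s; hadoop fs -put - %s\n' "$target" "$target"
}

# The host name and date are resolved on the sending server, not on the master.
remote_put_command "$(hostname)" "$(date +%Y%m%d)"
```

Once the printed command looks right, it can be passed as the ssh argument in place of the hand-quoted string.
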
Finally run the jobs:
hadoop fs -rmr output/authLogs/failedAttemptsPerUserAndFailure
hadoop fs -rmr output/authLogs/maxAttacksInAMinute
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming* -file ./ -mapper "" -file ./ -reducer "" -input input/authLogs -output output/authLogs/failedAttemptsPerUserAndFailure
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming* -file ./ -mapper "" -file ./ -reducer "" -input input/authLogs -output output/authLogs/maxAttacksInAMinute
POB has allowed us to automate hadoop installations and to build and run remote hadoop jobs that can be used to improve our Security Intelligence, an aspect that directly affects Business and so indirectly becomes part of Business Intelligence as well. Not to mention the power of Hadoop to help directly with Business Intelligence, for example by analyzing server application logs and identifying user trends.
