How to Get Hadoop Data into a Python Model


Hadoop is an open-source software framework for distributed storage and distributed processing of very large data sets. All the modules in Hadoop are designed with the assumption that hardware failures are common and should be automatically handled by the framework.
The core of Apache Hadoop consists of a storage component, known as the Hadoop Distributed File System (HDFS), and a processing component called MapReduce. Hadoop splits files into large blocks and distributes them across the nodes in a cluster. Because the blocks are spread across many nodes, they can be processed in parallel, which significantly improves processing times: no single node has to work through an entire large file on its own.
In this article, we’ll walk through the process of integrating Hadoop and Python by moving Hadoop data into a Python program.
 

HDFS And YARN

Let’s start by defining the terms:
 

HDFS

The Hadoop Distributed File System (HDFS) is a distributed, scalable, and portable file system written in Java for the Hadoop framework. It's the file system that underpins Hadoop.
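To give a concrete sense of pulling a file out of HDFS into Python, here's a minimal sketch that shells out to the standard hdfs dfs -cat command. It assumes the Hadoop client tools are on your PATH, and the /data/sample.txt path is purely hypothetical:

# Minimal sketch: stream an HDFS file into Python via the "hdfs dfs -cat" CLI.
# Assumes the Hadoop client tools are installed and on the PATH.
import subprocess

def read_hdfs_file(hdfs_path):
    """Return the contents of a file stored in HDFS as a string."""
    result = subprocess.run(
        ["hdfs", "dfs", "-cat", hdfs_path],
        capture_output=True, text=True, check=True
    )
    return result.stdout

if __name__ == "__main__":
    # "/data/sample.txt" is a hypothetical path -- replace it with a real one.
    print(read_hdfs_file("/data/sample.txt")[:200])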
 

YARN

YARN is a resource-management platform responsible for managing computing resources in clusters and using them to schedule users' applications. The fundamental objective of YARN is to split the functions of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and a per-application ApplicationMaster (AM).
The ResourceManager and the NodeManager form the data computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting it to the ResourceManager/Scheduler.
The YARN ResourceManager also offers a web interface (and a REST API) for monitoring the cluster's resources and running applications.
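As a rough sketch, here's how you might list running applications from Python through the ResourceManager's REST API. The host name below is hypothetical and 8088 is only the common default port; adjust both for your cluster, and note this assumes the requests package is installed:

# Rough sketch: query the ResourceManager REST API for running applications.
import requests

RM_URL = "http://resourcemanager.example.com:8088"  # hypothetical host, default port

response = requests.get(RM_URL + "/ws/v1/cluster/apps", params={"states": "RUNNING"})
response.raise_for_status()
apps = (response.json().get("apps") or {}).get("app", [])
for app in apps:
    print(app["id"], app["name"], app["state"])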

 

MRJob and Real-World Applications

While Hadoop Streaming is a quick way to get MapReduce tasks running, it's cumbersome to use and not very friendly when things fail and we have to debug our code. In addition, if we wanted to join two different sets of data, it would be complicated to handle both with a single mapper.
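For comparison, here's roughly what the word-count mapper looks like as a standalone Hadoop Streaming script (the matching reducer would be a second script that groups lines by word and sums the counts). This is only a sketch to illustrate how much of the plumbing you own yourself:

# mapper.py -- a bare-bones Hadoop Streaming mapper for word count.
# With raw streaming you handle the stdin/stdout framing yourself, which is
# part of what makes failures hard to debug.
import re
import sys

WORD_RE = re.compile(r"[\w']+")

for line in sys.stdin:
    for word in WORD_RE.findall(line):
        print("%s\t%s" % (word.lower(), 1))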
MRJob is a library written and maintained by Yelp that allows us to write MapReduce jobs in Python. It has extensive documentation and lets you run your code locally for testing, without a Hadoop cluster.
Here's the word count MapReduce job, a commonly used example program for demonstrating MapReduce logic, rewritten using MRJob:


"""The classic MapReduce job which counts the frequency of words."""
from mrjob.job import MRJob
import re
WORD_RE = re.compile(r"[\w']+")
class MRWordFreqCount(MRJob):
	def mapper(self, _, line):
    	for word in WORD_RE.findall(line):
        	yield (word.lower(), 1)
				def reducer(self, word, counts):
    	yield (word, sum(counts))
		if __name__ == '__main__':
 	MRWordFreqCount.run()
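Assuming the class above is saved to a file (word_count.py is just a placeholder name here), mrjob can run it locally for testing or hand it off to Hadoop by switching the runner flag:

python word_count.py input.txt                          # run locally, no cluster needed
python word_count.py -r hadoop hdfs:///path/to/input    # run on the Hadoop cluster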

What if we wanted to perform a calculation that involves multiple steps? For example, what if we wanted to count the words in documents stored in our database and then find the most common word being used? This would involve the following steps:

  • Map our text to a mapper that outputs pairs of (word, 1)
  • Combine the pairs using the word as key (optional)
  • Reduce the pairs using the word as key
  • Find the word with the maximum count

Here is that logic executed in Python:


from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        # discard the key; it is just None.
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
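You can also invoke the job from another Python program and read its output back in-process, which is handy when the result needs to feed a model directly. The sketch below assumes a reasonably recent mrjob release (where make_runner(), cat_output(), and parse_output() are available) and hypothetical module and file names:

# Sketch: run the job programmatically and collect the (count, word) result in Python.
from mr_most_used_word import MRMostUsedWord  # hypothetical module name

mr_job = MRMostUsedWord(args=["input.txt"])   # hypothetical input file
with mr_job.make_runner() as runner:
    runner.run()
    for count, word in mr_job.parse_output(runner.cat_output()):
        print(word, count)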

Now that we've seen how powerful MRJob can be, let's write a class that returns the top 15 most frequent words in our text:


from mrjob.job import MRJob
from mrjob.step import MRStep
from heapq import nlargest
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWords(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily rank with heapq.nlargest().
        yield None, (sum(counts), word)

    def reducer_find_top_15_word(self, _, word_count_pairs):
        # discard the key; it is just None.
        # each item of word_count_pairs is (count, word),
        # so yielding the top 15 results in key=counts, value=word
        for val in nlargest(15, word_count_pairs):
            yield val

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_15_word)
        ]


if __name__ == '__main__':
    MRMostUsedWords.run()
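To close the loop on the article's title, the same pattern can pull the top-15 results straight off the cluster and into a plain Python structure ready for a model. Again, the module name and HDFS path below are assumptions to adapt to your environment:

# Sketch: run against Hadoop (-r hadoop) and load the top 15 words into a dict.
from mr_most_used_words import MRMostUsedWords  # hypothetical module name

job = MRMostUsedWords(args=["-r", "hadoop", "hdfs:///data/corpus.txt"])  # hypothetical path
with job.make_runner() as runner:
    runner.run()
    top_words = {word: count for count, word in job.parse_output(runner.cat_output())}

print(top_words)  # e.g. feed this into your feature engineering or model code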

Hadoop and MRJob have plenty of versatility to help you answer numerous questions in your dev environment. There are other Python MapReduce frameworks you can use, but MRJob's documentation, logging capabilities, and ability to run your Python code without modification make it an ideal option. See the documentation below for inspiration on how you can incorporate these tools into your data science arsenal.
MrJob Documentation
MrJob Examples
Hadoop
Hadoop Command Line
YARN


For a quick way to start using Python, download ActivePython. It includes a number of useful libraries and packages for accessing Hadoop data from your Python application.
Download ActivePython
