Hadoop
What is Hadoop?
Apache Hadoop is a framework for running applications on large clusters built of commodity hardware. The Hadoop framework transparently provides applications for both reliability and data motion. Hadoop implements a computational paradigm named Map/Reduce, where the application is divided into many small fragments of work, each of which may be executed or re-executed on any node in the cluster. In addition, it provides a distributed file system (HDFS) that stores data on the compute nodes, providing very high aggregate bandwidth across the cluster. Both MapReduce and the Hadoop Distributed File System are designed so that node failures are automatically handled by the framework.
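To make the paradigm concrete, the word-count computation that Hadoop distributes across a cluster can be sketched in plain Java, with no Hadoop involved (the class name WordCountSketch is ours, for illustration only): the "map" step emits a (word, 1) pair for every word in a line, and the "reduce" step sums the pairs that share a key.

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    // Map + reduce in one pass: split each line into words (the map step
    // emitting (word, 1) pairs) and accumulate a count per word (the
    // reduce step summing values that share a key).
    public static Map<String, Integer> wordCount(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {      // map: emit (word, 1)
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);  // reduce: sum per key
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = { "The Quick brown fox", "jumped over the lazy dogs" };
        System.out.println(wordCount(lines));
    }
}
```

On a real cluster, the map calls run in parallel on many nodes and the framework shuffles the intermediate pairs to the reducers; this sketch only shows the logic, not the distribution.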
This walkthrough assumes the user is familiar with the Java programming language. Note: MapReduce and Hadoop jobs can also be run using Python and other programming languages. See the link below for the comprehensive tutorial on how to run applications with the Hadoop MapReduce framework.
Step 1: Confirm the version of Hadoop running on the cluster.
-bash-4.2$ hadoop version
At the time of writing, the output shows that Hadoop version 3.0.0-cdh6.2.0 is installed.
Hadoop 3.0.0-cdh6.2.0
Source code repository http://github.com/cloudera/hadoop -r d1dff3d3a126da44e3458bbf148c3bc16ff55bd8
Compiled by jenkins on 2019-03-14T06:39Z
Compiled with protoc 2.5.0
From source with checksum 7fd065792597e9cd1f12e1a7c7a0
This command was run using /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/hadoop-common-3.0.0-cdh6.2.0.jar
-bash-4.2$
Step 2: Confirm the version of Java running on the cluster.
-bash-4.2$ javac -version
The output below shows version 1.8.0_181.
javac 1.8.0_181
-bash-4.2$
Next: Set the HADOOP_CLASSPATH as shown below
-bash-4.2$ export HADOOP_CLASSPATH=$(hadoop classpath)
-bash-4.2$
Step 3: Create a directory on HDFS.
hdfs dfs -mkdir hdfs://worker2.hdp-internal:8020/user/YOUR_UMBC_ID/FOLDER/input_folder
Step 4: Move the files to HDFS. Refer to the ‘Accessing files and folders on the Big Data Cluster‘ section for more HDFS commands.
hdfs dfs -put <files> <hdfs input directory>
hdfs dfs -put file.txt hdfs:///user/YOUR_UMBC_ID/FOLDER/input_folder/
Example:
hdfs dfs -put file4_with_text.txt hdfs:///user/oodunsi1/folder_on_hdfs/
Step 5: How to run a Hadoop MapReduce program on the cluster
Example: The “file4_with_text” file uploaded in Step 4 will be used to run a MapReduce wordcount program.
Below is the MapReduce wordcount program in Java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i = 0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
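The per-line logic inside TokenizerMapper.map() (optional case folding, skip-pattern removal via replaceAll, then whitespace tokenization with StringTokenizer) can be exercised outside Hadoop. The following standalone sketch mirrors those three steps; the class name TokenizeDemo and its method are ours, not part of the tutorial program:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeDemo {
    // Mirrors the mapper: optionally lower-case the line, strip each
    // skip pattern with replaceAll (patterns are regular expressions),
    // then split on whitespace with StringTokenizer.
    static List<String> tokens(String line, boolean caseSensitive,
                               List<String> skipPatterns) {
        String s = caseSensitive ? line : line.toLowerCase();
        for (String p : skipPatterns) {
            s = s.replaceAll(p, "");
        }
        List<String> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(s);
        while (itr.hasMoreTokens()) {
            out.add(itr.nextToken());
        }
        return out;
    }

    public static void main(String[] args) {
        // Case folding on, punctuation removed by the skip pattern "[,!]".
        System.out.println(tokens("The Quick, brown fox!", false,
                                  Arrays.asList("[,!]")));
    }
}
```

Because the skip entries are passed to replaceAll, each line of a skip-pattern file is treated as a regular expression, not a literal string.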
This program is available in the scratch directory as “WordCount2.java”. The next step is to make a copy of WordCount2.java in the /scratch/newfolder directory.
cp /scratch/WordCount2.java /scratch/newfolder
Next, create a classes folder using the “mkdir” command.
mkdir /scratch/newfolder/classes_folder
After this, the WordCount2 Java code containing the MapReduce program will be compiled as shown below.
javac -classpath ${HADOOP_CLASSPATH} -d <classes_folder> <java_file_path>
Example:
-bash-4.2$ javac -classpath ${HADOOP_CLASSPATH} -d /scratch/newfolder/classes_folder/ /scratch/newfolder/WordCount2.java
To verify this, use the ‘ls’ command to view the contents of the classes_folder; it should contain four Java class files. The next step is to make a jar file as shown below.
jar -cvf <JAR_FILE_NAME> -C <classes_folder> .
Example
-bash-4.2$ jar -cvf wc.jar -C /scratch/newfolder/classes_folder/ .
added manifest
adding: WordCount2$TokenizerMapper$CountersEnum.class(in = 1030) (out= 509)(deflated 50%)
adding: WordCount2$TokenizerMapper.class(in = 4545) (out= 2084)(deflated 54%)
adding: WordCount2$IntSumReducer.class(in = 1742) (out= 742)(deflated 57%)
adding: WordCount2.class(in = 2473) (out= 1344)(deflated 45%)
-bash-4.2$
This will produce a .jar file called wc.jar; use the ls command as shown below to verify.
-bash-4.2$ ls /scratch/newfolder/
file1 file2 file3 file4_with_text classes_folder wc.jar WordCount2.java
-bash-4.2$
Now, the Hadoop job can be run as shown below.
hadoop jar (jar file name) (class name along with package name) (input file on HDFS) (output folder path on HDFS)
-bash-4.2$ hadoop jar /scratch/newfolder/wc.jar WordCount2 hdfs:///user/oodunsi1/folder_on_hdfs/file4_with_text hdfs:///user/oodunsi1/output_folde
Output
WARNING: Use "yarn jar" to launch YARN applications.
20/07/20 02:11:43 INFO client.RMProxy: Connecting to ResourceManager at worker2.hdp-internal/10.3.0.2:8032
20/07/20 02:11:44 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/oodunsi1/.staging/job_1587490087127_1420
20/07/20 02:11:44 INFO input.FileInputFormat: Total input files to process : 1
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: number of splits:1
20/07/20 02:11:44 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1587490087127_1420
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: Executing with tokens: []
20/07/20 02:11:44 INFO conf.Configuration: resource-types.xml not found
20/07/20 02:11:44 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/07/20 02:11:44 INFO impl.YarnClientImpl: Submitted application application_1587490087127_1420
20/07/20 02:11:44 INFO mapreduce.Job: The url to track the job: http://worker2.hdp-internal:8088/proxy/application_1587490087127_1420/
20/07/20 02:11:44 INFO mapreduce.Job: Running job: job_1587490087127_1420
20/07/20 02:11:50 INFO mapreduce.Job: Job job_1587490087127_1420 running in uber mode : false
20/07/20 02:11:50 INFO mapreduce.Job: map 0% reduce 0%
20/07/20 02:11:54 INFO mapreduce.Job: map 100% reduce 0%
20/07/20 02:12:01 INFO mapreduce.Job: map 100% reduce 50%
20/07/20 02:12:02 INFO mapreduce.Job: map 100% reduce 72%
20/07/20 02:12:03 INFO mapreduce.Job: map 100% reduce 100%
20/07/20 02:12:04 INFO mapreduce.Job: Job job_1587490087127_1420 completed successfully
20/07/20 02:12:04 INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=2780
FILE: Number of bytes written=29910659
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=326
HDFS: Number of bytes written=64
HDFS: Number of read operations=673
HDFS: Number of large read operations=0
HDFS: Number of write operations=268
HDFS: Number of bytes read erasure-coded=0
Job Counters
Launched map tasks=1
Launched reduce tasks=134
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2113
Total time spent by all reduces in occupied slots (ms)=494964
Total time spent by all map tasks (ms)=2113
Total time spent by all reduce tasks (ms)=494964
Total vcore-milliseconds taken by all map tasks=2113
Total vcore-milliseconds taken by all reduce tasks=494964
Total megabyte-milliseconds taken by all map tasks=2163712
Total megabyte-milliseconds taken by all reduce tasks=506843136
Map-Reduce Framework
Map input records=4
Map output records=36
Map output bytes=328
Map output materialized bytes=2244
Input split bytes=142
Combine input records=36
Combine output records=9
Reduce input groups=9
Reduce shuffle bytes=2244
Reduce input records=9
Reduce output records=9
Spilled Records=18
Shuffled Maps =134
Failed Shuffles=0
Merged Map outputs=134
GC time elapsed (ms)=31705
CPU time spent (ms)=93950
Physical memory (bytes) snapshot=44972204032
Virtual memory (bytes) snapshot=365198626816
Total committed heap usage (bytes)=83797475328
Peak Map Physical memory (bytes)=622841856
Peak Map Virtual memory (bytes)=2696945664
Peak Reduce Physical memory (bytes)=351854592
Peak Reduce Virtual memory (bytes)=2711781376
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
WordCount2$TokenizerMapper$CountersEnum
INPUT_WORDS=36
File Input Format Counters
Bytes Read=184
File Output Format Counters
Bytes Written=64
-bash-4.2$
Below is the result of the word count program on file4_with_text.
-bash-4.2$ hdfs dfs -cat output_folde/*
over	4
the	4
brown	4
lazy	4
fox	4
Quick	4
The	4
dogs	4
jumped	4
-bash-4.2$
Note that the count is case-sensitive by default, which is why “The” and “the” are tallied separately; the program reads the wordcount.case.sensitive option in TokenizerMapper.setup() to control this behavior.
To view the submitted Hadoop jobs running on the cluster, visit the Web UI for Hadoop here.