How to run Hadoop on the Big Data Cluster

Hadoop

What is Hadoop?

Apache Hadoop is a framework for running applications on large clusters built of commodity hardware. The Hadoop framework transparently provides applications with both reliability and data motion. Hadoop implements a computational paradigm named MapReduce, where the application is divided into many small fragments of work, each of which may be executed or re-executed on any node in the cluster. In addition, it provides a distributed file system (HDFS) that stores data on the compute nodes, providing very high aggregate bandwidth across the cluster. Both MapReduce and the Hadoop Distributed File System are designed so that node failures are automatically handled by the framework.

This walkthrough assumes the user is familiar with the Java programming language. Note: MapReduce and Hadoop jobs can also be run using Python and other programming languages, for example via Hadoop Streaming (a sketch follows the link below). See the link below for the official tutorial on how to run applications with the Hadoop MapReduce framework.

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
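
For non-Java jobs, Hadoop Streaming lets any executable act as the mapper and reducer. Below is a minimal sketch, not a tested recipe: mapper.py and reducer.py are hypothetical scripts in the current directory, the placeholder paths must be replaced with real ones, and the location of the hadoop-streaming jar varies by installation.

hadoop jar <path_to_hadoop-streaming.jar> \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input hdfs:///user/YOUR_UMBC_ID/FOLDER/input_folder \
    -output hdfs:///user/YOUR_UMBC_ID/FOLDER/streaming_output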

Step 1: Confirm the version of Hadoop running on the cluster.

-bash-4.2$ hadoop version

At the time of writing, the output shows that Hadoop 3.0.0 (CDH 6.2.0) is installed:

Hadoop 3.0.0-cdh6.2.0
Source code repository http://github.com/cloudera/hadoop -r d1dff3d3a126da44e3458bbf148c3bc16ff55bd8
Compiled by jenkins on 2019-03-14T06:39Z
Compiled with protoc 2.5.0
From source with checksum 7fd065792597e9cd1f12e1a7c7a0
This command was run using /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/hadoop-common-3.0.0-cdh6.2.0.jar
-bash-4.2$ 

Step 2: Confirm the version of Java running on the cluster.

-bash-4.2$ javac -version

The output below shows Java compiler version 1.8.0_181:

javac 1.8.0_181
-bash-4.2$ 
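
The Java runtime version can be checked in the same way; a quick check (the output will vary with the installed JDK):

-bash-4.2$ java -version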

Next: Set the HADOOP_CLASSPATH environment variable as shown below:

-bash-4.2$ export HADOOP_CLASSPATH=$(hadoop classpath)
-bash-4.2$ 
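
To confirm that the variable is set, it can simply be printed back (the exact list of jar paths will differ from cluster to cluster):

-bash-4.2$ echo $HADOOP_CLASSPATH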

Step 3: Create a directory on HDFS.

hdfs dfs -mkdir hdfs://worker2.hdp-internal:8020/user/YOUR_UMBC_ID/FOLDER/input_folder
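
If the intermediate directories do not exist yet, the -p option creates them, and hdfs dfs -ls confirms the result. A short sketch, assuming a hypothetical user jdoe1:

hdfs dfs -mkdir -p hdfs://worker2.hdp-internal:8020/user/jdoe1/FOLDER/input_folder
hdfs dfs -ls hdfs://worker2.hdp-internal:8020/user/jdoe1/FOLDER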

Step 4: Move the files to HDFS. Refer to the ‘Accessing files and folders on the Big Data Cluster’ section for more HDFS commands.

hdfs dfs -put <files> <hdfs input directory>
hdfs dfs -put file.txt hdfs:///user/YOUR_UMBC_ID/FOLDER/input_folder/

Example:

hdfs dfs -put file4_with_text.txt hdfs:///user/oodunsi1/folder_on_hdfs/
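
To confirm the upload, the HDFS folder can be listed and the file printed back, using the same paths as the example above:

hdfs dfs -ls hdfs:///user/oodunsi1/folder_on_hdfs/
hdfs dfs -cat hdfs:///user/oodunsi1/folder_on_hdfs/file4_with_text.txt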

Step 5: How to run a Hadoop MapReduce program on the cluster

Example: The “file4_with_text” file uploaded to HDFS earlier will be used to run a MapReduce wordcount program.

Below is the MapReduce wordcount program in Java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
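    // Emits a (word, 1) pair for every token of each input line; the
    // wordcount.case.sensitive and wordcount.skip.patterns options are read in setup().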

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
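    // Sums the counts emitted for each word; this class is also used as the combiner.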
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
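    // Generic -D options (e.g. wordcount.case.sensitive) are parsed by
    // GenericOptionsParser; the optional -skip argument is handled in the loop below.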
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

This program is available in the scratch directory as “WordCount2.java”. The next step is to copy WordCount2.java into the /scratch/newfolder directory:

cp /scratch/WordCount2.java /scratch/newfolder

Next, create a folder for the compiled class files using the “mkdir” command:

mkdir /scratch/newfolder/classes_folder

After this, the WordCount2 Java code containing the MapReduce program will be compiled as shown below:

javac -classpath ${HADOOP_CLASSPATH} -d <classes_folder> <path_to_java_file>

Example:

-bash-4.2$ javac -classpath ${HADOOP_CLASSPATH} -d /scratch/newfolder/classes_folder/ /scratch/newfolder/WordCount2.java

To verify this, use the ‘ls’ command to view the contents of classes_folder; it should contain four Java class files (an illustrative listing follows). The next step is to make a jar file, using the command shown after the listing below.
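
An illustrative listing (the class names match the jar output further below):

-bash-4.2$ ls /scratch/newfolder/classes_folder/
WordCount2$IntSumReducer.class                 WordCount2$TokenizerMapper.class
WordCount2$TokenizerMapper$CountersEnum.class  WordCount2.class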

jar -cvf <jar_file_name> -C <classes_folder> <path>

Example:

-bash-4.2$ jar -cvf wc.jar -C /scratch/newfolder/classes_folder/ .
added manifest
adding: WordCount2$TokenizerMapper$CountersEnum.class(in = 1030) (out= 509)(deflated 50%)
adding: WordCount2$TokenizerMapper.class(in = 4545) (out= 2084)(deflated 54%)
adding: WordCount2$IntSumReducer.class(in = 1742) (out= 742)(deflated 57%)
adding: WordCount2.class(in = 2473) (out= 1344)(deflated 45%)
-bash-4.2$ 

This will produce a .jar file called wc.jar; use the ls command as shown below to verify:

-bash-4.2$ ls /scratch/newfolder/
file1  file2  file3  file4_with_text  classes_folder  wc.jar  WordCount2.java
-bash-4.2$ 

Now, the Hadoop job can be run as shown below:

hadoop jar <jar_file_name> <class_name_with_package_name> <input_file_on_HDFS> <output_folder_path_on_HDFS>
-bash-4.2$ hadoop jar /scratch/newfolder/wc.jar WordCount2 hdfs:///user/oodunsi1/folder_on_hdfs/file4_with_text hdfs:///user/oodunsi1/output_folde

Output

WARNING: Use "yarn jar" to launch YARN applications.
20/07/20 02:11:43 INFO client.RMProxy: Connecting to ResourceManager at worker2.hdp-internal/10.3.0.2:8032
20/07/20 02:11:44 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/oodunsi1/.staging/job_1587490087127_1420
20/07/20 02:11:44 INFO input.FileInputFormat: Total input files to process : 1
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: number of splits:1
20/07/20 02:11:44 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1587490087127_1420
20/07/20 02:11:44 INFO mapreduce.JobSubmitter: Executing with tokens: []
20/07/20 02:11:44 INFO conf.Configuration: resource-types.xml not found
20/07/20 02:11:44 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/07/20 02:11:44 INFO impl.YarnClientImpl: Submitted application application_1587490087127_1420
20/07/20 02:11:44 INFO mapreduce.Job: The url to track the job: http://worker2.hdp-internal:8088/proxy/application_1587490087127_1420/
20/07/20 02:11:44 INFO mapreduce.Job: Running job: job_1587490087127_1420
20/07/20 02:11:50 INFO mapreduce.Job: Job job_1587490087127_1420 running in uber mode : false
20/07/20 02:11:50 INFO mapreduce.Job:  map 0% reduce 0%
20/07/20 02:11:54 INFO mapreduce.Job:  map 100% reduce 0%
20/07/20 02:12:01 INFO mapreduce.Job:  map 100% reduce 50%
20/07/20 02:12:02 INFO mapreduce.Job:  map 100% reduce 72%
20/07/20 02:12:03 INFO mapreduce.Job:  map 100% reduce 100%
20/07/20 02:12:04 INFO mapreduce.Job: Job job_1587490087127_1420 completed successfully
20/07/20 02:12:04 INFO mapreduce.Job: Counters: 55
	File System Counters
		FILE: Number of bytes read=2780
		FILE: Number of bytes written=29910659
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=326
		HDFS: Number of bytes written=64
		HDFS: Number of read operations=673
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=268
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=134
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2113
		Total time spent by all reduces in occupied slots (ms)=494964
		Total time spent by all map tasks (ms)=2113
		Total time spent by all reduce tasks (ms)=494964
		Total vcore-milliseconds taken by all map tasks=2113
		Total vcore-milliseconds taken by all reduce tasks=494964
		Total megabyte-milliseconds taken by all map tasks=2163712
		Total megabyte-milliseconds taken by all reduce tasks=506843136
	Map-Reduce Framework
		Map input records=4
		Map output records=36
		Map output bytes=328
		Map output materialized bytes=2244
		Input split bytes=142
		Combine input records=36
		Combine output records=9
		Reduce input groups=9
		Reduce shuffle bytes=2244
		Reduce input records=9
		Reduce output records=9
		Spilled Records=18
		Shuffled Maps =134
		Failed Shuffles=0
		Merged Map outputs=134
		GC time elapsed (ms)=31705
		CPU time spent (ms)=93950
		Physical memory (bytes) snapshot=44972204032
		Virtual memory (bytes) snapshot=365198626816
		Total committed heap usage (bytes)=83797475328
		Peak Map Physical memory (bytes)=622841856
		Peak Map Virtual memory (bytes)=2696945664
		Peak Reduce Physical memory (bytes)=351854592
		Peak Reduce Virtual memory (bytes)=2711781376
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	WordCount2$TokenizerMapper$CountersEnum
		INPUT_WORDS=36
	File Input Format Counters 
		Bytes Read=184
	File Output Format Counters 
		Bytes Written=64
-bash-4.2$
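
The job writes its results to the HDFS output folder as one part-r-NNNNN file per reduce task, together with a _SUCCESS marker; the folder can be listed before reading it:

hdfs dfs -ls output_folde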

Below is the result of the word count program on the file4_with_text file:

-bash-4.2$ hdfs dfs -cat output_folde/*
over	4
the	4
brown	4
lazy	4
fox	4
Quick	4
The	4
dogs	4
jumped	4
-bash-4.2$ 
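
The WordCount2 program also supports two optional arguments handled in its own code: the wordcount.case.sensitive property (passed with -D) and a -skip flag naming an HDFS file of patterns to strip from the input before counting. A sketch, assuming a hypothetical patterns.txt already uploaded to the same HDFS folder and a new (hypothetical) output folder name:

hadoop jar /scratch/newfolder/wc.jar WordCount2 -Dwordcount.case.sensitive=false hdfs:///user/oodunsi1/folder_on_hdfs/file4_with_text hdfs:///user/oodunsi1/output_skip -skip hdfs:///user/oodunsi1/folder_on_hdfs/patterns.txt

Note that MapReduce will not overwrite an existing output folder, so each run needs a fresh output path (or remove the old one first with hdfs dfs -rm -r).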

To view the submitted Hadoop jobs running on the cluster, visit the Web UI for Hadoop (the job-tracking URL shown in the output above).
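
The same information is also available from the command line; a quick check of submitted and running applications:

-bash-4.2$ yarn application -list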