MapReduce: the Second Program

First, set up a single-node cluster: install and start Hadoop (see Setting up a Single Node Cluster).

Change user to hadoop:

su - hadoop

Start Hadoop:

sbin/start-dfs.sh

Create a new directory named MapReduceTutorial:

mkdir MapReduceTutorial

Make it writable by everyone (convenient for a tutorial, too permissive for production):

chmod -R 777 MapReduceTutorial

Create a new HDFS directory named inputMapReduce:

hdfs dfs -mkdir /inputMapReduce

Verify that the directory was created:

hdfs dfs -ls /inputMapReduce

Use the copyFromLocal command to copy SalesJan2009.csv into the HDFS /inputMapReduce directory:

hdfs dfs -copyFromLocal /home/hadoop/MapReduceTutorial/SalesJan2009.csv /inputMapReduce

Verify:

hdfs dfs -ls /inputMapReduce

Output:


-rw-r--r-- 1 hadoop supergroup 123637 2025-03-05 18:35 /inputMapReduce/SalesJan2009.csv

Java Application

Program Structure

.
├── dependency-reduced-pom.xml
├── pom.xml
├── SalesJan2009.csv
└── src
    ├── main
    │   ├── java
    │   │   └── SalesCountry
    │   │       ├── SalesCountryDriver.java
    │   │       ├── SalesCountryReducer.java
    │   │       └── SalesMapper.java
    │   └── resources
    └── test
        └── java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mapreduce-first-program</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.4.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run the shade goal on the package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>SalesCountry.SalesCountryDriver</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

SalesCountryDriver.java


package SalesCountry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class SalesCountryDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        // Create a configuration object for the job
        JobConf job_conf = new JobConf(SalesCountryDriver.class);

        // Set a name for the job
        job_conf.setJobName("SalePerCountry");

        // Specify the data types of the output key and value
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);

        // Specify the Mapper and Reducer classes
        job_conf.setMapperClass(SalesCountry.SalesMapper.class);
        job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);

        // Specify the input and output formats
        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // Set input and output directories from the command-line arguments:
        // args[0] = HDFS input directory, args[1] = output directory to be created for the results
        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            // Run the job
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

SalesMapper.java

package SalesCountry;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String valueString = value.toString();
        // Field 7 of each comma-separated record is the Country column
        String[] singleCountryData = valueString.split(",");
        output.collect(new Text(singleCountryData[7]), one);
    }
}
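The mapper's whole job is a naive CSV split: index 7 of each comma-separated record is the Country column. The extraction can be sketched in plain Java (the record below is illustrative, following the SalesJan2009.csv column order; it is not a real row from the file):

```java
public class MapperSketch {
    // Extracts the country column (index 7), the same way SalesMapper does.
    // Note: a plain split(",") misparses quoted fields that contain commas,
    // which may explain stray keys (like "CO") in the job output.
    static String extractCountry(String csvLine) {
        String[] fields = csvLine.split(",");
        return fields[7];
    }

    public static void main(String[] args) {
        // Column order: Transaction_date,Product,Price,Payment_Type,Name,City,State,Country,...
        String record = "1/2/09 6:17,Product1,1200,Mastercard,jane,Basildon,England,United Kingdom,extra";
        System.out.println(extractCountry(record)); // prints "United Kingdom"
    }
}
```

For each input line, SalesMapper emits the pair (country, 1); the counting itself happens later, in the reducer.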

SalesCountryReducer.java

package SalesCountry;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int frequencyForCountry = 0;
        while (values.hasNext()) {
            // Sum the counts emitted by the mapper for this country
            IntWritable value = values.next();
            frequencyForCountry += value.get();
        }
        output.collect(t_key, new IntWritable(frequencyForCountry));
    }
}
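Between map and reduce, Hadoop shuffles and groups the (country, 1) pairs by key, so SalesCountryReducer only has to sum each group. That group-and-sum step can be sketched in plain Java with a TreeMap standing in for the framework's grouping (an illustration, not Hadoop API):

```java
import java.util.*;

public class ReduceSketch {
    // Simulates shuffle (group by key) plus SalesCountryReducer (sum the 1s).
    static Map<String, Integer> countByCountry(List<String> countries) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by key, like part-00000
        for (String c : countries) {
            counts.merge(c, 1, Integer::sum); // each mapper emission contributes 1
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> emissions = Arrays.asList("Norway", "Spain", "Norway", "Norway");
        System.out.println(countByCountry(emissions)); // prints {Norway=3, Spain=1}
    }
}
```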

Running SalesCountryDriver

mvn clean package

Copy the jar from the build output into the tutorial directory:

cp target/mapreduce-first-program-1.0-SNAPSHOT.jar /home/hadoop/MapReduceTutorial/

Verify that the input file is still in HDFS:

hdfs dfs -ls /inputMapReduce

Run MapReduce job:


hadoop jar /home/hadoop/MapReduceTutorial/mapreduce-first-program-1.0-SNAPSHOT.jar /inputMapReduce /mapreduce_output_sales

Output:


hadoop jar /home/hadoop/MapReduceTutorial/mapreduce-first-program-1.0-SNAPSHOT.jar /inputMapReduce /mapreduce_output_sales
2025-03-05 18:43:07,175 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-03-05 18:43:07,215 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-03-05 18:43:07,215 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2025-03-05 18:43:07,223 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!

...
...

Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=123637
File Output Format Counters
Bytes Written=661

The result can be viewed from the command line:

hdfs dfs -cat /mapreduce_output_sales/part-00000

Output:

Argentina 1
Australia 38
Austria 7
Bahrain 1
Belgium 8
Bermuda 1
Brazil 5
Bulgaria 1
CO 1
Canada 76
Cayman Isls 1
China 1
Costa Rica 1
Country 1
Czech Republic 3
Denmark 15
Dominican Republic 1
Finland 2
France 27
Germany 25
Greece 1
Guatemala 1
Hong Kong 1
Hungary 3
Iceland 1
India 2
Ireland 49
Israel 1
Italy 15
Japan 2
Jersey 1
Kuwait 1
Latvia 1
Luxembourg 1
Malaysia 1
Malta 2
Mauritius 1
Moldova 1
Monaco 2
Netherlands 22
New Zealand 6
Norway 16
Philippines 2
Poland 2
Romania 1
Russia 1
South Africa 5
South Korea 1
Spain 12
Sweden 13
Switzerland 36
Thailand 2
The Bahamas 2
Turkey 6
Ukraine 1
United Arab Emirates 6
United Kingdom 100
United States 462

Note the Country 1 row: it comes from the CSV header line, which the mapper counts like any other record. Results can also be viewed via the HDFS web interface:

[browse-directory-01.png: the output directory in the HDFS web UI file browser]