# Map Reduce – tf-idf

tf-idf is an approach for finding relevant documents based on the counts of the words they contain. Because raw counts alone would emphasize common words like ‘the’, tf-idf weights each word by how rarely it appears across a set of documents: the inverse document frequency. Here I’ll try to give a simple MapReduce implementation. As a little quirk, Avro will be used to model the representation of a document. We are going to need secondary sorting to reach an effective implementation.

## tf-idf

tf-idf can be calculated as tfidf(t,d,D) = tf(t,d) * idf(t,D), where tf denotes the term frequency and idf the inverse document frequency. tf can be calculated in different ways, for example as a boolean or a logarithmically scaled frequency. A normalized form, where you divide by the maximum frequency of any word in the document, is also used quite often. Refer to the Wikipedia article linked in this post for further details. For simplicity, in this example tf is simply the count of a term in a document, denoted as tf(t,d) = f(t,d).

idf is the inverse document frequency of a term. It is calculated as idf(t,D) = log(N/|Dt|), where N is the total number of documents and |Dt| is the number of documents containing t. For example, if your corpus consists of 3 documents and each of them contains the word ‘such’, then the inverse document frequency of ‘such’ is log(3/3) = 0. The inverse document frequency therefore tends to filter out very frequent terms with little relevance.
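To make the two formulas concrete, here is a small sketch in plain Java without Hadoop. The class `TfIdfExample`, its method names and the tiny corpus are made up for illustration; tf is the raw count and idf is log(N/|Dt|), as defined above.

```java
import java.util.Arrays;
import java.util.List;

class TfIdfExample {

    // tf(t,d) = f(t,d): raw count of the term in the tokenized document.
    static long tf(String term, List<String> doc) {
        return doc.stream().filter(term::equals).count();
    }

    // idf(t,D) = log(N / |Dt|); assumes the term occurs in at least one document.
    static double idf(String term, List<List<String>> corpus) {
        long containing = corpus.stream().filter(d -> d.contains(term)).count();
        return Math.log((double) corpus.size() / containing);
    }

    static double tfIdf(String term, List<String> doc, List<List<String>> corpus) {
        return tf(term, doc) * idf(term, corpus);
    }

    public static void main(String[] args) {
        List<List<String>> corpus = Arrays.asList(
            Arrays.asList("such", "a", "nice", "day"),
            Arrays.asList("such", "rain", "rain"),
            Arrays.asList("such", "is", "life"));

        // "such" occurs in all 3 documents: log(3/3) = 0, so this prints 0.0.
        System.out.println(tfIdf("such", corpus.get(0), corpus));
        // "rain" occurs twice in one of 3 documents: 2 * log(3/1).
        System.out.println(tfIdf("rain", corpus.get(1), corpus));
    }
}
```

A term that appears everywhere scores 0 no matter how often it occurs, while a term concentrated in one document gets a positive score.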

## A MapReduce Implementation

To implement tf-idf using MapReduce we will divide it into 2 steps. In the first step we count the documents. In the second step we count the terms in each document and then calculate the tf-idf, since with both counts we have enough information to do so. So the general job configuration consists of two jobs, countTotalDocuments and calculateTfIdf.

The input for your calculation will be a collection of documents modeled in Avro with the following simple schema:

```json
{
  "namespace": "de.microlution.mr.model",
  "type": "record",
  "name": "Document",
  "fields": [
    {"name": "id", "type": ["string", "null"], "columnName": "id", "sqlType": "12"},
    {"name": "author", "type": ["string", "null"], "columnName": "author", "sqlType": "12"},
    {"name": "title", "type": ["string", "null"], "columnName": "title", "sqlType": "12"},
    {"name": "text", "type": ["string", "null"], "columnName": "text", "sqlType": "12"}
  ]
}
```

From this input the tf-idf for terms in title and text will be calculated and written into a text file.

To count the documents for our calculation we’ll use Hadoop Counters and read the result from our job runner. We only need a map phase, so we deactivate the reducers with job.setNumReduceTasks(0);.

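```java
package de.microlution.mr;

import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import de.microlution.mr.model.Document;

public class DocumentCountMapper extends Mapper<AvroKey<Document>, NullWritable, Text, NullWritable> {

    // Counter that is incremented once per input document.
    public enum Count {
        TOTAL_DOCUMENTS
    }

    @Override
    public void map(AvroKey<Document> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        // Nothing is emitted; the job only exists to fill the counter.
        context.getCounter(Count.TOTAL_DOCUMENTS).increment(1);
    }
}
```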

This phase seems almost like a waste, as we are only counting the documents. But we need that number, so this step can only be omitted if we already know the document count. Our job runner can read the result from the job’s counters:

```java
// The Count enum is declared in DocumentCountMapper.
return job.getCounters()
    .findCounter(DocumentCountMapper.Count.TOTAL_DOCUMENTS)
    .getValue();
```

## tf-idf with Hadoop secondary sorting

The next step calculates the tf-idf score for each term in a document. The overall idea of using secondary sorting for this is to have the values in the reduce phase grouped and sorted by the document id and the term itself. Like this:

```
doc1:a
doc1:a
doc1:b
doc1:c
doc2:a
doc2:a
doc2:b
```

Here we have to make sure that all occurrences of a word get partitioned to the same reducer, and at the reducer we have to ensure that the terms are sorted and grouped by the document id. If we did not use secondary sorting, we would partition and sort either by term or by document id. That would be undesirable in our case, as we would like to count each word per document. A simple and maybe valid workaround would be to use the concatenation of document key and term as the reduce key, like doc1_term. This might work but brings its own complexity, as we would have to split on and/or escape the _, for example.

To make this work we have to define our own key class NgramFreqKey. We will also create our own NgramFreq model. The key and model can later be used by the partitioner and the group comparator at the reducer.

```java
package de.microlution.mr.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class NgramFreqKey implements WritableComparable<NgramFreqKey> {

    private Text ngram;
    // Used as the document id throughout this post.
    private Text clusterId;

    public NgramFreqKey() {
        set(new Text(), new Text());
    }

    public NgramFreqKey(Text ngram, Text clusterId) {
        set(ngram, clusterId);
    }

    public NgramFreqKey(String ngram, String clusterId) {
        set(new Text(ngram), new Text(clusterId));
    }

    public void set(Text ngram, Text clusterId) {
        this.ngram = ngram;
        this.clusterId = clusterId;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in the same order in which write() emits them.
        ngram.readFields(in);
        clusterId.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        ngram.write(out);
        clusterId.write(out);
    }

    public Text getNgram() {
        return ngram;
    }

    public Text getClusterId() {
        return clusterId;
    }

    // Sort order: by term first, then by document id within the same term.
    @Override
    public int compareTo(NgramFreqKey o) {
        int byNgram = ngram.compareTo(o.getNgram());
        if (byNgram != 0) {
            return byNgram;
        }
        return clusterId.compareTo(o.getClusterId());
    }
}
```

```java
package de.microlution.mr.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class NgramFreq implements Writable {

    private Text ngram;
    // Used as the document id throughout this post.
    private Text clusterId;
    private IntWritable count;

    public NgramFreq() {
        set(new Text(), new Text(), new IntWritable());
    }

    public NgramFreq(Text ngram, Text clusterId, IntWritable count) {
        set(ngram, clusterId, count);
    }

    public NgramFreq(String ngram, String clusterId, int count) {
        set(new Text(ngram), new Text(clusterId), new IntWritable(count));
    }

    public void set(Text ngram, Text clusterId, IntWritable count) {
        this.ngram = ngram;
        this.clusterId = clusterId;
        this.count = count;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in the same order in which write() emits them.
        ngram.readFields(in);
        clusterId.readFields(in);
        count.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        ngram.write(out);
        clusterId.write(out);
        count.write(out);
    }

    public Text getNgram() {
        return ngram;
    }

    public Text getClusterId() {
        return clusterId;
    }

    public IntWritable getCount() {
        return count;
    }
}
```
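The mapper that fills these two types is not shown in this post. As a rough idea of what it has to produce, the following Hadoop-free sketch turns one document into (term, document id) pairs, each of which would carry a count of 1. The class `TermEmitExample`, the method `emit` and the tokenization rule (lower-casing and splitting on anything that is not a letter or digit) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class TermEmitExample {

    // Splits title and text into lower-cased terms and pairs each term with the document id.
    static List<String[]> emit(String docId, String title, String text) {
        List<String[]> pairs = new ArrayList<>();
        String content = (title + " " + text).toLowerCase(Locale.ROOT);
        for (String token : content.split("[^\\p{L}\\p{N}]+")) {
            if (!token.isEmpty()) {
                // In the real job this would become a key (term, docId) and a value with count 1.
                pairs.add(new String[] {token, docId});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (String[] pair : emit("doc1", "Such a day", "Rain, rain and more rain.")) {
            System.out.println(pair[0] + "\t" + pair[1]);
        }
    }
}
```

Emitting one pair per occurrence keeps the mapper trivial and leaves all counting to the reducer.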

In cases where you need to control the way Hadoop partitions and sorts at the reduce level, it offers the possibility of using a custom partitioner and group comparator. We are going to use both in our last step to calculate tf-idf.

Our partitioner makes sure that we partition by the term only and not by the document id contained in the key. This gives us a fairly good distribution and the possibility to count the occurrences of a term at the reducer.

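```java
package de.microlution.mr;

import org.apache.hadoop.mapreduce.Partitioner;

import de.microlution.mr.model.NgramFreq;
import de.microlution.mr.model.NgramFreqKey;

public class NgramFreqKeyPartitioner extends Partitioner<NgramFreqKey, NgramFreq> {

    @Override
    public int getPartition(NgramFreqKey key, NgramFreq value, int numPartitions) {
        // Hash only the term so that all documents of a term meet at one reducer;
        // the mask clears the sign bit to keep the index non-negative.
        return (key.getNgram().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```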
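The `& Integer.MAX_VALUE` in the partitioner is easy to overlook. A short standalone sketch shows why it is there: Java's `%` keeps the sign of the dividend, so a negative hash code would otherwise yield a negative partition index. `PartitionExample` and the sample hash value are made up for illustration.

```java
class PartitionExample {

    // Same arithmetic as in the partitioner, applied to a plain hash code.
    static int partition(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int negativeHash = -17;
        // Plain % keeps the sign of the dividend: prints -3, which is not a valid partition.
        System.out.println(negativeHash % 7);
        // With the sign bit cleared the result always lies in [0, numPartitions).
        System.out.println(partition(negativeHash, 7));
    }
}
```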

To achieve the sorting at the reducer as stated above, Hadoop allows us to override the grouping and sorting. We write our own group comparator like this:

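```java
package de.microlution.mr;

import java.io.Serializable;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import de.microlution.mr.model.NgramFreqKey;

public class NgramFreqKeyGroupComparator extends WritableComparator implements Serializable {

    public NgramFreqKeyGroupComparator() {
        // true: let the parent class create key instances for deserialization.
        super(NgramFreqKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        NgramFreqKey gka = (NgramFreqKey) a;
        NgramFreqKey gkb = (NgramFreqKey) b;

        // Group by term only, ignoring the document id.
        return gka.getNgram().compareTo(gkb.getNgram());
    }
}
```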
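To see what the key's compareTo and the group comparator achieve together, here is a Hadoop-free simulation. It sorts (term, document id) pairs the way the key's compareTo does and then groups them by term only, the way the group comparator does. The class `SecondarySortExample` and the use of a `String[]` as a stand-in for the composite key are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortExample {

    // Mirrors the key's compareTo: index 0 is the term, index 1 the document id.
    static int compareKeys(String[] a, String[] b) {
        int byTerm = a[0].compareTo(b[0]);
        return byTerm != 0 ? byTerm : a[1].compareTo(b[1]);
    }

    // Sorts by (term, docId) and then groups by term only, like the group comparator.
    static Map<String, List<String>> sortAndGroup(List<String[]> keys) {
        List<String[]> sorted = new ArrayList<>(keys);
        sorted.sort(SecondarySortExample::compareKeys);
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] key : sorted) {
            groups.computeIfAbsent(key[0], t -> new ArrayList<>()).add(key[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> emitted = Arrays.asList(
            new String[] {"b", "doc2"}, new String[] {"a", "doc2"},
            new String[] {"a", "doc1"}, new String[] {"c", "doc1"},
            new String[] {"a", "doc1"}, new String[] {"b", "doc1"},
            new String[] {"a", "doc2"});
        // Each entry corresponds to one reduce call: a term with its document ids in order.
        sortAndGroup(emitted).forEach((term, docs) -> System.out.println(term + " -> " + docs));
    }
}
```

Because the document ids of one term arrive in order, a reducer only has to notice when the id changes. It never needs to hold all values in memory.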

This guarantees that each reduce call receives all values of one term, sorted by the document id. This gives us the possibility to write a reducer that counts the frequency of each term per document it appears in. Just what we would want for a tf-idf calculation. Here is what our reducer could look like:

```java
package de.microlution.mr;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;

import de.microlution.mr.model.NgramFreq;
import de.microlution.mr.model.NgramFreqKey;

public class TfIDFReducer extends Reducer<NgramFreqKey, NgramFreq, ...> {

    public static final String TOTAL_DOCUMENTS = "TOTAL_DOCUMENTS";
    private long totalDocuments;

    @Override
    public void reduce(NgramFreqKey key, Iterable<NgramFreq> values,
            Context context) throws IOException, InterruptedException {

        String currentNgram = key.getNgram().toString();
        String currentDocId = null;

        HashMap<String, Long> termFreqs = new HashMap<>();
        long documentCount = 0;
        long termCount = 0;
        long totalFreq = 0;

        // All values of this call share one term and arrive sorted by document id.
        for (NgramFreq ngramFreq : values) {
            long freq = ngramFreq.getCount().get();
            // The model stores the document id in its clusterId field.
            String docId = ngramFreq.getClusterId().toString();

            if (!ngramFreq.getNgram().toString().equals(currentNgram)) {
                throw new IllegalStateException("current: " + currentNgram
                        + "\tthis: " + ngramFreq.getNgram().toString());
            }

            if (currentDocId == null || !currentDocId.equals(docId)) {
                // A new document starts: store the finished count of the previous one.
                if (currentDocId != null) {
                    termFreqs.put(currentDocId, termCount);
                }
                currentDocId = docId;
                documentCount += 1;
                termCount = freq;
                totalFreq += freq;
            } else {
                termCount += freq;
                totalFreq += freq;
            }
        }
        ......
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // The driver passes the document count of the first job via the configuration.
        Configuration conf = context.getConfiguration();
        this.totalDocuments = conf.getLong(TOTAL_DOCUMENTS, -1);
    }
}
```
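The reducer above elides the final step that turns the collected counts into scores. The following Hadoop-free sketch shows one way this could work; it is not the post's elided code. It walks over document ids that are already sorted, counts occurrences per document, flushes the last document after the loop, and multiplies each count by log(N/|Dt|). `TermScoreExample` and `score` are hypothetical names, and each list entry is assumed to stand for one occurrence of the term.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TermScoreExample {

    // sortedDocIds: one entry per occurrence of a single term, ordered by document id.
    static Map<String, Double> score(List<String> sortedDocIds, long totalDocuments) {
        Map<String, Long> termFreqs = new LinkedHashMap<>();
        String currentDocId = null;
        long termCount = 0;
        for (String docId : sortedDocIds) {
            if (!docId.equals(currentDocId)) {
                if (currentDocId != null) {
                    termFreqs.put(currentDocId, termCount);
                }
                currentDocId = docId;
                termCount = 0;
            }
            termCount++;
        }
        if (currentDocId != null) {
            termFreqs.put(currentDocId, termCount); // flush the last document
        }

        // |Dt| is the number of distinct documents seen for this term.
        double idf = Math.log((double) totalDocuments / termFreqs.size());
        Map<String, Double> scores = new LinkedHashMap<>();
        termFreqs.forEach((docId, tf) -> scores.put(docId, tf * idf));
        return scores;
    }

    public static void main(String[] args) {
        // The term occurs twice in doc1 and once in doc2, out of 4 documents in total.
        System.out.println(score(Arrays.asList("doc1", "doc1", "doc2"), 4));
    }
}
```

The flush after the loop matters: the count of a document is only stored when the next document id shows up, so the last document would be lost without it.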

The reducer receives the total document count from the job configuration and afterwards simply iterates through the emitted terms. In the reducer we can safely assume the order we have implemented by using our custom group comparator. For completeness, here is the driver class:

```java
package de.microlution.mr;

import java.io.IOException;

import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import de.microlution.mr.model.Document;
import de.microlution.mr.model.NgramFreq;
import de.microlution.mr.model.NgramFreqKey;

public class TfIdfDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TfIdfDriver <input> <output>");
            return -1;
        }

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // Job 1: count the documents. Job 2: calculate tf-idf with that count.
        long docCount = countTotalDocuments(input);

        return calculateTfIdf(input, 7, docCount);
    }

    public long countTotalDocuments(Path input) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration(getConf());

        Job job = Job.getInstance(conf);
        job.setJobName(TfIdfDriver.class.getSimpleName() + ".countTotalDocuments: " + input);
        job.setJarByClass(TfIdfDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, input);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, Document.getClassSchema());
        job.setMapperClass(DocumentCountMapper.class);

        // Map-only job: the mapper fills a counter and nothing is written.
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);

        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IllegalStateException("Job failed!");
        }

        return job.getCounters().findCounter(DocumentCountMapper.Count.TOTAL_DOCUMENTS)
                .getValue();
    }

    private int calculateTfIdf(Path input, int reduceTasks, long docCount)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration(getConf());
        // Hand the document count to the reducer, which reads it in setup().
        conf.setLong(TfIDFReducer.TOTAL_DOCUMENTS, docCount);

        Job job = Job.getInstance(conf);
        job.setJobName(TfIdfDriver.class.getSimpleName() + ".calcTfIdf: " + input);
        job.setJarByClass(TfIdfDriver.class);
        job.setNumReduceTasks(reduceTasks);

        // Secondary sort: composite key, partition by term, group by term.
        job.setMapOutputKeyClass(NgramFreqKey.class);
        job.setMapOutputValueClass(NgramFreq.class);
        job.setPartitionerClass(NgramFreqKeyPartitioner.class);
        job.setGroupingComparatorClass(NgramFreqKeyGroupComparator.class);

        FileInputFormat.setInputPaths(job, input);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, Document.getClassSchema());
        job.setMapperClass(TfIDFMapper.class);

        job.setOutput....

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new TfIdfDriver(), args);
        System.exit(res);
    }
}
```