Using Counters in MapReduce to Track Bad Records

The MapReduce framework provides Counters as an efficient mechanism for tracking the occurrences of global events within the map and reduce phases of a job. For example, a typical MapReduce job will kick off several mapper instances, one for each block of the input data, all running the same code. These instances are part of the same job but run independently of one another. Counters allow a developer to track aggregated events across all of those separate instances. A more concrete use of Counters can be found in the MapReduce framework itself: each MapReduce job defines several standard Counters, and their values can be found in the job details of the JobTracker web UI.

Counter                    Map                      Reduce                   Total
-------------------------  -----------------------  -----------------------  -----------------------
Map input records          71,228,085,554           0                        71,228,085,554
Reduce shuffle bytes       0                        5,419,119,866,175        5,419,119,866,175
Spilled Records            142,174,003,132          55,141,286,320           197,315,289,452
Map output bytes           5,513,542,463,958        0                        5,513,542,463,958
CPU_MILLISECONDS           2,658,192,290            2,352,883,960            5,011,076,250
Combine input records      139,937,676,997          69,478,386,276           209,416,063,273
SPLIT_RAW_BYTES            1,999,232                0                        1,999,232
Reduce input records       0                        58,006,566,989           58,006,566,989
Reduce input groups        0                        3,331,430                3,331,430
Combine output records     139,937,676,997          69,478,386,262           209,416,063,259
PHYSICAL_MEMORY_BYTES      12,647,158,763,520       196,662,726,656          12,843,821,490,176
Reduce output records      0                        406,433,728              406,433,728
VIRTUAL_MEMORY_BYTES       33,014,737,301,504       273,806,381,056          33,288,543,682,560
Map output records         71,228,085,554           0                        71,228,085,554
GC time elapsed (ms)       98,623,849               342,650,729              441,274,578

The UI shows each Counter's group, name, mapper total, reducer total, and job total. Counters should be limited to tracking metadata about the job, and the standard Counters are good examples of this: the Map input records counter provides useful information about a particular execution of a job. If Counters did not exist, these kinds of statistics would have to become part of the job's main output, where they don't belong, or, more likely, be written to a secondary output, complicating the logic of the job.
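These standard Counters are not only visible in the web UI; once a job has finished, they can also be read back in the driver. The following is a minimal sketch, assuming the org.apache.hadoop.mapreduce API, where the built-in counters are exposed through the TaskCounter enum. The class and method names here are hypothetical and used only for illustration.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class BuiltInCounterReport {
 // Prints a few of the framework's standard counters for a job that has already completed.
 public static void report(Job completedJob) throws Exception {
  Counters counters = completedJob.getCounters();
  System.out.printf("Map input records: %d\n",
      counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
  System.out.printf("Spilled records: %d\n",
      counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue());
  System.out.printf("GC time elapsed (ms): %d\n",
      counters.findCounter(TaskCounter.GC_TIME_MILLIS).getValue());
 }
}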

The following recipe is a simple map-only job that filters out bad records and uses a Counter to track the number of records that were filtered out.

Suppose you have written a MapReduce job that contains the following code:

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Example {
 // User-defined counters, aggregated across all map tasks.
 private enum COUNTERS {
  GOOD,
  BAD
 }

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(Example.class);   // lets the framework locate this class on the cluster
  job.setMapperClass(Example.MapTask.class);
  job.setNumReduceTasks(0);           // map-only job: no reduce phase
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(false);
  // Read back the aggregated counter values in the driver once the job has finished.
  Counters counters = job.getCounters();
  System.out.printf("Good: %d, Bad: %d\n",
      counters.findCounter(COUNTERS.GOOD).getValue(),
      counters.findCounter(COUNTERS.BAD).getValue());
 }

 public static class MapTask
   extends Mapper<LongWritable, Text, Text, NullWritable> {
  // A record is "good" when it starts with an optionally quoted two-digit hh:mm timestamp,
  // followed by an optionally quoted word starting with a capital letter,
  // followed by a comma and the rest of the record.
  private Pattern p =
    Pattern.compile("^([\"']?)\\d\\d:\\d\\d\\1,([\"']?)[A-Z]\\w+\\2,.*$");

  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   if (p.matcher(value.toString()).matches()) {
    context.getCounter(COUNTERS.GOOD).increment(1L);
    context.write(value, NullWritable.get());
   } else {
    context.getCounter(COUNTERS.BAD).increment(1L);
   }
  }
 }
}

Assuming the output directory in HDFS does not exist, what would be the command-line output (ignoring log messages) of this application if the contents of input.csv were as follows:

9:58,Added,Queue,18762
10:23,Added,Queue,7432e01
10:53,"Removed","Queue","7432e01"
'11:01',"Logged Out",

The answer would be Good: 2, Bad: 2. Let's see why, record by record (a standalone check of the regular expression follows the list):

  1. The first record is marked bad because the hour is a single digit, and the regular expression requires a two-digit hour and minute.
  2. The second record is marked good because the time is in the correct format, the word in the second column starts with a capital letter followed by word characters, and there are more than two columns.
  3. The third record is marked good because the time is in the correct format, the word in the second column starts with a capital letter, contains only word characters, and is wrapped in matching quotes, and there are more than two columns.
  4. The fourth record is marked bad because the second column contains a space; the regular expression does not allow spaces in that column, quoted or not.
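If you want to convince yourself without running a cluster, the pattern can be exercised directly in a small standalone program. This is only a sketch; RegexCheck is a hypothetical class name, and the input lines are the four records above.

import java.util.regex.Pattern;

public class RegexCheck {
 public static void main(String[] args) {
  // Same pattern as the MapTask in the recipe.
  Pattern p =
    Pattern.compile("^([\"']?)\\d\\d:\\d\\d\\1,([\"']?)[A-Z]\\w+\\2,.*$");
  String[] records = {
    "9:58,Added,Queue,18762",
    "10:23,Added,Queue,7432e01",
    "10:53,\"Removed\",\"Queue\",\"7432e01\"",
    "'11:01',\"Logged Out\","
  };
  int good = 0, bad = 0;
  for (String record : records) {
   if (p.matcher(record).matches()) {
    good++;
   } else {
    bad++;
   }
  }
  System.out.printf("Good: %d, Bad: %d\n", good, bad);  // prints Good: 2, Bad: 2
 }
}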

The false in waitForCompletion() turns off the progress output; it has nothing to do with whether the driver will wait for the job to complete.
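If you do want the progress output, the only change is the boolean argument; a quick sketch of the two forms:

 // Both calls block until the job finishes and return true if it succeeded;
 // the argument only controls whether progress (and, at the end, the counters) is printed to the console.
 boolean success = job.waitForCompletion(true);    // verbose
 // boolean success = job.waitForCompletion(false); // quiet, as in the recipe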

Assuming the output directory in HDFS does not exist, what would be the command-line output (ignoring log messages) of this application if the contents of input.csv were as follows:

10:23,Added,,queue 2,7432e01
'10:28','Added','Queue","56845'
11:01,"Logout"
11:01,X,Y,Z

Again, the answer would be Good: 2, Bad: 2. Let's check each record:

  1. The first record is marked good because the time is in the correct format, the word in the second column starts with a capital letter followed by word characters, and there are more than two columns.
  2. The second record is marked good because the time is in the correct format and is wrapped in matching quotes, the word in the second column starts with a capital letter, contains only word characters, and is wrapped in matching quotes, and there are more than two columns.
  3. The third record is marked bad because there is no comma after the second column, so the record does not have more than two columns.
  4. The fourth record is marked bad because the second column is only a single character; the regular expression requires a capital letter followed by at least one more word character.
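The standalone regular-expression check shown after the first walkthrough reports Good: 2, Bad: 2 for these four records as well; only the contents of its records array need to change.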

As in the previous example, the false passed to waitForCompletion() only turns off the progress output; it has nothing to do with whether the driver waits for the job to complete.
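Finally, Counters do not have to be declared as an enum up front. The mapper's context also accepts a group name and a counter name as plain strings, which is handy when the set of counters is not known at compile time. The following fragment is a sketch of that variant of the recipe's mapper; the group name "Records" and the counter names are arbitrary labels chosen for illustration.

public static class DynamicCounterMapTask
  extends Mapper<LongWritable, Text, Text, NullWritable> {
 private Pattern p =
   Pattern.compile("^([\"']?)\\d\\d:\\d\\d\\1,([\"']?)[A-Z]\\w+\\2,.*$");

 @Override
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  if (p.matcher(value.toString()).matches()) {
   // The group and counter are identified by strings instead of an enum constant.
   context.getCounter("Records", "good").increment(1L);
   context.write(value, NullWritable.get());
  } else {
   context.getCounter("Records", "bad").increment(1L);
  }
 }
}

In the driver, such a counter is read with counters.findCounter("Records", "bad").getValue(). The enum form used in the recipe is usually preferable because the counter names are checked by the compiler.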