Custom MATLAB InputFormat for Apache Spark

Hadoop supports multiple file formats as input for MapReduce workflows, including programs executed with Apache Spark. Defining custom InputFormats is a common practice among Hadoop data engineers and is demonstrated here using a publicly available data set.

The approach demonstrated in this post does not provide a general MATLABInputFormat for Hadoop. That would require significant effort to find a general-purpose mapping of MATLAB™'s file format and type system to those of HDFS.

FDR Data

The data set used here is provided by NASA and contains multiple so-called Flight Data Records (FDR), which are recordings of airplane data collected before, during, and after a flight. NASA has published a sample data set of multiple flights here. You can use this download script or this adapted script, which tracks download progress.

Flight data are measured by hundreds to thousands of sensors across an aircraft during flight. While regulators like the FAA only define a minimum set of parameters that have to be collected at minimum sampling rates, today's airplanes can collect a much larger amount of data at even higher frequencies. The table below compares the number of parameters by airplane type:

Aircraft       Time into Service   FDR Type                 Number of Parameters
Boeing 707     1958                Analogue                 5
Airbus 330     1993                Digital                  280
Embraer 170    2004                Digital Combi-recorder   774
Airbus 380     2007                Digital                  > 1000
Boeing 787     2009                Digital EAFR             > 1000

(Source: Neil A. H. Campbell, 2007)

Data recording units, so-called FDAUs (Flight Data Acquisition Units), are responsible for producing a constant flow of converted data retrieved from the connected sensors. The data is then sent to the FDR (Flight Data Recorder).

As data can be measured at different rates, the FDAUs output a binary file sequenced in four-second frames. Each frame is further divided into four one-second subframes, each of which can hold 64, 128, 256, or 512 12-bit words.
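To make the layout concrete, a subframe of 64 12-bit words occupies 96 bytes. Below is a minimal sketch of unpacking such packed values; the class and method names are ours, and big-endian bit packing is an assumption for illustration, not taken from an FDR specification:

```java
// Sketch: unpack `count` 12-bit values from a tightly packed byte buffer.
// Assumes big-endian bit order; buf must hold at least ceil(count * 12 / 8) bytes.
class SubframeUnpacker {
    public static int[] unpack12Bit(byte[] buf, int count) {
        int[] out = new int[count];
        int bitPos = 0;
        for (int i = 0; i < count; i++) {
            int byteIdx = bitPos / 8;
            int offset = bitPos % 8; // alternates between 0 and 4 for 12-bit steps
            // Build a 24-bit window over up to three bytes, then mask out 12 bits.
            int word = ((buf[byteIdx] & 0xFF) << 16)
                     | ((buf[byteIdx + 1] & 0xFF) << 8)
                     | (byteIdx + 2 < buf.length ? (buf[byteIdx + 2] & 0xFF) : 0);
            out[i] = (word >>> (12 - offset)) & 0xFFF;
            bitPos += 12;
        }
        return out;
    }
}
```

Unpacking two values from the bytes AB CD EF yields 0xABC and 0xDEF, i.e. the packed words are simply read back-to-back across byte boundaries.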


MATLAB File Format

The data provided by NASA is organized into MATLAB™ files that preserve the basic mechanics of FDR data collection as described above. Each file contains 186 parameters, each holding a sequence of data points. For each parameter, the Rate at which the data was collected, a Description, the Name, Units, and Alpha value are additionally provided:

  Name: char[]
  Alpha: char[]
  Description: char[]
  Units: char[]
  Rate: double[]
  data: double[]

In the MATLAB™ file format these parameters are represented as multi-dimensional typed arrays. A detailed specification of the MATLAB format can be found here.
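As a small taste of that format, the fixed 128-byte Level 5 MAT-file header can be parsed with plain Java. This is a minimal sketch (the class name is ours): the header holds 116 bytes of descriptive text, a subsystem-data offset, a version field, and a two-byte endian indicator.

```java
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Sketch: parse the fixed 128-byte header of a Level 5 MAT-file.
class MatHeader {
    public final String description; // bytes 0-115: descriptive text
    public final ByteOrder order;    // byte order used by the rest of the file

    public MatHeader(byte[] header) {
        if (header.length < 128)
            throw new IllegalArgumentException("MAT header is 128 bytes");
        description = new String(header, 0, 116, StandardCharsets.US_ASCII).trim();
        // Bytes 126-127 hold the endian indicator: reading "IM" means the
        // writer used little-endian byte order, "MI" means big-endian.
        order = (header[126] == (byte) 'I' && header[127] == (byte) 'M')
                ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN;
    }
}
```

Checking the endian indicator first is important, because every numeric field after the header must be decoded with the byte order it announces.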

MATLAB data type mapping:

8  = mxINT8_CLASS
9  = mxUINT8_CLASS
10 = mxINT16_CLASS
11 = mxUINT16_CLASS
12 = mxINT32_CLASS
13 = mxUINT32_CLASS
14 = mxINT64_CLASS
15 = mxUINT64_CLASS
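Since the FDR files only distinguish integer and double parameters (see below), the codes above can be collapsed into a single predicate. A sketch, with a class name of our choosing:

```java
// Sketch: decide whether a MATLAB array class code denotes an integer array.
// Codes 8-15 are the mx(U)INT*_CLASS values listed above; mxDOUBLE_CLASS is 6
// and mxSINGLE_CLASS is 7.
class MxClassMapper {
    public static boolean isIntegerClass(int mxClassId) {
        return mxClassId >= 8 && mxClassId <= 15;
    }
}
```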

In order to be able to read the files in Java, we are using the JMatIO library, via the MatFileRW project, an actively maintained fork of the original JMatIO.

A Custom InputFormat

The motivation behind creating a custom InputFormat is usually to provide common, easily reusable access to files across an HDP platform. Another motivation can be optimizing data storage or data access, which is the primary focus of In-/OutputFormats like ORC or Parquet. Understanding InputFormats is an important skill for any Hadoop data engineer.

Besides the InputFormat, custom data types representing an FDR record are created. The FDRWritable is the container for the 186 parameters collected from an FDR record file. The FDR data only contains two types of data arrays: double and integer arrays. A parameter is therefore represented either as an FDRDoubleField or an FDRIntField. Both fields additionally hold the data rate, name, description, alpha, and units.

public class FDRDoubleField {
  public ArrayWritable data;
  public Text name;
  public Text description;
  public Text unit;
  public DoubleWritable rate;
  public IntWritable mlType;

  public FDRDoubleField() {
    set(new Text(),
        new Text(),
        new Text(),
        new DoubleWritable(),
        new IntWritable(),
        new ArrayWritable(DoubleWritable.class));
  }

  public void set(Text name, Text description, Text unit,
                  DoubleWritable rate, IntWritable mlType,
                  ArrayWritable data) {
    this.name = name;
    this.description = description;
    this.unit = unit;
    this.rate = rate;
    this.mlType = mlType;
    this.data = data;
  }
}

Input Splits

Input splits are the fragments of a file processed by a single mapper, so the number of mappers is determined by the number of input splits for a file. The default size of an input split equals the block size, most often 64 MB or 128 MB.

But the input split size can also be customized via a minimum and maximum number of bytes. It is further possible to combine input splits with the CombineFileInputFormat.
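For reference, the minimum and maximum split sizes in the new mapreduce API are controlled by two configuration properties; the byte values below are arbitrary example bounds:

```java
Configuration conf = new Configuration();
// Lower and upper bounds for split sizes, in bytes (example values: 32 MB / 256 MB)
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 32L * 1024 * 1024);
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
```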

In our case we cannot risk reading just a chunk of a file, or multiple files at once. Therefore our InputFormat is marked as not being splittable, ensuring that each map task is assigned one complete file.

/**
 * Read the whole file. Splitting MATLAB files is not supported.
 */
@Override
protected boolean isSplitable(JobContext context, Path file) {
    return false;
}

FDR Record Reader

Each InputFormat holds a RecordReader that reads the split in the right format. Additionally, each mapper expects a typed key-value pair as input for each map step.

@Override
public RecordReader<Text, FDRWritable> createRecordReader(
                                                 InputSplit split,
                                                 TaskAttemptContext context) throws IOException, InterruptedException {
  FDRRecordReader fdrRecordReader = new FDRRecordReader();
  fdrRecordReader.initialize(split, context);
  return fdrRecordReader;
}

The RecordReader here returns a Text key and the specified FDRWritable value to each mapper.

The record reader receives as input the file split of a complete MATLAB™ file containing FDR records. It parses the content of the file using the JMatIO library and returns an FDRWritable to the mapper. The file name is used as the key.

Testing the InputFormat with Spark

Since Spark can easily be executed in local mode on a single machine, we take advantage of this by using it inside a unit test for our FDRInputFormat<Text, FDRWritable>.

We will also need to set up HDFS in local mode, by pointing not to a NameNode address but to the local file system with file:///. The file system is set up during the @Before routine of our unit test:

@Before
public void setUp() throws Exception {
  // Locate the sample MAT file on the test classpath (resource name is a placeholder)
  fileUrl = Thread.currentThread()
                  .getContextClassLoader()
                  .getResource("sample.mat");
  conf = new Configuration();
  conf.set("fs.defaultFS", "file:///");
  conf.set("mapred.input.format.class", FDRInputFormat.class.getName());
  fs = FileSystem.get(conf);
}

Setting the Spark execution mode to “local” guarantees execution as a local thread during our unit test. SparkContext provides an API to access Hadoop files based on a given InputFormat. We use newAPIHadoopFile in this case to read the sample MATLAB™ file. Using the HDFS configuration from above ensures that we can access the file from the local file system.

@Test
public void testFileFormat() throws Exception {
  SparkConf sparkConf = new SparkConf()
                            .setAppName("Read FDRInputFormat Test")
                            .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);

  JavaPairRDD<Text, FDRWritable> fdrFile = sc.newAPIHadoopFile(
                         fileUrl.getPath(), FDRInputFormat.class,
                         Text.class, FDRWritable.class, conf);

  JavaPairRDD<String, String> testResult = fdrFile.mapToPair(new TestFunc()); // Applying test to the input
  // ... assertions on testResult follow here ...
  sc.stop();
}

class TestFunc implements PairFunction<Tuple2<Text, FDRWritable>, String, String> {
  @Override
  public Tuple2<String, String> call(Tuple2<Text, FDRWritable> inTuple) throws Exception {
    FDRWritable fdrRecord = inTuple._2();
    FDRDoubleField VAR_1107 = fdrRecord.VAR_1107;
    // Illustrative: pass the parameter's name and description downstream
    return new Tuple2<String, String>(VAR_1107.name.toString(), VAR_1107.description.toString());
  }
}

“New API” in this context always refers to the Hadoop org.apache.hadoop.mapreduce API instead of the old org.apache.hadoop.mapred API.

Further Reading

