Hadoop supports multiple file formats as input for MapReduce workflows, including programs executed with Apache Spark. Defining custom InputFormats is a common practice among Hadoop Data Engineers and will be discussed here based on publicly available data set.
The approach demonstrated in this post does not provide means for a general MATLAB™ InputFormat for Hadoop. This would require significant effort in finding a general purpose mapping of MATLAB™’s file format and type system to the ones of HDFS.
FDR Data
The data set given here is provided by the NASA and contains multiple so called Flight Data Records (FDR), which are recordings of airplane data collected before, after, and during a flight. The NASA has published a sample data set of multiple flights here. You can use this download script or take this adapted script which tracks download progress.
Flight data are measured by hundreds to thousands of sensors across an aircraft during flights. While regulators like the FAA only define a minimum of parameters that have to be collected at a minimum of sampling rates, today’s airplanes have the capacity of collecting a much larger amount of data at even higher frequency. The table below compares the number of parameters based on the airplane type:
Aircraft | Time into Service | FDR Type | Number of Parameters |
Boeing 707 | 1958 | Analogue | 5 |
Airbus 330 | 1993 | Digital | 280 |
Embraer 170 | 2004 | Digital Combi-recorder | 774 |
Airbus 380 | 2007 | Digital | > 1000 |
Boeing 787 | 2009 | Digital EAFR | > 1000 |
Data recording units, so called FDAU (Flight Data Acquisition Units) are responsible of producing a constant flow of converted data retrieved form connected sensors. The data is then send to the FDR (Flight Data Recoder).
As data can be measured at different rates the FDAUs output a binary file sequenced in four-second frames. Each frame is further divided into four one-second-subframes of which each can hold 64, 128, 256, or 512 dates of 12 bit values.

MATLAB File Format
The data provided by the NASA is organized into MATLAB™ files that preserves the basic mechanism of a FDR data collection as described above. Each file contains 186 parameters that contain a sequence of data points. For each data point the Rate at which the data was collected, a Description, the Name, Unit, and Alpha value are additionally provided.
VAR_1107: Name: char[] Alpha: char[] Description: char[] Rate: double[] data: double[]
In the MATLAB™ file format these parameters are represented as multi-dimensional typed arrays. A detailed specification of the MATLAB format can be found here.
MATLAB data type mapping:
0 = mxUNKNOWN_CLASS 1 = mxCELL_CLASS 2 = mxSTRUCT_CLASS 3 = mxOBJECT_CLASS 4 = mxCHAR_CLASS 5 = mxSPARSE_CLASS 6 = mxDOUBLE_CLASS 7 = mxSINGLE_CLASS 8 = mxINT8_CLASS 9 = mxUINT8_CLASS 10 = mxINT16_CLASS 11 = mxUINT16_CLASS 12 = mxINT32_CLASS 13 = mxUINT32_CLASS 14 = mxINT64_CLASS 15 = mxUINT64_CLASS 16 = mxFUNCTION_CLASS 17 = mxOPAQUE_CLASS
In order to be able to read the files in Java, we are using the JMatIO library that is an actively maintained fork of the matfilerw project.
A Custom InputFormat
The motivation behind the creation of a custom InputFormat usually is to provide a common access to files across a HDP platform that can easily be reused. Another motivation could be the optimization of data storage or data access, which is the primary focus of In-/OutputFormats like ORC or Parquet. Understanding InputFormats is an important skill for any Hadoop Data Engineer.
Besides the InputFormat custom data types to represent a FDR records are created. The FDRWritable is the container for 186 parameters collected from the FDR record file. The FDR data only contains two type of data array, a double and integer array. A parameter will either be represented as a FDRDoubleField or a FDRIntField. Both fields hold the additional value of the data rate, name, description, alpha, and units.
public class FDRDoubleField { public ArrayWritable data; public Text name; public Text description; public Text unit; public DoubleWritable rate; public IntWritable mlType; public FDRDoubleField(){ set(new Text(), new Text(), new Text(), new DoubleWritable(), new IntWritable(), new ArrayWritable(DoubleWritable.class) ); } ... }
Input Splits
Input splits are the fragment of a file that is processed by a Mapper. So the amount of mappers is based on the amount of input splits for a file. The default size of an input split is the same as the overall block size, so most often 64MB or 128MB.
But the input split can also be customized based on a minimum and maximum amount of bytes. Further is it possible to combine input splits with the CombineFileInputFormat InputFomat.
In our case we can not risk reading just a chunk or multiple files. Therefor our InputFormat is marked as not being split able ensuring that each map task is assigned to one complete file.
/** * Read whole file. This does not support spliting MatLab files. */ @Override protected boolean isSplitable(JobContext context, Path file) { return false; }
FDR Record Reader
Each InputFormat holds a RecordReader for reading out the split in the right format. Additionally each mapper expects a typed key value pair as input for each map step.
public RecordReader<Text, FDRWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FDRRecordReader fdrRecordReader = new FDRRecordReader(); fdrRecordReader.initialize(split, context); return fdrRecordReader; }
The RecordReader here returns Text and the specified FDRWritable key value pair to each mapper.
The record reader receives as input the file split of a complete MATLAB™ file containing FDR records. It parses the content of the file using the JMatIO library and returns a FDRWritable to the mapper. As key the file name is being used.
Testing the InputFormat with Spark
Since Spark can easily be executed in a local running mode on a single machine, we are taking advantage of this by using it inside of our unit test for our FDRInputFormat<Text, FDRWritable>.
We will also need to setup HDFS for in local mode, by not pointing to a NameNode address but instead to the local file system with file:///. The file system in setup during the @Before routine of our unit test:
@Before public void setUp() throws Exception { fileUrl = Thread.currentThread() .getContextClassLoader() .getResource("652200101111519.mat"); conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); conf.set("mapred.input.format.class", FDRInputFormat.class.getName()); fs = FileSystem.get(conf); }
Setting the Spark execution mode to “local” guaranties the execution as a local thread during our unit test. SparkContext provides an API to access Hadoop files based on a given InputFormat. We use the newAPIHadoopFile in this case to read the sample MATLAB™ file. Using the HDFS configuration from above ensures that we can access the file form the local file system.
@Test public void testFileFormat() throws Exception{ SparkConf sparkConf = new SparkConf() .setAppName("Read FDRInputFormat Test") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaPairRDD<Text, FDRWritable> fdrFile = sc.newAPIHadoopFile( fileUrl.getPath(), FDRInputFormat.class, Text.class, FDRWritable.class, conf); JavaPairRDD<String, String> testResult = fdrFile.mapToPair(new TestFunc()); // Applying test to the input } class TestFunc implements PairFunction<Tuple2<Text,FDRWritable>, String, String> { public Tuple2<String, String> call(Tuple2<Text, FDRWritable> inTuple) throws Exception { FDRWritable fdrRecord = inTuple._2(); FDRDoubleField VAR_1107 = fdrRecord.VAR_1107; assertTrue(VAR_1107.getName().equals("VAR_1107")); } }
New API in this context always referrers to the Hadoop org.apache.hadoop.mapreduce API instead of the old org.apache.hadoop.mapred API.