Training Multiple SVM Classifiers with Apache Pig

Inspired by Twitter’s publication about “Large Scale Machine Learning” I turned to Pig when it came to implementing an SVM classifier for Record Linkage. Searching for different solutions I also came across a presentation by the Huffington Post describing a similar approach to training multiple SVM models. The overall idea is to use Hadoop to train multiple models with different parameters at the same time, selecting the best model for the actual classification. There are some limitations to this approach, which I’ll try to address at the end of this post, but first let me describe my approach to training multiple SVM classifiers with Pig.

Disclaimer: This post does not describe the process of training a single model in parallel, but rather training multiple models at the same time on multiple machines.

SVM Parameters

Depending on the kernel you use, an SVM comes with a few or many parameters to tweak. These are usually neither very descriptive nor intuitive, but on the other hand they have the potential to distinguish a good classifier from a bad one.


Selecting the right parameters commonly ends up being some sort of grid search, where a combination of parameters is continuously adjusted until a local (ideally global) optimum is reached, for example by applying gradient descent. There are two main reasons to distribute this onto a cluster of multiple machines. For one, the training of a single model on a single machine can take a few minutes. Secondly, the complexity of a search for optimal parameters can grow exponentially.

Let’s for a minute assume you have an SVM classifier with two parameters a and b. For a you now want to try 3 different settings, while for b you try 2. Let’s assume further that training a single model on your current machine already takes 10 minutes, including cross validation. You would then end up with an hour of compute time to train all 6 models (3 x 2 x 10 min = 60 min).
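In Pig, such a parameter grid is nothing more than a CROSS over the parameter relations. A minimal sketch, assuming two hypothetical files a.tsv (3 values) and b.tsv (2 values) with one value per line:

a_values = LOAD 'a.tsv' USING PigStorage() AS (a: double);
b_values = LOAD 'b.tsv' USING PigStorage() AS (b: double);
grid = CROSS a_values, b_values; -- yields 3 x 2 = 6 parameter combinations
DUMP grid;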

Rarely will you be satisfied with the result of this first run, which will lead you to conducting multiple runs with different parameters. While not an exponential problem, this will already hold you up for quite some time until you reach a satisfying state. And what will happen once you have newly labeled data for training? Will you want to do all this again on your machine? How about testing different features?

We haven’t even touched feature selection so far! Automating this approach and being able to test more settings at once is an obvious advantage.

Using Pig

Having read Twitter’s publication about “Large Scale Machine Learning”, the approach described here uses Pig to achieve a setup similar to that of the Huffington Post for training multiple SVM models in parallel. The implementation consists of two main parts: a UDF using LibSVM from within Weka, and an execution script to orchestrate the training.

Input / Output

As input I was able to rely on tens of thousands of manually labeled instances. In the case of Record Linkage we are talking about a binary classification, so the instances were labeled either as match or no-match.

For a binary classification, having more than ten or twenty thousand training instances seems reasonable enough for good results. The instances consisted of all relevant attributes. In the hospitality domain (hotels) this usually means name, contact information (phone, email, etc.), location (street, latitude/longitude, etc.), and amenities (description, stars, etc.).

Still, all of this taken together is not enough data to really scale well in terms of Map-Reduce. A first approach even involved loading the training data from the distributed cache.

{"source_name": "Hotel INN", "source_city": "Leipzig", "target_name": "H. INN", "city": "Leipzig", "match": 1}
{"source_name": "Motel 24", "source_city": "Leipzig", "target_name": "Hotel 23th", "city": "Leipzig", "match": 0}
{"source_name": "Holiday Hotel", "source_city": "Berlin", "target_name": "Holiday Hotel", "city": None, "match": 1}

In addition to the training data, the process takes the complexity and gamma parameters of a sigmoid kernel as well as the features. These are provided as file input to Pig rather than as execution parameters of a wrapper script.
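For illustration, such parameter files could simply hold one value per line (the values here are made up):

complexities.tsv:
0.1
1.0
10.0

gammas.tsv:
0.01
0.1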

Features used by the SVM model can be parametrized by an enumeration of strings. For example, there exist multiple string similarity measures for the name attribute of a hotel, of which all, some, or none can be included as features for the model. The three feature names name_dice, name_editDistance, and name_euclidean denote the similarity between the source and target name using the Dice, edit distance, or Euclidean distance measure respectively.
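As an illustration of such a feature, the name_dice similarity could be computed over character bigrams like this (a sketch, not the original implementation):

import java.util.HashSet;
import java.util.Set;

public class DiceSimilarity {

    // collect all character bigrams of a string
    static Set<String> bigrams(String s) {
        Set<String> grams = new HashSet<>();
        for (int i = 0; i < s.length() - 1; i++) {
            grams.add(s.substring(i, i + 2));
        }
        return grams;
    }

    // Dice coefficient: 2 * |X ∩ Y| / (|X| + |Y|)
    static double dice(String a, String b) {
        Set<String> x = bigrams(a.toLowerCase());
        Set<String> y = bigrams(b.toLowerCase());
        Set<String> common = new HashSet<>(x);
        common.retainAll(y);
        return 2.0 * common.size() / (x.size() + y.size());
    }

    public static void main(String[] args) {
        // a high value indicates a likely match between source and target name
        System.out.println(dice("Hotel INN", "H. INN"));
    }
}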

For the purpose of evaluating the trained models, the training data is split into an evaluation set and a set for training. This currently does not happen within the training script but in a separate pre-processing step.

Pig Cube for Feature Selection

Doing feature selection by trying all possible variants leads to a complexity of 2^|features|. A recursive algorithm calculating all possible variations can quickly outgrow the memory of your machine, even for a small number of features. In our case, an approach using Pig Cube to calculate all possible feature combinations also led to some issues. Pig Cube is not doing any magic tricks, but it gives you the ability to quickly test different subsets of your feature space. Here we are going to use Cube to try different subsets of features for our training.
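As a toy illustration of what the operator produces, assume a hypothetical input file holding a single row with two feature names. CUBE(...) yields all 2^n combinations (the ROLLUP(...) variant used in the script below yields only the hierarchical prefixes); the group tuple of each output row encodes one feature subset, with null marking an excluded feature:

features = LOAD 'two_features.tsv' USING PigStorage() AS (name_dice: chararray, name_euclidean: chararray);
features_cube = CUBE features BY CUBE(name_dice, name_euclidean);
subsets = FOREACH features_cube GENERATE FLATTEN(group);
-- the single input row expands to 2^2 = 4 subsets:
-- (name_dice, name_euclidean), (name_dice, null), (null, name_euclidean), (null, null)
DUMP subsets;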

Pig UDF to Train SVM Models

Once we have created a selection of features and parameters, we need to feed them to some kind of training function. Obviously this is not a fundamental part of an algebraic language like Pig. Fortunately, Pig provides the ability to build custom functions on top of its execution engine and language: User Defined Functions. As stated before, the UDF described here builds on LIBSVM in combination with Weka to provide the machine learning capabilities.
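At its core, the training step inside the UDF boils down to a handful of Weka calls. The following standalone sketch illustrates only that core, assuming the training and evaluation data is already available as Weka Instances (the ARFF paths and parameter values are made up):

import weka.classifiers.Evaluation;
import weka.classifiers.functions.LibSVM;
import weka.core.Instances;
import weka.core.Utils;
import weka.core.converters.ConverterUtils.DataSource;

public class TrainSingleModel {
    public static void main(String[] args) throws Exception {
        Instances train = DataSource.read("train.arff"); // illustrative path
        Instances eval = DataSource.read("eval.arff");   // illustrative path
        train.setClassIndex(train.numAttributes() - 1);
        eval.setClassIndex(eval.numAttributes() - 1);

        LibSVM svm = new LibSVM();
        // -K 3 = sigmoid kernel, -C = complexity, -G = gamma (values made up)
        svm.setOptions(Utils.splitOptions("-K 3 -C 1.0 -G 0.01"));
        svm.buildClassifier(train);

        Evaluation result = new Evaluation(train);
        result.evaluateModel(svm, eval);
        // class index 1 = "match" in our binary setting
        System.out.printf("precision=%.3f recall=%.3f f=%.3f%n",
                result.precision(1), result.recall(1), result.fMeasure(1));
    }
}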

The parameters, features, and training data provided here are not large enough to make Map-Reduce allocate reasonable resources for a parallel execution in the map phase. Plans to deliberately set the degree of parallelization via the number of reducers failed due to the fact that we were not able to evenly partition the input. First runs also did not use the training data as an input, but loaded the data in the UDF using the distributed cache.

The workflow was finally changed to the extent that the training and evaluation data was multiplied by each given parameter and feature combination. This data was then stored back to HDFS to reach a fair amount of data that would allocate a reasonable number of mappers on execution. By this we were able to calculate hundreds of models using tens of machines. This approach also enables linear scalability based on the number of parameter variations.

Our UDF ended up taking the complexity parameter, gamma parameter, training data, and evaluation set as its input. The following snippet shows the use of the UDF within the Pig script:

svm_input = LOAD 'training/input_tmp' 
USING JsonLoader('
        name_bigram: chararray,
        name_dice: chararray,
        [complete list of features],
        complexity: double, 
        gamma: double,
        train_data: {(
           source_name: chararray, source_city: chararray, 
           target_name: chararray, target_city: chararray,
           match: int
        )},
        eval_data: {(
           source_name: chararray, source_city: chararray, 
           target_name: chararray, target_city: chararray,
           match: int
        )}
');
trained_models = FOREACH svm_input GENERATE FLATTEN(trainSVM(*));
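For orientation, a skeleton of such a trainSVM UDF could look like the following (the class and field layout are assumptions mirroring the JsonLoader schema above, not the original implementation):

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TrainSVM extends EvalFunc<Tuple> {
    private static final TupleFactory tf = TupleFactory.getInstance();

    @Override
    public Tuple exec(Tuple input) throws IOException {
        // the last four fields follow the feature flags in the schema above
        double complexity = (Double) input.get(input.size() - 4);
        double gamma = (Double) input.get(input.size() - 3);
        DataBag trainData = (DataBag) input.get(input.size() - 2);
        DataBag evalData = (DataBag) input.get(input.size() - 1);
        // build Weka Instances from both bags, train and evaluate LibSVM,
        // store the model to HDFS, and return the 7-field result tuple
        // shown below
        return tf.newTuple(7);
    }
}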

To be able to immediately use the best trained model in a subsequent classification workflow, the UDF was designed to write each model to HDFS under a unique name. The training function then returns the path to the model along with the recall, precision, and f-measure for evaluation.

TupleFactory tf = TupleFactory.getInstance();
Tuple t = tf.newTuple(7);
t.set(0, evalResults[0]); // precision
t.set(1, evalResults[1]); // recall
t.set(2, evalResults[2]); // f-measure
t.set(3, modelPath);      // path to the stored model on HDFS
t.set(4, complexity);     // complexity parameter
t.set(5, gamma);          // gamma parameter
t.set(6, featuresStr);    // the feature set used

The models themselves are stored directly to HDFS from within the UDF. This can be achieved by obtaining the required settings from the job configuration and using HDFS as the file system for serialization.

Configuration jobConf = UDFContext.getUDFContext().getJobConf();
FileSystem fs = FileSystem.get(jobConf);
FSDataOutputStream out = fs.create(new Path(modelPath));
SerializationHelper.write(out, cls); // cls = classifier model
out.close();

General Layout

Having described the main parts of the given approach, I would now like to provide you with a general overview of the process, as well as a complete excerpt from the Pig script run to train the models.

As input we have the parameters, features, and the training and evaluation data. These are put together as the input for our training UDF. The resulting models of the training function are stored to HDFS directly, while the path to the model together with the evaluation results is the overall output of the job.

[Figure: Training Multiple SVM Models with Pig]

Putting it all together, we end up with the Pig script below. As discussed previously, Cube is used to do some sort of feature selection. Since the gamma and complexity values are read from separate files, we CROSS features, gamma values, and complexity values together. In the end we join them with our training and evaluation data, which are each grouped into a single bag. This completes our input for the training of the SVM models. In order to control the degree of parallel execution, we temporarily store the intermediate data back to HDFS. To be precise, we don't control the degree of parallelization directly, as it solely depends on the size of our temporary input. On the other hand, this frees us from the requirement to provide proper partitioning for the reduce phase in which Pig would otherwise end up doing the training.

complexities = LOAD 'complexities.tsv' USING PigStorage() AS (complexity: double);
gammas = LOAD 'gammas.tsv' USING PigStorage() AS (gamma: double);

features = LOAD 'complete_features.tsv' USING PigStorage() AS (
  name_dice: chararray,
  name_editDistance: chararray,
  name_euclidean: chararray,
  [complete list of features]
);

features_cube = CUBE features BY ROLLUP(name_dice, name_euclidean);
features_cube = FOREACH features_cube GENERATE FLATTEN(cube);

complete_features = CROSS features_cube, complexities, gammas;
complete_features = FOREACH complete_features GENERATE *, 'all' AS gr;

train_data = LOAD 'matching/training/matching_input_training.csv' USING CSVExcelStorage() AS (
  source_name: chararray, 
  source_city: chararray,
  target_name: chararray, 
  target_city: chararray, 
  match: int
);

eval_data = LOAD 'matching/training/matching_eval_training.csv' USING CSVExcelStorage() AS (
  source_name: chararray, 
  source_city: chararray,
  target_name: chararray, 
  target_city: chararray, 
  match: int
);

eval_data = GROUP eval_data ALL;
train_data = GROUP train_data ALL;

train_input = JOIN complete_features BY gr, train_data BY group, eval_data BY group; -- gr = group = 'all'

train_input = FOREACH train_input GENERATE
  name_dice,
  name_editDistance,
  name_euclidean,
  [complete list of features],
  complexity,
  gamma,
  train_data,
  eval_data;

STORE train_input INTO 'training/input_tmp' USING JsonStorage(); -- intermediate input for the map phase

svm_input = LOAD 'training/input_tmp' USING JsonLoader(...);
trained_models = FOREACH svm_input GENERATE FLATTEN(trainSVM(*));

-- output path and eval results incl. feature set
STORE trained_models INTO 'training/trained_models' USING PigStorage();
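A natural follow-up step, sketched below, is to load the stored results and simply pick the model with the highest f-measure (the column layout matches the result tuple shown earlier; the alias names are made up):

models = LOAD 'training/trained_models' USING PigStorage() AS (
  precision: double, recall: double, f_measure: double,
  model_path: chararray, complexity: double, gamma: double, features: chararray
);
ranked = ORDER models BY f_measure DESC;
best_model = LIMIT ranked 1;
DUMP best_model;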

Limitations

Map-Reduce is a simple computational model for parallel execution. It inherently scales with the amount of data you have. While this is good for quite a lot of problems, especially if you have “Big Data”, it is not the best fit for others. In particular, this model does not fit some machine learning and graph based problems where you aim to distribute little data with exponential complexity. The approach described here “blows up” the provided training data to achieve a reasonable distribution across many compute nodes.

The approach described here does not apply a grid search to find the optimal parameters, but only tries multiple settings defined prior to execution. The achieved automation is therefore limited. On the other hand, the script could easily be integrated into such an approach to further improve the implementation provided here.
