Machine Learning Pipeline

Scikit-learn Pipeline

Pipeline 1

In most machine learning projects the data you have to work with is unlikely to be in the ideal format for producing the best-performing model. There are quite often a number of transformation steps, such as encoding categorical variables, feature scaling and normalisation, that need to be performed. Scikit-learn has built-in functions for most of these commonly used transformations in its preprocessing package.

However, in a typical machine learning workflow you will need to apply all these transformations at least twice: once when training the model, and again on any new data you want to predict on. Of course you could write a function to apply them and reuse it, but you would still need to run that function first and then call the model separately (a rough sketch of this contrast follows the list below). Scikit-learn pipelines are a tool to simplify this process. They have several key benefits:

  • They make your workflow much easier to read and understand.
  • They enforce the implementation and order of steps in your project.
  • These in turn make your work much more reproducible.
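
To make the "apply it twice" point concrete, here is a minimal sketch of the manual approach versus the pipeline approach, using StandardScaler as a stand-in for any transformer. The variable names (X_train, y_train, X_new) are placeholders, not part of the example dataset used later.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Manual approach: fit the transformer on the training data,
# then remember to re-apply it to any new data before predicting.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
model = LogisticRegression().fit(X_train_scaled, y_train)
X_new_scaled = scaler.transform(X_new)   # easy to forget
preds = model.predict(X_new_scaled)

# Pipeline approach: the same steps are bundled together,
# so transform-then-predict happens automatically.
pipe = Pipeline(steps=[('scaler', StandardScaler()),
                       ('model', LogisticRegression())])
pipe.fit(X_train, y_train)
preds = pipe.predict(X_new)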

Before building the pipeline I am splitting the training data into a train and test set so that I can validate the performance of the model.

from sklearn.model_selection import train_test_split

X = train.drop('Loan_Status', axis=1)
y = train['Loan_Status']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

The first step in building the pipeline is to define each transformer type. The convention here is generally to create transformers for the different variable types.

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())])

categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

Next we use the ColumnTransformer to apply the transformations to the correct columns in the DataFrame. Before building this I have stored lists of the numeric and categorical columns using the pandas select_dtypes method.

numeric_features = train.select_dtypes(include=['int64', 'float64']).columns

categorical_features = train.select_dtypes(include=['object']).drop(['Loan_Status'], axis=1).columns

from sklearn.compose import ColumnTransformer

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)])
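
If you want to sanity-check the preprocessing on its own before attaching a model, the ColumnTransformer can be fitted and applied directly. This is just an optional inspection step, not part of the original workflow:

# Fit on the training split only, then inspect the transformed shape.
X_train_transformed = preprocessor.fit_transform(X_train)
print(X_train_transformed.shape)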

The next step is to create a pipeline that combines the preprocessor created above with a classifier. In this case I have used a simple RandomForestClassifier to start with.

from sklearn.ensemble import RandomForestClassifier

rf = Pipeline(steps=[('preprocessor', preprocessor),
                     ('classifier', RandomForestClassifier())])

Fitting the classifier and generating predictions on the test set:

rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
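
To get a quick read on how well this baseline performs, the predictions can be scored against the held-out labels; accuracy is used here purely as an illustration:

from sklearn.metrics import accuracy_score

print("model score: %.3f" % accuracy_score(y_test, y_pred))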

A pipeline can also be used during the model selection process. The following example code loops through a number of scikit-learn classifiers applying the transformations and training the model.

from sklearn.metrics import accuracy_score, log_loss
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC, LinearSVC, NuSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis

classifiers = [
    KNeighborsClassifier(3),
    SVC(kernel="rbf", C=0.025, probability=True),
    NuSVC(probability=True),
    DecisionTreeClassifier(),
    RandomForestClassifier(),
    AdaBoostClassifier(),
    GradientBoostingClassifier()
]

for classifier in classifiers:
    pipe = Pipeline(steps=[('preprocessor', preprocessor),
                           ('classifier', classifier)])
    pipe.fit(X_train, y_train)
    print(classifier)
    print("model score: %.3f" % pipe.score(X_test, y_test))

The pipeline can also be used in a grid search to find the best-performing parameters. To do this you first need to create a parameter grid for your chosen model. One important thing to note is that you need to prefix each parameter name with the name you have given the classifier step of your pipeline, followed by a double underscore. In my code above I called this step 'classifier', so I have added classifier__ to each parameter name. Next I create a grid search object which includes the original pipeline. When I then call fit, the transformations are applied to the data before a cross-validated grid search is performed over the parameter grid.

param_grid = {
    'classifier__n_estimators': [200, 500],
    'classifier__max_features': ['auto', 'sqrt', 'log2'],
    'classifier__max_depth': [4, 5, 6, 7, 8],
    'classifier__criterion': ['gini', 'entropy']}

from sklearn.model_selection import GridSearchCV

CV = GridSearchCV(rf, param_grid, n_jobs=1)
CV.fit(X_train, y_train)

print(CV.best_params_)
print(CV.best_score_)
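
With refit=True (the default), the fitted grid search object can then be used like any other estimator, and best_estimator_ holds the pipeline refitted with the winning parameters, so scoring the held-out set is a single call:

# GridSearchCV refits the best pipeline on the full training split by default,
# so it can be used directly for prediction and scoring.
print("test score: %.3f" % CV.score(X_test, y_test))
best_rf = CV.best_estimator_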

Pipeline 2

The example below demonstrates a pipeline defined with four steps:

  • Feature Extraction with Principal Component Analysis (3 features)
  • Feature Extraction with Statistical Selection (6 features)
  • Feature Union
  • Learn a Logistic Regression Model
from pandas import read_csv
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion
from sklearn.linear_model import LogisticRegression
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
# load data
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
names = ['preg', 'plas', 'pres', 'skin', 'test', 'mass', 'pedi', 'age', 'class']
dataframe = read_csv(url, names=names)
array = dataframe.values
X = array[:,0:8]
Y = array[:,8]
# create feature union
features = []
features.append(('pca', PCA(n_components=3)))
features.append(('select_best', SelectKBest(k=6)))
feature_union = FeatureUnion(features)
# create pipeline
estimators = []
estimators.append(('feature_union', feature_union))
estimators.append(('logistic', LogisticRegression()))
model = Pipeline(estimators)
# evaluate pipeline
seed = 7
# shuffle must be enabled when a random_state is set in recent scikit-learn versions
kfold = KFold(n_splits=10, shuffle=True, random_state=seed)
results = cross_val_score(model, X, Y, cv=kfold)
print(results.mean())
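
The FeatureUnion simply concatenates the outputs of its parts column-wise, so the transformed data has the 3 PCA components plus the 6 selected features. You can verify this by fitting the union on its own (an optional check, not part of the original example):

# 3 PCA components + 6 selected features = 9 columns
union_output = feature_union.fit_transform(X, Y)
print(union_output.shape)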

Building Scikit-Learn transformers

Scikit-Learn's API uses duck typing: if you want to write your own custom estimators (including transformers and predictors), you only need to implement the right methods, you don't have to inherit from any particular class.

For example, all estimators must implement a fit() method, and get_params() and set_params() methods. All transformers must also implement transform() and fit_transform() methods. All predictors must implement a predict() method. And so on.

The most basic implementation of the fit_transform() method is just this:

def fit_transform(self, X, y=None):
    return self.fit(X, y).transform(X)

You don't have to inherit from the TransformerMixin class, but that's what you get if you do: if you implement the fit() method and the transform() method, it gives you the fit_transform() method for free, just like the above.
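
As a minimal illustration of that convention, here is a sketch of a custom transformer that keeps a subset of DataFrame columns. The class and its behaviour are my own example, not part of scikit-learn:

from sklearn.base import BaseEstimator, TransformerMixin

class ColumnSelector(BaseEstimator, TransformerMixin):
    """Keep only the listed columns of a pandas DataFrame."""
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        # nothing to learn, but fit() must return self
        return self

    def transform(self, X):
        return X[self.columns]

# fit_transform() comes from TransformerMixin for free, e.g.:
# ColumnSelector(['x', 'y']).fit_transform(df)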

Similarly, the BaseEstimator class just gives you the get_params() and set_params() methods for free. By default, get_params() does some introspection to get the parameters of the constructor __init__(), and it assumes that the class has corresponding instance variables. For example:

from sklearn.base import BaseEstimator

class MyEstimator(BaseEstimator):
    def __init__(self, a, b=2):
        self.a = a
        self.b = b

# >>> m = MyEstimator(1, 2)
# >>> m.get_params()
# {'a': 1, 'b': 2}
# >>> m.set_params(a=5, b=10)
# MyEstimator(a=5, b=10)
# >>> m.a
# 5
# >>> m.b
# 10

Let’s say I have a lot of text and I want to extract certain data from it. I’m going to build a featurizer that takes a list of functions, calls each function with our text, and returns the results of all functions as a feature vector.

import re

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.tree import DecisionTreeClassifier

def longest_run_of_capital_letters_feature(text):
    """Find the longest run of capital letters and return its length."""
    runs = sorted(re.findall(r"[A-Z]+", text), key=len)
    if runs:
        return len(runs[-1])
    else:
        return 0

def percent_character_feature(char):
    """Return the fraction of the text that is a particular char."""
    def feature_fn(text):
        count = text.count(char)
        return count / len(text)
    return feature_fn

class FunctionFeaturizer(BaseEstimator, TransformerMixin):
    def __init__(self, *featurizers):
        self.featurizers = featurizers

    def fit(self, X, y=None):
        """All scikit-learn compatible transformers and classifiers have the
        same interface. `fit` always returns the same object."""
        return self

    def transform(self, X):
        """Given a list of original data, return a list of feature vectors."""
        fvs = []
        for datum in X:
            fv = [f(datum) for f in self.featurizers]
            fvs.append(fv)
        return np.array(fvs)

sms_featurizer = FunctionFeaturizer(longest_run_of_capital_letters_feature,
                                    percent_character_feature("."))
# sms_featurizer.transform(sms_data[:10])

X_train, X_test, y_train, y_test = train_test_split(sms_data, sms_results)

pipe = make_pipeline(sms_featurizer, DecisionTreeClassifier())
pipe.fit(X_train, y_train)
pipe.score(X_test, y_test)
# => 0.91385498923187369

SparkML Pipeline

EDA

# create a dataframe from the parquet file
# (assumes an active SparkSession named `spark` and that hmp.parquet is available)
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')


from pyspark.ml.feature import StringIndexer

# index the string label column into numeric values
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
indexed.select('classIndex').distinct().show()


from pyspark.ml.feature import OneHotEncoder

# one-hot encode the indexed label
# (in Spark 3.x OneHotEncoder is an estimator, so use encoder.fit(indexed).transform(indexed) there)
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()


from pyspark.ml.feature import VectorAssembler

# assemble the numeric sensor columns into a single feature vector
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"],
                                  outputCol="features")

# If your columns are strings instead of doubles, cast them first:
# expr = [col(c).cast("Double").alias(c)
#         for c in vectorAssembler.getInputCols()]
# df2 = df2.select(*expr)

features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.show()


from pyspark.ml.feature import MinMaxScaler

# scale each feature vector to the [0, 1] range
min_max_scaler = MinMaxScaler(inputCol="features", outputCol="features_norm")
min_max_scaler_model = min_max_scaler.fit(features_vectorized)
normalized_data = min_max_scaler_model.transform(features_vectorized)
normalized_data.show()

# keep only the columns needed for training
df_train = normalized_data.drop("source").drop("class").drop("classIndex") \
                          .drop("features").drop("x").drop("y").drop("z")
df_train.show()

Pipeline

from pyspark.ml import Pipeline

# the pipeline re-runs every stage when fit, so pass the MinMaxScaler
# estimator rather than the already fitted model
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, min_max_scaler])
model = pipeline.fit(df)
prediction = model.transform(df)
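
The fitted PipelineModel bundles every stage, so it can be applied to new data with the same schema in a single call, or persisted for later reuse. The path below is just a placeholder:

# apply the whole preprocessing chain to any new DataFrame with the same schema
# new_predictions = model.transform(df_new)

# save and reload the fitted pipeline (placeholder path)
model.write().overwrite().save("hmp_pipeline_model")

from pyspark.ml import PipelineModel
reloaded = PipelineModel.load("hmp_pipeline_model")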

Reference

  • https://medium.com/vickdata/a-simple-guide-to-scikit-learn-pipelines-4ac0d974bdcf