Cross Validation
In [1]:
Copied!
from pypekit import Task
import pandas as pd
class IrisLoader(Task):
    """Source task that loads the scikit-learn Iris dataset as a DataFrame."""

    input_types = ["source"]
    output_types = ["raw"]

    def run(self, _):
        """Return the Iris features plus a trailing ``target`` column.

        The single positional argument is ignored.
        """
        from sklearn.datasets import load_iris

        dataset = load_iris()
        features = pd.DataFrame(data=dataset.data, columns=dataset.feature_names)
        # assign() appends the label column after the feature columns.
        return features.assign(target=dataset.target)
class TrainTestSplitter(Task):
    """Mark each row as train or test for one fold of a shuffled K-fold split.

    Reads ``n_splits``, ``random_state`` and ``split_index`` from
    ``self.run_config``.
    NOTE(review): ``run_config`` is presumably populated by the pypekit
    executor from its ``run_config`` argument -- confirm against pypekit.
    """

    input_types = ["raw"]
    output_types = ["split"]

    def run(self, df):
        """Return ``df`` reordered as train rows then test rows.

        A ``train`` column is added (1 for training rows, 0 for test rows)
        and the index is renumbered from zero.
        """
        from sklearn.model_selection import KFold

        config = self.run_config
        folds = KFold(
            n_splits=config['n_splits'],
            shuffle=True,
            random_state=config['random_state'],
        )
        fold_number = config['split_index']
        # Materialise all folds so the requested one can be picked by position.
        train_rows, test_rows = list(folds.split(df))[fold_number]
        train_part = df.iloc[train_rows].reset_index(drop=True).assign(train=1)
        test_part = df.iloc[test_rows].reset_index(drop=True).assign(train=0)
        return pd.concat([train_part, test_part], ignore_index=True)
class Scaler(Task):
    """Base task that scales feature columns using statistics from training rows.

    Subclasses provide the concrete scaler through ``get_scaler``.
    """

    input_types = ["split"]
    output_types = ["processed"]

    def run(self, df):
        """Return a frame with scaled features and the original labels.

        The scaler is fitted only on rows where ``train == 1`` and then
        applied to every row. ``target`` and ``train`` are carried over
        unchanged.
        """
        X = df.drop(columns=['target', 'train'])
        X_train = X[df['train'] == 1]
        scaler = self.get_scaler()
        scaler.fit(X_train)
        X_scaled = scaler.transform(X)
        # Reuse the input index: the column assignments below align on index
        # labels, so a fresh 0..n-1 index would yield NaN labels whenever the
        # incoming frame is not default-indexed.
        scaled_df = pd.DataFrame(data=X_scaled, columns=X.columns, index=X.index)
        scaled_df['target'] = df['target']
        scaled_df['train'] = df['train']
        return scaled_df

    def get_scaler(self):
        """Return an unfitted scaler exposing ``fit`` and ``transform``."""
        raise NotImplementedError("Subclasses should implement this method.")
class MinMaxScaler(Scaler):
    """Scaler task backed by scikit-learn's ``MinMaxScaler``."""

    def get_scaler(self):
        """Return a new scikit-learn ``MinMaxScaler`` with default settings."""
        # Aliased so the estimator is not confused with this task class,
        # which shares its name.
        from sklearn.preprocessing import MinMaxScaler as _SklearnMinMaxScaler

        return _SklearnMinMaxScaler()
class StandardScaler(Scaler):
    """Scaler task backed by scikit-learn's ``StandardScaler``."""

    def get_scaler(self):
        """Return a new scikit-learn ``StandardScaler`` with default settings."""
        # Aliased so the estimator is not confused with this task class,
        # which shares its name.
        from sklearn.preprocessing import StandardScaler as _SklearnStandardScaler

        return _SklearnStandardScaler()
class PCA(Task):
    """Task that projects feature columns onto principal components.

    Keyword arguments given to the constructor are forwarded unchanged to
    ``sklearn.decomposition.PCA``.
    """

    input_types = ["processed"]
    output_types = ["pca"]

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def run(self, df):
        """Return a frame of ``PC1..PCk`` columns plus ``target`` and ``train``.

        The projection is fitted only on rows where ``train == 1`` and then
        applied to every row.
        """
        from sklearn.decomposition import PCA as _SklearnPCA

        X = df.drop(columns=['target', 'train'])
        X_train = X[df['train'] == 1]
        pca = _SklearnPCA(**self.kwargs)
        pca.fit(X_train)
        X_pca = pca.transform(X)
        # Reuse the input index: the column assignments below align on index
        # labels, so a fresh 0..n-1 index would yield NaN labels whenever the
        # incoming frame is not default-indexed.
        pca_df = pd.DataFrame(
            data=X_pca,
            columns=[f'PC{i+1}' for i in range(X_pca.shape[1])],
            index=X.index,
        )
        pca_df['target'] = df['target']
        pca_df['train'] = df['train']
        return pca_df
class Classifier(Task):
    """Base task that fits a classifier on training rows and predicts all rows.

    Subclasses provide the concrete estimator through ``get_classifier``.
    """

    input_types = ["split", "processed", "pca"]
    output_types = ["predicted"]

    def run(self, df):
        """Return a copy of ``df`` with a ``predicted`` column added.

        The estimator is fitted only on rows where ``train == 1`` and then
        used to predict every row.
        """
        X = df.drop(columns=['target', 'train'])
        y = df['target']
        is_train = df['train'] == 1
        classifier = self.get_classifier()
        classifier.fit(X[is_train], y[is_train])
        y_pred = classifier.predict(X)
        # Return a new frame instead of writing into the caller's: if the
        # input is shared between pipelines, an in-place ``predicted`` column
        # would be picked up as a feature by the next classifier.
        return df.assign(predicted=y_pred)

    def get_classifier(self):
        """Return an unfitted estimator exposing ``fit`` and ``predict``."""
        # Previously misnamed ``get_scaler``; ``run`` calls ``get_classifier``.
        raise NotImplementedError("Subclasses should implement this method.")
class LogisticRegression(Classifier):
    """Classifier task backed by scikit-learn's ``LogisticRegression``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.linear_model import LogisticRegression as _Estimator

        return _Estimator(**self.kwargs)
class RandomForestClassifier(Classifier):
    """Classifier task backed by scikit-learn's ``RandomForestClassifier``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.ensemble import RandomForestClassifier as _Estimator

        return _Estimator(**self.kwargs)
class SVC(Classifier):
    """Classifier task backed by scikit-learn's ``SVC``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.svm import SVC as _Estimator

        return _Estimator(**self.kwargs)
class Evaluator(Task):
    """Sink task that scores predictions on the held-out rows."""

    input_types = ["predicted"]
    output_types = ["sink"]

    def run(self, df):
        """Return the fraction of test rows whose prediction equals the target.

        Test rows are those where ``train == 0``.
        """
        held_out = df[df['train'] == 0]
        is_correct = held_out['target'] == held_out['predicted']
        return is_correct.mean()
from pypekit import Task
import pandas as pd
class IrisLoader(Task):
    """Source task that loads the scikit-learn Iris dataset as a DataFrame."""

    input_types = ["source"]
    output_types = ["raw"]

    def run(self, _):
        """Return the Iris features plus a trailing ``target`` column.

        The single positional argument is ignored.
        """
        from sklearn.datasets import load_iris

        dataset = load_iris()
        features = pd.DataFrame(data=dataset.data, columns=dataset.feature_names)
        # assign() appends the label column after the feature columns.
        return features.assign(target=dataset.target)
class TrainTestSplitter(Task):
    """Mark each row as train or test for one fold of a shuffled K-fold split.

    Reads ``n_splits``, ``random_state`` and ``split_index`` from
    ``self.run_config``.
    NOTE(review): ``run_config`` is presumably populated by the pypekit
    executor from its ``run_config`` argument -- confirm against pypekit.
    """

    input_types = ["raw"]
    output_types = ["split"]

    def run(self, df):
        """Return ``df`` reordered as train rows then test rows.

        A ``train`` column is added (1 for training rows, 0 for test rows)
        and the index is renumbered from zero.
        """
        from sklearn.model_selection import KFold

        config = self.run_config
        folds = KFold(
            n_splits=config['n_splits'],
            shuffle=True,
            random_state=config['random_state'],
        )
        fold_number = config['split_index']
        # Materialise all folds so the requested one can be picked by position.
        train_rows, test_rows = list(folds.split(df))[fold_number]
        train_part = df.iloc[train_rows].reset_index(drop=True).assign(train=1)
        test_part = df.iloc[test_rows].reset_index(drop=True).assign(train=0)
        return pd.concat([train_part, test_part], ignore_index=True)
class Scaler(Task):
    """Base task that scales feature columns using statistics from training rows.

    Subclasses provide the concrete scaler through ``get_scaler``.
    """

    input_types = ["split"]
    output_types = ["processed"]

    def run(self, df):
        """Return a frame with scaled features and the original labels.

        The scaler is fitted only on rows where ``train == 1`` and then
        applied to every row. ``target`` and ``train`` are carried over
        unchanged.
        """
        X = df.drop(columns=['target', 'train'])
        X_train = X[df['train'] == 1]
        scaler = self.get_scaler()
        scaler.fit(X_train)
        X_scaled = scaler.transform(X)
        # Reuse the input index: the column assignments below align on index
        # labels, so a fresh 0..n-1 index would yield NaN labels whenever the
        # incoming frame is not default-indexed.
        scaled_df = pd.DataFrame(data=X_scaled, columns=X.columns, index=X.index)
        scaled_df['target'] = df['target']
        scaled_df['train'] = df['train']
        return scaled_df

    def get_scaler(self):
        """Return an unfitted scaler exposing ``fit`` and ``transform``."""
        raise NotImplementedError("Subclasses should implement this method.")
class MinMaxScaler(Scaler):
    """Scaler task backed by scikit-learn's ``MinMaxScaler``."""

    def get_scaler(self):
        """Return a new scikit-learn ``MinMaxScaler`` with default settings."""
        # Aliased so the estimator is not confused with this task class,
        # which shares its name.
        from sklearn.preprocessing import MinMaxScaler as _SklearnMinMaxScaler

        return _SklearnMinMaxScaler()
class StandardScaler(Scaler):
    """Scaler task backed by scikit-learn's ``StandardScaler``."""

    def get_scaler(self):
        """Return a new scikit-learn ``StandardScaler`` with default settings."""
        # Aliased so the estimator is not confused with this task class,
        # which shares its name.
        from sklearn.preprocessing import StandardScaler as _SklearnStandardScaler

        return _SklearnStandardScaler()
class PCA(Task):
    """Task that projects feature columns onto principal components.

    Keyword arguments given to the constructor are forwarded unchanged to
    ``sklearn.decomposition.PCA``.
    """

    input_types = ["processed"]
    output_types = ["pca"]

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def run(self, df):
        """Return a frame of ``PC1..PCk`` columns plus ``target`` and ``train``.

        The projection is fitted only on rows where ``train == 1`` and then
        applied to every row.
        """
        from sklearn.decomposition import PCA as _SklearnPCA

        X = df.drop(columns=['target', 'train'])
        X_train = X[df['train'] == 1]
        pca = _SklearnPCA(**self.kwargs)
        pca.fit(X_train)
        X_pca = pca.transform(X)
        # Reuse the input index: the column assignments below align on index
        # labels, so a fresh 0..n-1 index would yield NaN labels whenever the
        # incoming frame is not default-indexed.
        pca_df = pd.DataFrame(
            data=X_pca,
            columns=[f'PC{i+1}' for i in range(X_pca.shape[1])],
            index=X.index,
        )
        pca_df['target'] = df['target']
        pca_df['train'] = df['train']
        return pca_df
class Classifier(Task):
    """Base task that fits a classifier on training rows and predicts all rows.

    Subclasses provide the concrete estimator through ``get_classifier``.
    """

    input_types = ["split", "processed", "pca"]
    output_types = ["predicted"]

    def run(self, df):
        """Return a copy of ``df`` with a ``predicted`` column added.

        The estimator is fitted only on rows where ``train == 1`` and then
        used to predict every row.
        """
        X = df.drop(columns=['target', 'train'])
        y = df['target']
        is_train = df['train'] == 1
        classifier = self.get_classifier()
        classifier.fit(X[is_train], y[is_train])
        y_pred = classifier.predict(X)
        # Return a new frame instead of writing into the caller's: if the
        # input is shared between pipelines, an in-place ``predicted`` column
        # would be picked up as a feature by the next classifier.
        return df.assign(predicted=y_pred)

    def get_classifier(self):
        """Return an unfitted estimator exposing ``fit`` and ``predict``."""
        # Previously misnamed ``get_scaler``; ``run`` calls ``get_classifier``.
        raise NotImplementedError("Subclasses should implement this method.")
class LogisticRegression(Classifier):
    """Classifier task backed by scikit-learn's ``LogisticRegression``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.linear_model import LogisticRegression as _Estimator

        return _Estimator(**self.kwargs)
class RandomForestClassifier(Classifier):
    """Classifier task backed by scikit-learn's ``RandomForestClassifier``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.ensemble import RandomForestClassifier as _Estimator

        return _Estimator(**self.kwargs)
class SVC(Classifier):
    """Classifier task backed by scikit-learn's ``SVC``.

    Constructor keyword arguments are forwarded unchanged to the estimator.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def get_classifier(self):
        """Return a new, unfitted estimator built from the stored kwargs."""
        # Aliased so the estimator is not confused with this task class.
        from sklearn.svm import SVC as _Estimator

        return _Estimator(**self.kwargs)
class Evaluator(Task):
    """Sink task that scores predictions on the held-out rows."""

    input_types = ["predicted"]
    output_types = ["sink"]

    def run(self, df):
        """Return the fraction of test rows whose prediction equals the target.

        Test rows are those where ``train == 0``.
        """
        held_out = df[df['train'] == 0]
        is_correct = held_out['target'] == held_out['predicted']
        return is_correct.mean()
In [2]:
Copied!
from pypekit import Repository, CachedExecutor
# Task classes to combine; the printed tree below shows every chain that
# results from them.
task_classes = [
    IrisLoader,
    TrainTestSplitter,
    MinMaxScaler,
    StandardScaler,
    PCA,
    LogisticRegression,
    RandomForestClassifier,
    SVC,
    Evaluator,
]
repository = Repository(task_classes)
repository.build_tree()
tree_text = repository.build_tree_string()
print(tree_text)
from pypekit import Repository, CachedExecutor
# Task classes to combine; the printed tree below shows every chain that
# results from them.
task_classes = [
    IrisLoader,
    TrainTestSplitter,
    MinMaxScaler,
    StandardScaler,
    PCA,
    LogisticRegression,
    RandomForestClassifier,
    SVC,
    Evaluator,
]
repository = Repository(task_classes)
repository.build_tree()
tree_text = repository.build_tree_string()
print(tree_text)
└── Root()
└── IrisLoader()
└── TrainTestSplitter()
├── MinMaxScaler()
│ ├── PCA()
│ │ ├── LogisticRegression()
│ │ │ └── Evaluator()
│ │ ├── RandomForestClassifier()
│ │ │ └── Evaluator()
│ │ └── SVC()
│ │ └── Evaluator()
│ ├── LogisticRegression()
│ │ └── Evaluator()
│ ├── RandomForestClassifier()
│ │ └── Evaluator()
│ └── SVC()
│ └── Evaluator()
├── StandardScaler()
│ ├── PCA()
│ │ ├── LogisticRegression()
│ │ │ └── Evaluator()
│ │ ├── RandomForestClassifier()
│ │ │ └── Evaluator()
│ │ └── SVC()
│ │ └── Evaluator()
│ ├── LogisticRegression()
│ │ └── Evaluator()
│ ├── RandomForestClassifier()
│ │ └── Evaluator()
│ └── SVC()
│ └── Evaluator()
├── LogisticRegression()
│ └── Evaluator()
├── RandomForestClassifier()
│ └── Evaluator()
└── SVC()
└── Evaluator()
In [3]:
Copied!
# Run every pipeline once per fold, reusing a single executor for all folds.
pipelines = repository.build_pipelines()
executor = CachedExecutor(pipelines, verbose=True)
n_splits = 5
results_dfs = []
for split_index in range(n_splits):
    print(f"Running split {split_index + 1}/{n_splits}")
    fold_config = {"n_splits": n_splits, "split_index": split_index, "random_state": 42}
    results = executor.run(run_config=fold_config)
    # Transpose so each pipeline is a row, then turn the pipeline id from
    # the index into a regular "pipeline" column.
    results = pd.DataFrame(results).transpose().reset_index(names="pipeline")
    results_dfs.append(results)
# Run every pipeline once per fold, reusing a single executor for all folds.
pipelines = repository.build_pipelines()
executor = CachedExecutor(pipelines, verbose=True)
n_splits = 5
results_dfs = []
for split_index in range(n_splits):
    print(f"Running split {split_index + 1}/{n_splits}")
    fold_config = {"n_splits": n_splits, "split_index": split_index, "random_state": 42}
    results = executor.run(run_config=fold_config)
    # Transpose so each pipeline is a row, then turn the pipeline id from
    # the index into a regular "pipeline" column.
    results = pd.DataFrame(results).transpose().reset_index(names="pipeline")
    results_dfs.append(results)
Running split 1/5 Pipeline 1/15 completed. Runtime: 0.44s. Pipeline 2/15 completed. Runtime: 0.53s. Pipeline 3/15 completed. Runtime: 0.43s. Pipeline 4/15 completed. Runtime: 0.41s. Pipeline 5/15 completed. Runtime: 0.47s. Pipeline 6/15 completed. Runtime: 0.41s. Pipeline 7/15 completed. Runtime: 0.41s. Pipeline 8/15 completed. Runtime: 0.49s. Pipeline 9/15 completed. Runtime: 0.41s. Pipeline 10/15 completed. Runtime: 0.41s. Pipeline 11/15 completed. Runtime: 0.49s. Pipeline 12/15 completed. Runtime: 0.41s. Pipeline 13/15 completed. Runtime: 0.41s. Pipeline 14/15 completed. Runtime: 0.49s. Pipeline 15/15 completed. Runtime: 0.41s. Running split 2/5 Pipeline 1/15 completed. Runtime: 0.01s. Pipeline 2/15 completed. Runtime: 0.08s. Pipeline 3/15 completed. Runtime: 0.01s. Pipeline 4/15 completed. Runtime: 0.01s. Pipeline 5/15 completed. Runtime: 0.08s. Pipeline 6/15 completed. Runtime: 0.01s. Pipeline 7/15 completed. Runtime: 0.01s. Pipeline 8/15 completed. Runtime: 0.09s. Pipeline 9/15 completed. Runtime: 0.01s. Pipeline 10/15 completed. Runtime: 0.01s. Pipeline 11/15 completed. Runtime: 0.08s. Pipeline 12/15 completed. Runtime: 0.01s. Pipeline 13/15 completed. Runtime: 0.01s. Pipeline 14/15 completed. Runtime: 0.08s. Pipeline 15/15 completed. Runtime: 0.00s. Running split 3/5 Pipeline 1/15 completed. Runtime: 0.01s. Pipeline 2/15 completed. Runtime: 0.07s. Pipeline 3/15 completed. Runtime: 0.01s. Pipeline 4/15 completed. Runtime: 0.01s. Pipeline 5/15 completed. Runtime: 0.07s. Pipeline 6/15 completed. Runtime: 0.01s. Pipeline 7/15 completed. Runtime: 0.01s. Pipeline 8/15 completed. Runtime: 0.08s. Pipeline 9/15 completed. Runtime: 0.01s. Pipeline 10/15 completed. Runtime: 0.01s. Pipeline 11/15 completed. Runtime: 0.08s. Pipeline 12/15 completed. Runtime: 0.01s. Pipeline 13/15 completed. Runtime: 0.01s. Pipeline 14/15 completed. Runtime: 0.08s. Pipeline 15/15 completed. Runtime: 0.00s. Running split 4/5 Pipeline 1/15 completed. Runtime: 0.01s. 
Pipeline 2/15 completed. Runtime: 0.08s. Pipeline 3/15 completed. Runtime: 0.01s. Pipeline 4/15 completed. Runtime: 0.01s. Pipeline 5/15 completed. Runtime: 0.08s. Pipeline 6/15 completed. Runtime: 0.01s. Pipeline 7/15 completed. Runtime: 0.01s. Pipeline 8/15 completed. Runtime: 0.07s. Pipeline 9/15 completed. Runtime: 0.01s. Pipeline 10/15 completed. Runtime: 0.01s. Pipeline 11/15 completed. Runtime: 0.06s. Pipeline 12/15 completed. Runtime: 0.01s. Pipeline 13/15 completed. Runtime: 0.01s. Pipeline 14/15 completed. Runtime: 0.07s. Pipeline 15/15 completed. Runtime: 0.00s. Running split 5/5 Pipeline 1/15 completed. Runtime: 0.01s. Pipeline 2/15 completed. Runtime: 0.07s. Pipeline 3/15 completed. Runtime: 0.01s. Pipeline 4/15 completed. Runtime: 0.01s. Pipeline 5/15 completed. Runtime: 0.06s. Pipeline 6/15 completed. Runtime: 0.00s. Pipeline 7/15 completed. Runtime: 0.01s. Pipeline 8/15 completed. Runtime: 0.06s. Pipeline 9/15 completed. Runtime: 0.01s. Pipeline 10/15 completed. Runtime: 0.01s. Pipeline 11/15 completed. Runtime: 0.06s. Pipeline 12/15 completed. Runtime: 0.00s. Pipeline 13/15 completed. Runtime: 0.01s. Pipeline 14/15 completed. Runtime: 0.06s. Pipeline 15/15 completed. Runtime: 0.00s.
In [4]:
Copied!
# Stack the per-fold results and average runtime and accuracy per pipeline,
# best mean accuracy first.
result_df = pd.concat(results_dfs)
per_pipeline = result_df.groupby("pipeline")
accuracies_df = per_pipeline.agg(
    tasks=("tasks", "first"),
    mean_runtime=("runtime", "mean"),
    mean_accuracy=("output", "mean"),
).sort_values("mean_accuracy", ascending=False)
accuracies_df
# Stack the per-fold results and average runtime and accuracy per pipeline,
# best mean accuracy first.
result_df = pd.concat(results_dfs)
per_pipeline = result_df.groupby("pipeline")
accuracies_df = per_pipeline.agg(
    tasks=("tasks", "first"),
    mean_runtime=("runtime", "mean"),
    mean_accuracy=("output", "mean"),
).sort_values("mean_accuracy", ascending=False)
accuracies_df
Out[4]:
| pipeline | tasks | mean_runtime | mean_accuracy |
|---|---|---|---|
| 5e4b774877894039b00f9adf27cc9712 | [IrisLoader(), TrainTestSplitter(), LogisticRe... | 0.090403 | 0.973333 |
| 140880859b6b439ea849670ecd3cbbdb | [IrisLoader(), TrainTestSplitter(), RandomFore... | 0.156145 | 0.966667 |
| 35bb0afb52a544f09b54daccf77e1760 | [IrisLoader(), TrainTestSplitter(), SVC(), Eva... | 0.084914 | 0.966667 |
| 6116787469924beda310f1e67d9e98a1 | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.086613 | 0.96 |
| d2af8d065d12468f895d5a1ce3236a2f | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.151889 | 0.96 |
| 2e978bd2bb8f4a3aa8742547ebd735a5 | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.088281 | 0.953333 |
| 33b1be8810154717b3962e007767bd01 | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.153125 | 0.953333 |
| 35fc998ecbbe4e63aac5fff7a932795c | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.086322 | 0.953333 |
| 45668d2182e840bfa921a054f51cced5 | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.089047 | 0.953333 |
| 7c136bfbd2cc41418429902ab039dd03 | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.091754 | 0.953333 |
| 87edee5931f344b686ce9a45434a55d9 | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.166397 | 0.953333 |
| b3bca9104757482ca1c7526a8279a125 | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.158291 | 0.953333 |
| e04e2a370db34f10a942312ad34630b0 | [IrisLoader(), TrainTestSplitter(), StandardSc... | 0.087383 | 0.953333 |
| 442b5d9c378241a78365caa5e9e32d1b | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.094101 | 0.926667 |
| bd5519178a59461baf8b8d92a15174aa | [IrisLoader(), TrainTestSplitter(), MinMaxScal... | 0.087636 | 0.926667 |