sklearn Integration

The nimbus-bci classifiers are fully compatible with scikit-learn, so they work directly with pipelines, cross-validation, hyperparameter tuning, and the broader sklearn ecosystem.

sklearn-Compatible API

All nimbus-bci classifiers implement the sklearn estimator interface:
from nimbus_bci import NimbusLDA

clf = NimbusLDA()

# Standard sklearn methods
clf.fit(X_train, y_train)              # Train
predictions = clf.predict(X_test)       # Predict classes
probabilities = clf.predict_proba(X_test)  # Predict probabilities
score = clf.score(X_test, y_test)      # Accuracy score
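
Because the estimators follow this interface, standard sklearn utilities such as get_params, set_params, and clone also work. A minimal sketch (mu_scale is the NimbusLDA hyperparameter used throughout this page):
from sklearn.base import clone
from nimbus_bci import NimbusLDA

clf = NimbusLDA(mu_scale=3.0)

# Inspect and update hyperparameters the sklearn way
print(clf.get_params())
clf.set_params(mu_scale=5.0)

# clone() returns an unfitted copy with identical hyperparameters;
# sklearn's CV and search utilities rely on this internally
fresh_clf = clone(clf)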

Pipelines

Basic Pipeline

Combine preprocessing with classification:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from nimbus_bci import NimbusLDA

# Create pipeline
pipe = make_pipeline(
    StandardScaler(),
    NimbusLDA(mu_scale=3.0)
)

# Fit and predict
pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)
score = pipe.score(X_test, y_test)
print(f"Accuracy: {score:.2%}")

Multi-Step Pipeline

Add feature selection and normalization:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler
from sklearn.feature_selection import SelectKBest, f_classif
from nimbus_bci import NimbusLDA

pipe = Pipeline([
    ('scaler', RobustScaler()),
    ('feature_selection', SelectKBest(f_classif, k=10)),
    ('classifier', NimbusLDA(mu_scale=5.0))
])

pipe.fit(X_train, y_train)
print(f"Selected features: {pipe.named_steps['feature_selection'].get_support()}")

BCI-Specific Pipeline

Complete BCI preprocessing pipeline:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from nimbus_bci import NimbusLDA

# Complete BCI pipeline
bci_pipeline = Pipeline([
    ('normalization', StandardScaler()),
    ('dimensionality_reduction', PCA(n_components=16)),
    ('classifier', NimbusLDA(mu_scale=3.0))
])

# Train on CSP features
bci_pipeline.fit(csp_features, labels)

# Real-time prediction
new_trial = extract_features(eeg_chunk)
prediction = bci_pipeline.predict(new_trial.reshape(1, -1))[0]
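
extract_features above is application code, not part of nimbus-bci. A minimal hypothetical sketch computing log-variance features per channel:
import numpy as np

def extract_features(eeg_chunk):
    # Hypothetical extractor: log-variance per channel.
    # eeg_chunk is assumed to have shape (n_samples, n_channels);
    # replace with your own CSP / band-power feature extraction.
    return np.log(np.var(eeg_chunk, axis=0))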

Cross-Validation

Basic Cross-Validation

Evaluate model performance:
from sklearn.model_selection import cross_val_score
from nimbus_bci import NimbusLDA

clf = NimbusLDA()
scores = cross_val_score(clf, X, y, cv=5, scoring='accuracy')

print(f"Accuracy: {scores.mean():.2%} (+/- {scores.std():.2%})")

Stratified K-Fold

Maintain class balance in folds:
from sklearn.model_selection import StratifiedKFold, cross_val_score
from nimbus_bci import NimbusGMM

clf = NimbusGMM()
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(clf, X, y, cv=cv, scoring='accuracy')

print(f"Stratified CV Accuracy: {scores.mean():.2%}")

Multiple Metrics

Evaluate multiple metrics simultaneously:
from sklearn.model_selection import cross_validate
from nimbus_bci import NimbusLDA

clf = NimbusLDA()
scoring = ['accuracy', 'precision_macro', 'recall_macro', 'f1_macro']
scores = cross_validate(clf, X, y, cv=5, scoring=scoring)

for metric in scoring:
    print(f"{metric}: {scores[f'test_{metric}'].mean():.3f}")

Leave-One-Subject-Out

For BCI with multiple subjects:
from sklearn.model_selection import LeaveOneGroupOut, cross_val_score
from nimbus_bci import NimbusLDA

clf = NimbusLDA()
logo = LeaveOneGroupOut()

# subject_ids: array indicating which subject each trial belongs to
scores = cross_val_score(clf, X, y, groups=subject_ids, cv=logo)

print(f"LOSO Accuracy: {scores.mean():.2%}")
print(f"Per-subject scores: {scores}")

Hyperparameter Tuning

Grid Search

Exhaustive search over a parameter grid:
from sklearn.model_selection import GridSearchCV
from nimbus_bci import NimbusLDA

# Define parameter grid
param_grid = {
    'mu_scale': [1.0, 3.0, 5.0, 7.0],
    'class_prior_alpha': [0.5, 1.0, 2.0]
}

# Grid search with cross-validation
grid = GridSearchCV(
    NimbusLDA(),
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

grid.fit(X_train, y_train)

print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score: {grid.best_score_:.2%}")

# Use best model
best_clf = grid.best_estimator_
test_score = best_clf.score(X_test, y_test)
print(f"Test accuracy: {test_score:.2%}")

Randomized Search

Randomized search samples from parameter distributions instead of an exhaustive grid, which is more efficient for large parameter spaces:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform
from nimbus_bci import NimbusGMM

# Define parameter distributions
# (scipy's uniform(loc, scale) samples from [loc, loc + scale],
#  so mu_scale here is drawn from [1.0, 11.0])
param_dist = {
    'mu_scale': uniform(1.0, 10.0),
    'class_prior_alpha': uniform(0.1, 5.0)
}

# Random search
random_search = RandomizedSearchCV(
    NimbusGMM(),
    param_dist,
    n_iter=20,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)

random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")

Pipeline Parameter Tuning

Tune parameters across the entire pipeline:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from nimbus_bci import NimbusLDA

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', NimbusLDA())
])

# Tune both preprocessing and classifier
param_grid = {
    'scaler__with_mean': [True, False],
    'scaler__with_std': [True, False],
    'clf__mu_scale': [1.0, 3.0, 5.0],
    'clf__class_prior_alpha': [0.5, 1.0, 2.0]
}

grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)

print(f"Best pipeline: {grid.best_params_}")

Model Selection

Compare Multiple Classifiers

Compare different nimbus-bci classifiers:
from sklearn.model_selection import cross_val_score
from nimbus_bci import NimbusLDA, NimbusGMM, NimbusSoftmax

classifiers = {
    'LDA': NimbusLDA(),
    'GMM': NimbusGMM(),
    'Softmax': NimbusSoftmax()
}

results = {}
for name, clf in classifiers.items():
    scores = cross_val_score(clf, X, y, cv=5, scoring='accuracy')
    results[name] = scores
    print(f"{name}: {scores.mean():.2%} (+/- {scores.std():.2%})")

# Select the best performer by mean CV accuracy
best_name, best_scores = max(results.items(), key=lambda kv: kv[1].mean())
print(f"\nBest classifier: {best_name}")

Compare with sklearn Classifiers

Benchmark against sklearn models:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
from nimbus_bci import NimbusLDA

classifiers = {
    'NimbusLDA': NimbusLDA(),
    'sklearn LDA': LinearDiscriminantAnalysis(),
    'Random Forest': RandomForestClassifier(n_estimators=100),
    'SVM': SVC(probability=True)
}

for name, clf in classifiers.items():
    scores = cross_val_score(clf, X, y, cv=5)
    print(f"{name}: {scores.mean():.2%} (+/- {scores.std():.2%})")

Feature Selection

Univariate Feature Selection

Select best features before classification:
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.pipeline import Pipeline
from nimbus_bci import NimbusLDA

pipe = Pipeline([
    ('feature_selection', SelectKBest(f_classif, k=10)),
    ('classifier', NimbusLDA())
])

pipe.fit(X_train, y_train)

# Get selected features
selected = pipe.named_steps['feature_selection'].get_support()
print(f"Selected {selected.sum()} features")

Recursive Feature Elimination

Iteratively remove the least important features. Note that RFE requires the fitted estimator to expose coef_ or feature_importances_ (otherwise pass an explicit importance_getter):
from sklearn.feature_selection import RFE
from nimbus_bci import NimbusLDA

clf = NimbusLDA()
rfe = RFE(clf, n_features_to_select=10)
rfe.fit(X_train, y_train)

print(f"Selected features: {rfe.support_}")
print(f"Feature ranking: {rfe.ranking_}")

Ensemble Methods

Voting Classifier

Combine multiple nimbus-bci classifiers:
from sklearn.ensemble import VotingClassifier
from nimbus_bci import NimbusLDA, NimbusGMM, NimbusSoftmax

ensemble = VotingClassifier(
    estimators=[
        ('lda', NimbusLDA()),
        ('gmm', NimbusGMM()),
        ('softmax', NimbusSoftmax())
    ],
    voting='soft'  # Use probabilities
)

ensemble.fit(X_train, y_train)
score = ensemble.score(X_test, y_test)
print(f"Ensemble accuracy: {score:.2%}")

Bagging

Bootstrap aggregating for variance reduction:
from sklearn.ensemble import BaggingClassifier
from nimbus_bci import NimbusLDA

bagging = BaggingClassifier(
    estimator=NimbusLDA(),  # named base_estimator in sklearn < 1.2
    n_estimators=10,
    random_state=42
)

bagging.fit(X_train, y_train)
score = bagging.score(X_test, y_test)
print(f"Bagging accuracy: {score:.2%}")

Calibration

Probability Calibration

Calibrate predicted probabilities:
from sklearn.calibration import CalibratedClassifierCV
from nimbus_bci import NimbusLDA

# Calibrate with cross-validation
calibrated = CalibratedClassifierCV(
    NimbusLDA(),
    method='isotonic',
    cv=5
)

calibrated.fit(X_train, y_train)
probs = calibrated.predict_proba(X_test)

# Check calibration
from sklearn.calibration import calibration_curve
prob_true, prob_pred = calibration_curve(y_test, probs[:, 1], n_bins=10)
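
To visualize the result, plot a reliability diagram from the prob_true/prob_pred values computed above (a minimal sketch):
import matplotlib.pyplot as plt

# Perfectly calibrated probabilities lie on the diagonal
plt.plot(prob_pred, prob_true, marker='o', label='Calibrated NimbusLDA')
plt.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated')
plt.xlabel('Mean predicted probability')
plt.ylabel('Fraction of positives')
plt.title('Reliability Diagram')
plt.legend()
plt.show()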

Custom Transformers

BCI-Specific Transformer

Create a custom transformer for BCI preprocessing:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class TemporalAggregator(BaseEstimator, TransformerMixin):
    """Aggregate temporal features for BCI."""
    
    def __init__(self, method='logvar'):
        self.method = method
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        # X is assumed to have shape (n_trials, n_samples, n_channels);
        # each method aggregates over the temporal axis (axis=1)
        if self.method == 'logvar':
            return np.log(np.var(X, axis=1))
        elif self.method == 'mean':
            return np.mean(X, axis=1)
        elif self.method == 'rms':
            return np.sqrt(np.mean(X**2, axis=1))
        raise ValueError(f"Unknown method: {self.method!r}")

# Use in pipeline
from sklearn.pipeline import Pipeline
from nimbus_bci import NimbusLDA

pipe = Pipeline([
    ('temporal_agg', TemporalAggregator(method='logvar')),
    ('classifier', NimbusLDA())
])

pipe.fit(X_train, y_train)

Performance Evaluation

Comprehensive Metrics

Evaluate with multiple metrics:
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report
)
from nimbus_bci import NimbusLDA

clf = NimbusLDA()
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

print(f"Accuracy: {accuracy_score(y_test, y_pred):.2%}")
print(f"Precision: {precision_score(y_test, y_pred, average='macro'):.2%}")
print(f"Recall: {recall_score(y_test, y_pred, average='macro'):.2%}")
print(f"F1: {f1_score(y_test, y_pred, average='macro'):.2%}")

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))

print("\nClassification Report:")
print(classification_report(y_test, y_pred))

ROC and AUC

For binary classification:
from sklearn.metrics import roc_curve, auc, roc_auc_score
import matplotlib.pyplot as plt
from nimbus_bci import NimbusGMM

clf = NimbusGMM()
clf.fit(X_train, y_train)
y_probs = clf.predict_proba(X_test)[:, 1]

# ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_probs)
roc_auc = auc(fpr, tpr)

plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.show()

print(f"AUC: {roc_auc_score(y_test, y_probs):.3f}")

Saving and Loading

Joblib Persistence

Save entire pipeline:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from nimbus_bci import NimbusLDA
import joblib

# Create and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', NimbusLDA())
])
pipe.fit(X_train, y_train)

# Save pipeline
joblib.dump(pipe, 'bci_pipeline.pkl')

# Load pipeline (nimbus_bci must be importable in the loading environment)
loaded_pipe = joblib.load('bci_pipeline.pkl')
predictions = loaded_pipe.predict(X_test)

Best Practices

1. Always Use Pipelines

Encapsulate preprocessing with classification:
# Good: Preprocessing in pipeline
pipe = make_pipeline(StandardScaler(), NimbusLDA())
pipe.fit(X_train, y_train)
pipe.predict(X_test)

# Bad: Manual preprocessing
X_train_scaled = StandardScaler().fit_transform(X_train)
X_test_scaled = StandardScaler().fit_transform(X_test)  # Wrong: fits a new scaler on test data
clf = NimbusLDA().fit(X_train_scaled, y_train)

2. Use Stratified Splits

Maintain class balance:
from sklearn.model_selection import StratifiedKFold, cross_val_score

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(clf, X, y, cv=cv)

3. Tune Hyperparameters

Always optimize hyperparameters:
from sklearn.model_selection import GridSearchCV

param_grid = {'mu_scale': [1.0, 3.0, 5.0]}
grid = GridSearchCV(NimbusLDA(), param_grid, cv=5)
grid.fit(X_train, y_train)
best_clf = grid.best_estimator_

4. Evaluate on Held-Out Test Set

Never tune on test data:
from sklearn.model_selection import train_test_split

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# Tune on train set only
grid = GridSearchCV(NimbusLDA(), param_grid, cv=5)
grid.fit(X_train, y_train)

# Final evaluation on test set
test_score = grid.score(X_test, y_test)

Next Steps