Basic BCI Examples

This section provides fundamental examples of using Nimbus for brain-computer interface applications. Examples are shown in both Python and Julia.
For detailed model information, see Bayesian LDA, Bayesian GMM, and Bayesian Softmax/MPR.

Motor Imagery Classification

Motor imagery BCI allows users to control devices by imagining movements. This is one of the most common BCI paradigms.

Basic 2-Class Motor Imagery

Classify left vs right hand motor imagery:
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import mne

# Load EEG data
raw = mne.io.read_raw_gdf("motor_imagery.gdf", preload=True)
raw.filter(8, 30)  # Mu + Beta bands

# Create epochs
events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)

# Extract CSP features
csp_features, csp = extract_csp_features(epochs, n_components=8)
labels = epochs.events[:, 2]

# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)

# Evaluate
from sklearn.model_selection import cross_val_score
scores = cross_val_score(clf, csp_features, labels, cv=5)
print(f"Accuracy: {scores.mean():.2%} (+/- {scores.std():.2%})")
Key Points:
  • Python: Uses MNE for preprocessing, sklearn-compatible API
  • Julia: Uses pre-trained models with RxInfer.jl inference

Accuracy Tip

For 2-class motor imagery, expect 65-85% accuracy after proper CSP preprocessing and calibration.
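
A quick way to check whether a measured accuracy is meaningfully above the 50% chance level for two classes is a one-sided binomial test. A minimal sketch using scipy; the counts here are illustrative placeholders for your own evaluation results:
from scipy.stats import binomtest

# Illustrative counts: 58 correct predictions out of 80 trials (72.5%)
n_correct, n_trials = 58, 80

# One-sided test against the 2-class chance level of 0.5
result = binomtest(n_correct, n_trials, p=0.5, alternative="greater")
print(f"Accuracy: {n_correct / n_trials:.1%}, p-value vs. chance: {result.pvalue:.4f}")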

4-Class Motor Imagery

Classify left hand, right hand, feet, and tongue movements:
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import numpy as np
import mne

# Load and preprocess EEG
raw = mne.io.read_raw_gdf("motor_imagery_4class.gdf", preload=True)
raw.filter(8, 30)

# Create epochs for 4 classes
events = mne.find_events(raw)
event_id = {
    'left_hand': 1,
    'right_hand': 2,
    'feet': 3,
    'tongue': 4
}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)

# Extract CSP features (more components for 4-class)
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]

# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)

# Predict
predictions = clf.predict(csp_features)
probabilities = clf.predict_proba(csp_features)

# Per-class analysis
for class_id in range(1, 5):
    class_name = list(event_id.keys())[class_id - 1]
    class_mask = labels == class_id
    class_acc = np.mean(predictions[class_mask] == class_id)
    print(f"{class_name} accuracy: {class_acc * 100:.1f}%")

# Overall metrics
accuracy = np.mean(predictions == labels)
print(f"\nOverall accuracy: {accuracy * 100:.1f}%")
print(f"Mean confidence: {np.mean(np.max(probabilities, axis=1)):.3f}")

Training Custom Motor Imagery Model

Train a model on your own data:
from nimbus_bci import NimbusLDA, NimbusGMM
from nimbus_bci.compat import extract_csp_features
from sklearn.model_selection import train_test_split
import numpy as np
import mne
import pickle

# Load and preprocess training data
# Minimum: 50 trials per class recommended
# Optimal: 200+ trials per class
raw = mne.io.read_raw_gdf("training_data.gdf", preload=True)
raw.filter(8, 30)

events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2, 'feet': 3, 'tongue': 4}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)

# Extract features
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]

# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(
    csp_features, labels, test_size=0.2, random_state=42, stratify=labels
)

# Train NimbusLDA model (faster, shared covariance)
clf_lda = NimbusLDA(mu_scale=3.0)
clf_lda.fit(X_train, y_train)

# Or train NimbusGMM model (more flexible, class-specific covariances)
clf_gmm = NimbusGMM(mu_scale=3.0)
clf_gmm.fit(X_train, y_train)

# Save trained model
with open("my_motor_imagery_lda.pkl", "wb") as f:
    pickle.dump(clf_lda, f)

# Evaluate on test data
test_predictions = clf_lda.predict(X_test)
test_accuracy = np.mean(test_predictions == y_test)
print(f"Test accuracy: {test_accuracy * 100:.1f}%")

# Cross-validation for robust estimate
from sklearn.model_selection import cross_val_score
cv_scores = cross_val_score(clf_lda, csp_features, labels, cv=5)
print(f"CV accuracy: {cv_scores.mean() * 100:.1f}% (+/- {cv_scores.std() * 100:.1f}%)")

P300 Classification

P300 BCIs detect attention-related brain responses to visual stimuli.

Binary P300 Classification

Detect target vs non-target stimuli:
from nimbus_bci import NimbusGMM  # GMM often better for P300
import numpy as np
import mne

# Load P300 EEG data
raw = mne.io.read_raw_fif("p300_oddball.fif", preload=True)
raw.filter(0.5, 10)  # P300 band

# Create epochs around stimulus
events = mne.find_events(raw)
event_id = {'target': 1, 'non-target': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=-0.2, tmax=0.8, 
                    baseline=(-0.2, 0), preload=True)

# Extract ERP features (mean amplitude in time windows)
# Focus on P300 window (250-500ms)
data = epochs.get_data()  # (n_epochs, n_channels, n_times)
labels = epochs.events[:, 2]

# Extract mean amplitude from key electrodes (Pz, Cz, Fz)
ch_names = ['Pz', 'Cz', 'Fz']
ch_indices = [epochs.ch_names.index(ch) for ch in ch_names]

# Time window: 250-500ms (P300 peak)
times = epochs.times
p300_window = (times >= 0.25) & (times <= 0.5)

# Extract features: mean amplitude per channel in P300 window
features = np.mean(data[:, ch_indices, :][:, :, p300_window], axis=2)

# Train classifier (GMM better for P300's overlapping distributions)
clf = NimbusGMM()
clf.fit(features, labels)

# Predict
predictions = clf.predict(features)
probabilities = clf.predict_proba(features)

# Analyze target detection
target_mask = labels == 1
target_detected = np.sum(predictions[target_mask] == 1)
target_total = np.sum(target_mask)

print(f"Target detection rate: {100 * target_detected / target_total:.1f}%")
print(f"Mean target confidence: {np.mean(probabilities[target_mask, 0]):.3f}")

P300 Speller Application

Implement a P300-based speller:
from nimbus_bci import NimbusGMM
import numpy as np

# Train P300 speller model
clf = NimbusGMM()
# Assume we have training data from calibration phase
clf.fit(X_calib, y_calib)  # X_calib: ERP features, y_calib: target/non-target

# Spelling matrix (6x6 for 36 characters)
matrix = np.array([
    ['A', 'B', 'C', 'D', 'E', 'F'],
    ['G', 'H', 'I', 'J', 'K', 'L'],
    ['M', 'N', 'O', 'P', 'Q', 'R'],
    ['S', 'T', 'U', 'V', 'W', 'X'],
    ['Y', 'Z', '1', '2', '3', '4'],
    ['5', '6', '7', '8', '9', '0']
])

def spell_character(row_features, col_features):
    """
    Spell one character using row/column paradigm.
    
    Args:
        row_features: ERP features from 6 row flashes (6, n_features)
        col_features: ERP features from 6 column flashes (6, n_features)
    
    Returns:
        character, confidence
    """
    # Predict for rows
    row_probs = clf.predict_proba(row_features)[:, 1]  # P(target)
    target_row = np.argmax(row_probs)
    
    # Predict for columns
    col_probs = clf.predict_proba(col_features)[:, 1]  # P(target)
    target_col = np.argmax(col_probs)
    
    # Get character at intersection
    character = matrix[target_row, target_col]
    confidence = (row_probs[target_row] + col_probs[target_col]) / 2
    
    return character, confidence

# Spell a word
word = ""
for i in range(5):  # Spell 5 characters
    # Collect ERP features from row/column flashes
    row_features = collect_row_erps()  # Your acquisition function
    col_features = collect_col_erps()  # Your acquisition function
    
    character, confidence = spell_character(row_features, col_features)
    
    if confidence > 0.7:
        word += character
        print(f"Spelled: {character} (confidence: {confidence:.2f})")
    else:
        print("Low confidence - retry")

print(f"\nSpelled word: {word}")

Real-Time Streaming Examples

Streaming Motor Imagery Control

Real-time cursor control with streaming inference:
from nimbus_bci import NimbusLDA, StreamingSession
from nimbus_bci.data import BCIMetadata
import numpy as np

# Train or load model
clf = NimbusLDA()
clf.fit(X_train, y_train)

# Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate=250.0,
    paradigm="motor_imagery",
    feature_type="csp",
    n_features=16,
    n_classes=4,
    chunk_size=250  # 1-second chunks
)

# Initialize streaming session
session = StreamingSession(clf.model_, metadata)

# Process real-time EEG chunks
cursor_position = [0.0, 0.0]  # [x, y]
movement_speed = 0.05

for chunk in eeg_stream:  # Your real-time acquisition loop
    # Process chunk (16 features × 250 samples)
    chunk_result = session.process_chunk(chunk)
    
    # Update cursor based on prediction
    if chunk_result.confidence > 0.7:
        if chunk_result.prediction == 0:  # Left hand
            cursor_position[0] -= movement_speed
        elif chunk_result.prediction == 1:  # Right hand
            cursor_position[0] += movement_speed
        elif chunk_result.prediction == 2:  # Feet
            cursor_position[1] -= movement_speed
        elif chunk_result.prediction == 3:  # Tongue
            cursor_position[1] += movement_speed
        
        update_cursor_display(cursor_position)
    
    # Reset for next chunk
    session.reset()
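
The eeg_stream and update_cursor_display calls above are placeholders for your acquisition and display code. For offline development, a simple generator that replays precomputed feature chunks can stand in for the live stream; a sketch assuming recorded_features is your own (n_features, n_samples) array:
def replay_stream(feature_array, chunk_size=250):
    """Yield fixed-size chunks from a precomputed (n_features, n_samples) array."""
    n_samples = feature_array.shape[1]
    for start in range(0, n_samples - chunk_size + 1, chunk_size):
        yield feature_array[:, start:start + chunk_size]

# Drop-in replacement for the live stream during development
eeg_stream = replay_stream(recorded_features, chunk_size=250)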

Adaptive Threshold Control

Adjust control sensitivity based on performance:
from nimbus_bci import StreamingSession
from collections import deque
import numpy as np

# Setup streaming
session = StreamingSession(clf.model_, metadata)

# Performance tracker
class OnlinePerformanceTracker:
    def __init__(self, window_size=20):
        self.window_size = window_size
        self.predictions = deque(maxlen=window_size)
        self.ground_truths = deque(maxlen=window_size)
    
    def update(self, prediction, ground_truth):
        self.predictions.append(prediction)
        self.ground_truths.append(ground_truth)
        
        if len(self.predictions) > 0:
            accuracy = np.mean(np.array(self.predictions) == np.array(self.ground_truths))
            return accuracy
        return 0.0

tracker = OnlinePerformanceTracker(window_size=20)

# Adaptive parameters
confidence_threshold = 0.7
update_interval = 10
trial_count = 0

for trial in trials:  # your trial sequence
    trial_count += 1
    
    # Process trial chunks
    for chunk in collect_trial_chunks():  # your acquisition function
        result = session.process_chunk(chunk)
        
        # Execute command if confidence exceeds threshold
        if result.confidence > confidence_threshold:
            execute_command(result.prediction)
    
    final_result = session.finalize_trial()
    
    # Update tracker with ground truth (if available)
    ground_truth = get_trial_label(trial)  # your labeling function; may return None
    if ground_truth is not None:
        accuracy = tracker.update(final_result.prediction, ground_truth)
        
        # Adapt threshold every 10 trials
        if trial_count % update_interval == 0:
            if accuracy > 0.85:
                # High accuracy - can lower threshold for faster response
                confidence_threshold = max(0.6, confidence_threshold * 0.95)
                print(f"Lowering threshold to {confidence_threshold:.2f}")
            elif accuracy < 0.65:
                # Low accuracy - raise threshold for reliability
                confidence_threshold = min(0.85, confidence_threshold * 1.05)
                print(f"Raising threshold to {confidence_threshold:.2f}")
    
    session.reset()

Subject-Specific Calibration

Quick Calibration for New Users

Personalize a model with minimal calibration data:
from nimbus_bci import NimbusLDA
from sklearn.model_selection import train_test_split
import numpy as np
import pickle

# Load or use a pre-trained baseline model
# For Python SDK, we can use online learning (partial_fit)
baseline_clf = NimbusLDA()
# Train baseline on large dataset
baseline_clf.fit(X_baseline, y_baseline)

# Collect quick calibration (10-20 trials per class)
print("Collecting calibration data...")
X_calib, y_calib = collect_calibration_trials(trials_per_class=15)  # your acquisition function

# Split for validation
X_calib_train, X_val, y_calib_train, y_val = train_test_split(
    X_calib, y_calib, test_size=0.3, stratify=y_calib, random_state=42
)

# Personalize model using online learning
print("Calibrating model...")
personalized_clf = NimbusLDA()
personalized_clf.fit(X_baseline, y_baseline)  # Start with baseline

# Fine-tune on calibration data
for _ in range(5):  # Multiple passes through calibration
    personalized_clf.partial_fit(X_calib_train, y_calib_train)

# Save personalized model
with open("subject_123_personalized.pkl", "wb") as f:
    pickle.dump(personalized_clf, f)

# Test improvement
print("\nTesting on validation data...")
baseline_pred = baseline_clf.predict(X_val)
personalized_pred = personalized_clf.predict(X_val)

baseline_acc = np.mean(baseline_pred == y_val)
personalized_acc = np.mean(personalized_pred == y_val)

print(f"Baseline accuracy: {baseline_acc * 100:.1f}%")
print(f"Personalized accuracy: {personalized_acc * 100:.1f}%")
print(f"Improvement: +{(personalized_acc - baseline_acc) * 100:.1f}%")

Quality Assessment and Diagnostics

Identify Poor Quality Trials

from nimbus_bci import NimbusLDA
import numpy as np

# Run inference (assumes a trained classifier `clf` and feature data X with labels y)
predictions = clf.predict(X)
probabilities = clf.predict_proba(X)
confidences = np.max(probabilities, axis=1)

# Identify low-confidence trials
low_conf_threshold = 0.65
low_conf_indices = np.where(confidences < low_conf_threshold)[0]

print(f"Low confidence trials: {len(low_conf_indices)}/{len(predictions)}")
print(f"Indices: {low_conf_indices}")

# Analyze patterns in low-confidence trials
if len(low_conf_indices) > 0:
    print("\nLow confidence trial analysis:")
    for class_id, class_name in enumerate(["Left", "Right", "Feet", "Tongue"], start=1):
        class_low_conf = np.sum(y[low_conf_indices] == class_id)
        print(f"  {class_name}: {class_low_conf} trials")

# Overall quality assessment
mean_confidence = np.mean(confidences)
print(f"\nMean confidence: {mean_confidence:.2f}")

if mean_confidence > 0.8:
    print("Recommendation: High quality - proceed with deployment")
elif mean_confidence > 0.65:
    print("Recommendation: Acceptable quality - monitor performance")
else:
    print("Recommendation: Poor quality - recalibrate or retrain")

Preprocessing Quality Check

import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Check for common preprocessing issues
def diagnose_preprocessing(X, y):
    """
    Diagnose common preprocessing quality issues.
    
    Args:
        X: Feature matrix (n_samples, n_features)
        y: Labels (n_samples,)
    
    Returns:
        Dictionary with quality metrics and recommendations
    """
    errors = []
    warnings = []
    recommendations = []
    
    # Check for NaN/Inf
    if np.any(np.isnan(X)) or np.any(np.isinf(X)):
        errors.append("NaN or Inf values detected in features")
    
    # Check feature variance
    variances = np.var(X, axis=0)
    low_var_features = np.sum(variances < 1e-6)
    if low_var_features > 0:
        warnings.append(f"{low_var_features} features have very low variance")
        recommendations.append("Consider removing low-variance features")
    
    # Check feature scale
    means = np.mean(X, axis=0)
    stds = np.std(X, axis=0)
    if np.max(stds) / np.min(stds[stds > 0]) > 100:
        warnings.append("Features have very different scales")
        recommendations.append("Apply feature normalization (z-score or min-max)")
    
    # Feature discriminability (using LDA)
    lda = LinearDiscriminantAnalysis()
    lda.fit(X, y)
    feature_scores = np.abs(lda.coef_).mean(axis=0)
    
    # Quality score
    quality_score = 1.0
    quality_score -= 0.5 * (len(errors) > 0)
    quality_score -= 0.1 * len(warnings)
    
    return {
        'errors': errors,
        'warnings': warnings,
        'recommendations': recommendations,
        'feature_scores': feature_scores,
        'quality_score': quality_score
    }

# Run diagnosis
report = diagnose_preprocessing(X, y)

print("Preprocessing Quality Report")
print("=" * 50)
print(f"Overall score: {report['quality_score'] * 100:.1f}%")

if report['errors']:
    print("\n⚠️  ERRORS:")
    for error in report['errors']:
        print(f"  • {error}")

if report['warnings']:
    print("\n⚠️  WARNINGS:")
    for warning in report['warnings']:
        print(f"  • {warning}")

if report['recommendations']:
    print("\n💡 RECOMMENDATIONS:")
    for rec in report['recommendations']:
        print(f"  • {rec}")

# Top discriminative features
print("\nTop discriminative features:")
top_features = np.argsort(report['feature_scores'])[-5:][::-1]
for rank, feat_idx in enumerate(top_features, 1):
    print(f"  {rank}. Feature {feat_idx} (score: {report['feature_scores'][feat_idx]:.3f})")

Performance Comparison

NimbusLDA vs NimbusGMM

Compare the two models:
from nimbus_bci import NimbusLDA, NimbusGMM
import numpy as np

# Train both models on same data
print("Training NimbusLDA...")
clf_lda = NimbusLDA()
clf_lda.fit(X_train, y_train)

print("\nTraining NimbusGMM...")
clf_gmm = NimbusGMM()
clf_gmm.fit(X_train, y_train)

# Test both models
print("\n" + "=" * 50)
print("Model Comparison")
print("=" * 50)

for clf, name in [(clf_lda, "NimbusLDA"), (clf_gmm, "NimbusGMM")]:
    predictions = clf.predict(X_test)
    probabilities = clf.predict_proba(X_test)
    
    accuracy = np.mean(predictions == y_test)
    mean_conf = np.mean(np.max(probabilities, axis=1))
    
    print(f"\n{name}:")
    print(f"  Accuracy: {accuracy * 100:.1f}%")
    print(f"  Mean confidence: {mean_conf:.3f}")
    
    # Per-class performance
    from sklearn.metrics import classification_report
    print("\n  Per-class metrics:")
    print(classification_report(y_test, predictions, 
                                target_names=["Left", "Right", "Feet", "Tongue"],
                                zero_division=0))

print("\nRecommendation:")
print("  • NimbusLDA: Faster, good for well-separated classes")
print("  • NimbusGMM: More flexible, better for overlapping distributions")

All examples use actual Nimbus SDK functions (nimbus_bci in Python, NimbusSDK.jl in Julia) and the real RxLDA/RxGMM models. No mocked or simulated code.