Skip to main content

Advanced BCI Applications

This section demonstrates sophisticated applications of Nimbus for complex BCI scenarios. Examples are shown in both Python and Julia.
Python SDK: See Streaming Inference and sklearn Integration for Python-specific advanced patterns.

Cross-Subject Model Training

Train models that generalize across multiple subjects.

Multi-Subject Training Dataset

from nimbus_bci import NimbusLDA
from nimbus_bci import estimate_normalization_params, apply_normalization
import numpy as np

# Train one classifier on the pooled trials of several subjects and save it,
# so that it can later be loaded as the cross-subject baseline.
import pickle

# Collect data from multiple subjects
subjects = ["S001", "S002", "S003", "S004", "S005"]
all_features = []
all_labels = []

for subject_id in subjects:
    print(f"Loading subject {subject_id}...")
    subj_features, subj_labels = load_subject_data(subject_id)
    all_features.append(subj_features)
    all_labels.append(subj_labels)

# Concatenate all subject data (rows are trials, columns are features)
X = np.vstack(all_features)
y = np.concatenate(all_labels)

# Fail early with a clear message instead of a confusing error inside fit()
if X.shape[0] != y.shape[0]:
    raise ValueError(
        f"Feature/label mismatch: {X.shape[0]} trials but {y.shape[0]} labels"
    )

print("\nCombined dataset:")
print(f"  Total trials: {X.shape[0]}")
print(f"  Features: {X.shape[1]}")
print(f"  Subjects: {len(subjects)}")

# Normalize across all subjects.
# NOTE(review): the model is fitted on normalized features, so data from a
# new subject presumably has to be normalized with these same `norm_params`
# before prediction - keep them alongside the saved model.
norm_params = estimate_normalization_params(X, method="zscore")
X_norm = apply_normalization(X, norm_params)

# Train cross-subject model
clf = NimbusLDA(mu_scale=5.0)  # Higher regularization for cross-subject
clf.fit(X_norm, y)

# Persist the fitted model under the file name the transfer example loads
with open("cross_subject_model.pkl", "wb") as f:
    pickle.dump(clf, f)

print("Cross-subject model trained")

Transfer to New Subject

Use the cross-subject model as a baseline for new subjects:
from nimbus_bci import NimbusLDA
import numpy as np
import pickle

# Personalize the saved cross-subject model for a new subject using a small
# calibration set, then measure accuracy on that subject's test data.
import copy

# Load cross-subject baseline
# NOTE: pickle.load can execute arbitrary code - only load model files that
# you created yourself or otherwise trust.
with open("cross_subject_model.pkl", "rb") as f:
    baseline_clf = pickle.load(f)

# Collect minimal calibration from new subject (Subject 006)
X_calib, y_calib = collect_calibration_trials(
    subject_id="S006",
    trials_per_class=12  # Only 12 trials per class needed!
)

# Personalize using online learning: start from a copy of the fitted
# baseline so the loaded cross-subject model itself is left untouched.
personalized_clf = copy.deepcopy(baseline_clf)

# Fine-tune on new subject's calibration data
# NOTE(review): if the baseline was trained on normalized features, X_calib
# and X_test_new presumably need the same normalization - confirm.
for _ in range(10):  # Multiple passes for adaptation
    personalized_clf.partial_fit(X_calib, y_calib)

# Test on new subject
X_test_new, y_test_new = load_subject_test_data("S006")
predictions = personalized_clf.predict(X_test_new)

accuracy = np.mean(predictions == y_test_new)
print(f"New subject accuracy: {accuracy * 100:.1f}%")
# Report the real calibration size rather than assuming four classes
print(f"(With only {len(y_calib)} calibration trials total!)")

Hybrid BCI: Combining Multiple Paradigms

Combine motor imagery and P300 for robust control.

Dual-Paradigm System

from nimbus_bci import NimbusLDA, NimbusGMM
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pickle

# Load the pre-trained motor-imagery and P300 classifiers, one file at a
# time, in the same order as they are unpacked below.
_loaded_models = []
for _model_path in ("motor_imagery_4class.pkl", "p300_binary.pkl"):
    with open(_model_path, "rb") as _model_file:
        _loaded_models.append(pickle.load(_model_file))
mi_clf, p300_clf = _loaded_models

@dataclass
class HybridBCIResult:
    """Outcome of one combined motor-imagery + P300 decision.

    Instances are built by ``hybrid_inference`` from the outputs of the two
    classifiers.
    """
    # Label predicted by the motor-imagery classifier.
    mi_prediction: int
    # Highest class probability reported by the motor-imagery classifier.
    mi_confidence: float
    # True when the P300 classifier predicted label 1 (target detected).
    p300_confirmed: bool
    # Probability of the P300 outcome that was predicted.
    p300_confidence: float
    # The motor-imagery label to act on, or None when no action is taken.
    final_action: Optional[int]

def hybrid_inference(mi_features, p300_features, *,
                     mi_threshold=0.75, p300_threshold=0.75):
    """
    Combined motor imagery + P300 for high-confidence control.

    The motor-imagery classifier proposes an action and the P300 classifier
    must confirm it; an action is only released when both are confident.
    Uses the module-level ``mi_clf`` and ``p300_clf`` classifiers.

    Args:
        mi_features: Motor imagery features (1, n_features)
        p300_features: P300 ERP features (1, n_features)
        mi_threshold: Motor-imagery confidence that must be exceeded.
        p300_threshold: P300 confidence that must be exceeded.

    Returns:
        HybridBCIResult with combined inference; ``final_action`` is None
        unless both confidences exceed their thresholds and the P300
        classifier detected a target.
    """
    # Motor imagery: Determine intended direction
    mi_pred = mi_clf.predict(mi_features)[0]
    mi_probs = mi_clf.predict_proba(mi_features)[0]
    # Convert to a plain float so the result matches the dataclass annotation
    mi_conf = float(np.max(mi_probs))

    # P300: Confirm intention
    p300_pred = p300_clf.predict(p300_features)[0]
    p300_probs = p300_clf.predict_proba(p300_features)[0]
    p300_confirmed = bool(p300_pred == 1)  # Target detected
    # NOTE(review): assumes probability column 1 belongs to label 1 and
    # column 0 to the non-target label - confirm against the class order.
    p300_conf = float(p300_probs[1] if p300_confirmed else p300_probs[0])

    # Decision logic: Require both high confidence
    both_confident = (
        mi_conf > mi_threshold
        and p300_confirmed
        and p300_conf > p300_threshold
    )
    final_action = mi_pred if both_confident else None

    return HybridBCIResult(
        mi_prediction=mi_pred,
        mi_confidence=mi_conf,
        p300_confirmed=p300_confirmed,
        p300_confidence=p300_conf,
        final_action=final_action
    )

# Example usage: Wheelchair control
def hybrid_wheelchair_control():
    """Drive the wheelchair from hybrid decisions while ``is_active()`` holds.

    Each iteration collects one motor-imagery trial and one P300
    confirmation trial, runs ``hybrid_inference`` on them, and either
    executes the confirmed action or reports why nothing was done.
    """
    while is_active():
        # Direction intent first, then the P300 confirmation
        mi_trial = collect_mi_trial()
        p300_trial = collect_p300_confirmation()

        outcome = hybrid_inference(mi_trial, p300_trial)
        action = outcome.final_action

        if action is None:
            # Not confirmed: report both confidences and try again
            print("⚠️  Low confidence - no action")
            print(f"  MI: {outcome.mi_confidence:.2f}")
            print(f"  P300: {outcome.p300_confirmed} ({outcome.p300_confidence:.2f})")
            continue

        print(f"✓ Confirmed action: {action}")
        execute_wheelchair_command(action)

Continuous Control with Smoothing

Implement smooth continuous control for cursor/prosthetics.

Confidence-Weighted Moving Average

from nimbus_bci import StreamingSession
from collections import deque
import numpy as np

# Setup streaming
# NOTE: `clf` and `metadata` are not defined in this snippet; they must
# already exist. `clf` needs a `model_` attribute - presumably a fitted
# classifier, confirm against the StreamingSession documentation.
session = StreamingSession(clf.model_, metadata)

# Smoothing filter class
class MovingAverageFilter:
    """Confidence-weighted vote over the most recent predictions.

    Keeps the last ``window_size`` (prediction, confidence) pairs and
    reports the class with the largest summed confidence in that window.
    """

    def __init__(self, window_size=5, n_classes=4):
        """Create an empty filter.

        Args:
            window_size: Number of most recent predictions kept (>= 1).
            n_classes: Number of classes; predictions index ``0..n_classes-1``.

        Raises:
            ValueError: If ``window_size`` or ``n_classes`` is below 1.
        """
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if n_classes < 1:
            raise ValueError(f"n_classes must be >= 1, got {n_classes}")
        self.predictions = deque(maxlen=window_size)
        self.confidences = deque(maxlen=window_size)
        self.window_size = window_size
        self.n_classes = n_classes

    def smooth_prediction(self, new_pred, new_conf):
        """Apply moving average smoothing with confidence weighting.

        Args:
            new_pred: Class index of the newest raw prediction.
            new_conf: Confidence of the newest raw prediction.

        Returns:
            Tuple ``(smoothed_pred, smoothed_conf)``: the winning class index
            and its share of the total confidence in the window. The share
            is 0.0 when the window holds no positive confidence.
        """
        self.predictions.append(new_pred)
        self.confidences.append(new_conf)

        # Weighted voting by confidence
        class_scores = np.zeros(self.n_classes)
        for pred, conf in zip(self.predictions, self.confidences):
            class_scores[pred] += conf

        # Smoothed prediction
        smoothed_pred = np.argmax(class_scores)
        total_score = np.sum(class_scores)

        # Avoid 0/0 -> NaN when every confidence in the window is zero
        if total_score > 0:
            smoothed_conf = class_scores[smoothed_pred] / total_score
        else:
            smoothed_conf = 0.0

        return smoothed_pred, smoothed_conf

# Real-time control with smoothing
# Named `prediction_filter` rather than `filter` so that the builtin
# `filter()` is not shadowed for the rest of the module.
prediction_filter = MovingAverageFilter(window_size=5, n_classes=4)

for chunk in eeg_stream:
    # Get raw prediction
    raw_result = session.process_chunk(chunk)

    # Apply smoothing
    smoothed_pred, smoothed_conf = prediction_filter.smooth_prediction(
        raw_result.prediction, raw_result.confidence
    )

    # Execute smoothed command only when the windowed vote is decisive
    if smoothed_conf > 0.7:
        execute_smooth_control(smoothed_pred, smoothed_conf)

    # Reset the session after every chunk, before the next one is processed
    session.reset()

Velocity-Based Control

Control velocity instead of discrete actions:
from nimbus_bci import StreamingSession
import numpy as np

def velocity_control(clf, metadata, eeg_stream, *, max_velocity=1.0,
                     acceleration=0.1, friction=0.95,
                     confidence_threshold=0.7):
    """Continuous velocity-based BCI control.

    Each confident prediction nudges a 2-D velocity vector; friction decays
    it on every chunk and the result is clamped before being sent to
    ``update_cursor_velocity``.

    Args:
        clf: Classifier whose ``model_`` attribute is given to the session.
        metadata: Metadata passed to ``StreamingSession``.
        eeg_stream: Iterable of EEG chunks.
        max_velocity: Absolute per-axis velocity limit.
        acceleration: Velocity change per prediction, scaled by confidence.
        friction: Factor applied to the velocity on every chunk.
        confidence_threshold: Confidence a prediction must exceed to count.
    """
    session = StreamingSession(clf.model_, metadata)

    # (prediction label, velocity axis, sign): 0=Left, 1=Right, 2=Up, 3=Down
    direction_steps = (
        (0, 0, -1.0),
        (1, 0, 1.0),
        (2, 1, 1.0),
        (3, 1, -1.0),
    )

    velocity = np.array([0.0, 0.0])  # [vx, vy]

    for chunk in eeg_stream:
        result = session.process_chunk(chunk)

        if result.confidence > confidence_threshold:
            # Map predictions to velocity changes
            step = acceleration * result.confidence
            for label, axis, sign in direction_steps:
                if result.prediction == label:
                    velocity[axis] += sign * step
                    break

        # Apply friction
        velocity *= friction

        # Clamp velocity
        velocity = np.clip(velocity, -max_velocity, max_velocity)

        # Update position
        update_cursor_velocity(velocity)

        session.reset()

Adaptive Learning During Use

Update model parameters online based on implicit feedback.

Online Model Adaptation

from nimbus_bci import NimbusLDA
import numpy as np

# Fit the starting classifier on the baseline data
clf = NimbusLDA()
clf.fit(X_baseline, y_baseline)

# Buffers of self-labelled trials waiting for the next adaptation step
online_features, online_labels = [], []

adaptation_interval = 20  # Adapt every 20 trials

for trial_idx in range(1, 201):
    trial_features = collect_trial()  # (1, n_features)

    # Predicted label and the confidence of the most likely class
    prediction = clf.predict(trial_features)[0]
    proba = clf.predict_proba(trial_features)[0]
    confidence = np.max(proba)

    # Implicit labelling: a high-confidence prediction is treated as correct
    if confidence > 0.85:
        online_features.append(trial_features[0])
        online_labels.append(prediction)

        print(f"Trial {trial_idx}: Confident prediction added to adaptation set")

    # Adapt only on interval boundaries and with at least 10 buffered trials
    at_interval = trial_idx % adaptation_interval == 0
    if not (at_interval and len(online_labels) >= 10):
        continue

    print(f"\n=== Adapting model at trial {trial_idx} ===")

    X_adapt, y_adapt = np.array(online_features), np.array(online_labels)

    # Several partial_fit passes over the same buffered trials
    for _ in range(5):
        clf.partial_fit(X_adapt, y_adapt)

    print(f"Model adapted with {len(online_labels)} trials")

    # Sliding window: drop everything except the 10 newest buffered trials
    # (deletes nothing when 10 or fewer are buffered)
    del online_features[:-10]
    del online_labels[:-10]

Multi-Session Experiment Design

Run systematic experiments across multiple sessions.

Experiment Framework

from nimbus_bci import NimbusLDA, NimbusGMM
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import numpy as np
import pickle

@dataclass
class ExperimentSession:
    """Record of one experiment session and its inference results.

    Instances are built by ``run_experiment_session``; the result fields
    default to None.
    """
    # Identifier of this session.
    session_id: str
    # Identifier of the subject.
    subject_id: str
    # Timestamp stored for the session.
    date: datetime
    model_type: str  # 'lda' or 'gmm'
    # Number of trials requested for the session.
    n_trials: int
    # Output of the classifier's predict() for the session data.
    predictions: Optional[np.ndarray] = None
    # Output of the classifier's predict_proba() for the session data.
    probabilities: Optional[np.ndarray] = None
    # Fraction of predictions equal to the true labels.
    accuracy: Optional[float] = None
    # Mean over trials of the highest class probability.
    mean_confidence: Optional[float] = None

def run_experiment_session(
    subject_id: str,
    session_id: str,
    model_type: str,
    n_trials: int
):
    """Run a single experiment session.

    Loads the pickled classifier for ``model_type``, collects ``n_trials``
    trials, runs inference and prints accuracy and mean confidence.

    Args:
        subject_id: Identifier of the subject.
        session_id: Identifier stored on the returned record.
        model_type: ``'lda'`` or ``'gmm'``; selects the model file.
        n_trials: Number of trials to collect.

    Returns:
        ExperimentSession holding predictions, probabilities and metrics.

    Raises:
        ValueError: If ``model_type`` is not ``'lda'`` or ``'gmm'``.
    """
    model_paths = {
        'lda': "motor_imagery_4class_lda.pkl",
        'gmm': "motor_imagery_4class_gmm.pkl",
    }
    # Reject unknown types up front: a typo would otherwise silently load
    # the GMM model and be recorded under the wrong model_type.
    if model_type not in model_paths:
        raise ValueError(
            f"Unknown model_type {model_type!r}; expected one of {sorted(model_paths)}"
        )

    # One timestamp, so the printed date and the stored date agree
    session_date = datetime.now()
    banner = "=" * 60

    print("\n" + banner)
    print(f"Experiment Session: {session_id}")
    print(f"Subject: {subject_id}")
    print(f"Model: {model_type}")
    print(f"Date: {session_date}")
    print(banner)

    # Load appropriate model
    # NOTE: pickle.load can execute arbitrary code - only load trusted files.
    with open(model_paths[model_type], "rb") as f:
        clf = pickle.load(f)

    # Collect data
    print(f"\nCollecting {n_trials} trials...")
    X, y = collect_experiment_trials(n_trials)

    # Run inference
    print("Running inference...")
    predictions = clf.predict(X)
    probabilities = clf.predict_proba(X)

    # Calculate metrics
    accuracy = np.mean(predictions == y)
    mean_confidence = np.mean(np.max(probabilities, axis=1))

    print("\n" + banner)
    print("Session Results")
    print(banner)
    print(f"Accuracy: {accuracy * 100:.1f}%")
    print(f"Mean confidence: {mean_confidence:.3f}")
    print(banner)

    # Return session data
    return ExperimentSession(
        session_id=session_id,
        subject_id=subject_id,
        date=session_date,
        model_type=model_type,
        n_trials=n_trials,
        predictions=predictions,
        probabilities=probabilities,
        accuracy=accuracy,
        mean_confidence=mean_confidence
    )
Example: Run multi-session experiment
import numpy as np

def run_multi_session_experiment():
    """Run an LDA session and a GMM session for each subject, then summarize.

    Prints per-session accuracy and confidence followed by the mean and
    standard deviation of accuracy per model, and returns every session
    record in the order the sessions were run.
    """
    divider = "=" * 60
    sessions = []

    for subject in ["S001", "S002", "S003"]:
        # Session 1 is the NimbusLDA baseline, session 2 the NimbusGMM comparison
        for session_number, model_type in enumerate(("lda", "gmm"), start=1):
            record = run_experiment_session(
                subject_id=subject,
                session_id=f"{subject}_session{session_number}_{model_type}",
                model_type=model_type,
                n_trials=80,
            )
            sessions.append(record)

    # Per-session results
    print("\n\n" + divider)
    print("Experiment Summary")
    print(divider)

    for session in sessions:
        print(f"\n{session.session_id}:")
        print(f"  Accuracy: {session.accuracy * 100:.1f}%")
        print(f"  Mean confidence: {session.mean_confidence:.3f}")

    # Accuracy statistics per model type
    print("\n" + divider)
    print("Model Comparison")
    print(divider)
    for display_name, wanted_type in (("NimbusLDA", 'lda'), ("NimbusGMM", 'gmm')):
        accs = [s.accuracy for s in sessions if s.model_type == wanted_type]
        print(f"{display_name} mean accuracy: {np.mean(accs) * 100:.1f}% ± {np.std(accs) * 100:.1f}%")

    return sessions

# Run the experiment
# NOTE: this is a top-level statement, so the whole multi-subject experiment
# starts as soon as this code is executed or imported.
sessions = run_multi_session_experiment()

Error Recovery and Robustness

Handle errors and maintain reliable operation.

Automatic Retry with Fallback

from nimbus_bci import NimbusLDA
from dataclasses import dataclass
from typing import Optional
import time
import numpy as np

@dataclass
class InferenceResult:
    """What ``robust_inference`` reports back to its caller."""
    # True only when an attempt exceeded the confidence threshold.
    success: bool
    # Classifier outputs of the successful attempt; None on failure.
    predictions: Optional[np.ndarray]
    probabilities: Optional[np.ndarray]
    # Number of attempts counted when the result was produced.
    attempts: int


def robust_inference(clf, X, max_retries=3, confidence_threshold=0.65):
    """Run inference, retrying on errors or low-confidence output.

    An attempt succeeds when the mean of the per-row maximum class
    probability exceeds ``confidence_threshold``. After an exception the
    next attempt follows a 0.1 s pause. Once ``max_retries`` attempts are
    used up, a failed result without predictions is returned.
    """

    def _failed(attempts_used):
        # Shared shape of every unsuccessful outcome
        return InferenceResult(
            success=False,
            predictions=None,
            probabilities=None,
            attempts=attempts_used
        )

    for attempt in range(1, max_retries + 1):
        retries_remain = attempt < max_retries

        try:
            labels = clf.predict(X)
            class_probs = clf.predict_proba(X)

            # Quality check on the average top-class probability
            mean_confidence = np.mean(np.max(class_probs, axis=1))

            if mean_confidence > confidence_threshold:
                return InferenceResult(
                    success=True,
                    predictions=labels,
                    probabilities=class_probs,
                    attempts=attempt
                )

            print(f"⚠️  Low quality results (confidence: {mean_confidence:.3f}) on attempt {attempt}")
            if retries_remain:
                print("Retrying...")

        except Exception as e:
            print(f"❌ Inference failed: {e} (attempt {attempt})")

            if not retries_remain:
                # All retries exhausted
                return _failed(attempt)

            print("Retrying after error...")
            time.sleep(0.1)  # Brief pause before retry

    # Reached when the final attempt was low-confidence (or none were made)
    return _failed(max_retries)

# Usage: run the retrying inference once and branch on the outcome
result = robust_inference(clf, X)

if not result.success:
    # No attempt exceeded the confidence threshold
    print(f"⚠️  Inference failed after {result.attempts} attempts")
    handle_failure()
else:
    print(f"✓ Inference successful after {result.attempts} attempt(s)")
    process_results(result.predictions)

Session Health Monitoring

from nimbus_bci import NimbusLDA
from dataclasses import dataclass, field
from collections import deque
import numpy as np

@dataclass
class SessionHealth:
    """Running health metrics of one BCI session."""
    # Trials seen so far, successful or not.
    trial_count: int = 0
    # Trials for which inference completed without raising.
    success_count: int = 0
    # Trials for which inference raised an exception.
    failure_count: int = 0
    # Running mean of the top-class probability over successful trials.
    mean_confidence: float = 0.0
    # Correctness (1.0 / 0.0) of the most recent labelled trials.
    recent_accuracies: list = field(default_factory=list)


def monitor_session_health(clf, eeg_stream, health_window=10):
    """Monitor BCI session health with real-time metrics.

    Args:
        clf: Classifier providing ``predict`` and ``predict_proba``.
        eeg_stream: Iterable of trial dicts with a ``'features'`` array and
            an optional ``'label'``.
        health_window: Number of most recent labelled trials used for the
            recent-accuracy check.

    Returns:
        SessionHealth with the final counters, mean confidence and the
        contents of the recent-accuracy window.

    Raises:
        RuntimeError: If more than 10% of the trials so far have failed.
            NOTE(review): this is checked from the very first trial, so a
            single early failure aborts the session - confirm intended.
    """
    health = SessionHealth()
    recent_acc_window = deque(maxlen=health_window)

    for trial in eeg_stream:
        health.trial_count += 1

        try:
            # Run inference
            X_trial = trial['features'].reshape(1, -1)
            y_trial = trial.get('label')

            prediction = clf.predict(X_trial)[0]
            proba = clf.predict_proba(X_trial)[0]
            confidence = float(np.max(proba))

            # Update health metrics (running mean over successful trials)
            health.success_count += 1
            health.mean_confidence = (
                (health.mean_confidence * (health.success_count - 1) + confidence) /
                health.success_count
            )

            # Track recent accuracy and mirror the window into the returned
            # record, which would otherwise always report an empty list
            if y_trial is not None:
                correct = (prediction == y_trial)
                recent_acc_window.append(float(correct))
                health.recent_accuracies = list(recent_acc_window)

            # Health checks
            if health.mean_confidence < 0.6:
                print(f"⚠️  Mean confidence dropping: {health.mean_confidence:.3f}")

            # max(..., 1) keeps a zero-size window from averaging nothing
            if len(recent_acc_window) >= max(health_window, 1):
                recent_acc = np.mean(health.recent_accuracies)
                if recent_acc < 0.65:
                    print(f"⚠️  Recent accuracy low - consider recalibration: {recent_acc:.3f}")

        except Exception as e:
            health.failure_count += 1
            print(f"❌ Trial failed: {e} (trial #{health.trial_count})")

            if health.failure_count / health.trial_count > 0.1:
                # Chain explicitly so the triggering error stays visible
                raise RuntimeError("Session failure rate too high - aborting") from e

        # Periodic health report
        if health.trial_count % 20 == 0:
            print("\n=== Session Health Report ===")
            print(f"Trials: {health.trial_count}")
            print(f"Success rate: {100 * health.success_count / health.trial_count:.1f}%")
            print(f"Mean confidence: {health.mean_confidence:.3f}")
            if len(recent_acc_window) > 0:
                print(f"Recent accuracy: {np.mean(list(recent_acc_window)) * 100:.1f}%")
            print("=" * 30)

    return health

Next Steps


All examples use actual NimbusSDK.jl functions and real Bayesian LDA (RxLDA) and Bayesian GMM (RxGMM) models. These are production-ready patterns used in real BCI applications.