Development Guide

This guide covers the development workflow for building BCI applications with Nimbus BCI in both Python and Julia, from local setup to production deployment.
Choose Your SDK:
  • Python SDK (nimbus-bci): Ideal for sklearn integration, MNE-Python workflows, and rapid prototyping
  • Julia SDK (NimbusSDK.jl): Optimal for research applications requiring RxInfer.jl and custom Bayesian models
Both SDKs provide the same core Bayesian classifiers with language-specific ergonomics.

Development Environment Setup

Prerequisites

Required:
  • Python 3.9+ (3.10+ recommended)
  • pip or conda for package management
  • Git for version control
  • EEG Hardware or sample data for testing
Recommended:
  • VSCode with Python extension or PyCharm
  • Jupyter for interactive development
  • MNE-Python for EEG preprocessing
  • scikit-learn for ML pipelines

Installation

Step 1: Install Python

Ensure Python 3.9+ is installed:
# Verify installation
python --version
# Expected: Python 3.9.0 (or later)

Step 2: Create Virtual Environment

Set up an isolated Python environment for your BCI project:
# Create project directory
mkdir my-bci-project
cd my-bci-project

# Create virtual environment
python -m venv venv

# Activate (Linux/Mac)
source venv/bin/activate

# Activate (Windows)
# venv\Scripts\activate

Step 3: Install nimbus-bci

Install from PyPI:
# Install nimbus-bci
pip install nimbus-bci

# Optional: Install with MNE-Python
pip install "nimbus-bci[mne]"

# Verify installation
python -c "import nimbus_bci; print('✓ nimbus-bci installed')"

Step 4: Install Dependencies

For complete BCI workflow:
# Data science stack
pip install numpy scipy scikit-learn matplotlib

# EEG preprocessing (optional)
pip install mne

# Create requirements.txt
pip freeze > requirements.txt

Project Structure

Organize your Python BCI project:
my-bci-project/
├── requirements.txt      # Python dependencies
├── setup.py              # Package configuration (optional)
├── src/
│   ├── __init__.py
│   ├── preprocessing.py  # Feature extraction
│   ├── training.py       # Model training
│   ├── inference.py      # Real-time inference
│   └── utils.py          # Utility functions
├── data/
│   ├── raw/              # Raw EEG recordings
│   ├── processed/        # Preprocessed features
│   └── models/           # Trained models (pickled)
├── tests/
│   ├── __init__.py
│   ├── test_training.py
│   └── test_inference.py
├── notebooks/            # Jupyter notebooks for exploration
│   └── motor_imagery_demo.ipynb
└── README.md

Development Workflow

1. Data Preparation

Preprocessing Pipeline

# src/preprocessing.py
import numpy as np
from scipy import signal
from sklearn.covariance import LedoitWolf

def extract_csp_features(eeg_data: np.ndarray, csp_filters: np.ndarray) -> np.ndarray:
    """
    Extract CSP features for motor imagery BCI.
    
    Args:
        eeg_data: EEG data [n_channels × n_samples]
        csp_filters: CSP spatial filters [n_components × n_channels]
    
    Returns:
        features: Log-variance features [n_components]
    """
    # Apply spatial filters
    spatial_filtered = csp_filters @ eeg_data
    
    # Compute log-variance features
    features = np.log(np.var(spatial_filtered, axis=1) + 1e-10)
    
    return features

def extract_erp_features(eeg_epoch: np.ndarray, time_windows: list) -> np.ndarray:
    """
    Extract ERP amplitude features for P300 BCI.
    
    Args:
        eeg_epoch: EEG epoch [n_channels × n_samples]
        time_windows: List of (start_sample, end_sample) tuples
    
    Returns:
        features: Mean amplitude features
    """
    features = []
    
    for start_idx, end_idx in time_windows:
        window_data = eeg_epoch[:, start_idx:end_idx]
        # Mean amplitude in window for each channel
        features.extend(np.mean(window_data, axis=1))
    
    return np.array(features)

def bandpass_filter(data: np.ndarray, low_freq: float, high_freq: float, fs: float) -> np.ndarray:
    """
    Apply bandpass filter to EEG data.
    
    Args:
        data: EEG data [n_channels × n_samples]
        low_freq: Low frequency cutoff (Hz)
        high_freq: High frequency cutoff (Hz)
        fs: Sampling frequency (Hz)
    
    Returns:
        filtered_data: Bandpass filtered EEG
    """
    # Design Butterworth bandpass filter
    nyq = fs / 2.0
    low = low_freq / nyq
    high = high_freq / nyq
    b, a = signal.butter(4, [low, high], btype='band')
    
    # Apply filter to each channel
    filtered = np.zeros_like(data)
    for ch in range(data.shape[0]):
        filtered[ch, :] = signal.filtfilt(b, a, data[ch, :])
    
    return filtered

def compute_csp_filters(X_class1: np.ndarray, X_class2: np.ndarray, n_components: int = 6) -> np.ndarray:
    """
    Compute CSP filters from two classes of EEG data.
    
    Args:
        X_class1: EEG trials for class 1 [n_trials × n_channels × n_samples]
        X_class2: EEG trials for class 2 [n_trials × n_channels × n_samples]
        n_components: Number of CSP components to return
    
    Returns:
        csp_filters: CSP spatial filters [n_components × n_channels]
    """
    # Estimate channel covariances: concatenate each class's trials along
    # time so Ledoit-Wolf sees an (n_samples_total, n_channels) matrix
    cov1 = LedoitWolf().fit(np.concatenate(X_class1, axis=1).T).covariance_
    cov2 = LedoitWolf().fit(np.concatenate(X_class2, axis=1).T).covariance_
    
    # Solve the generalized eigenvalue problem by whitening with the
    # composite covariance (np.linalg.eig on the non-symmetric product
    # can return complex values; this route stays real and symmetric)
    evals_c, evecs_c = np.linalg.eigh(cov1 + cov2)
    whitening = evecs_c @ np.diag(evals_c ** -0.5) @ evecs_c.T
    _, evecs = np.linalg.eigh(whitening @ cov1 @ whitening)
    all_filters = (whitening @ evecs).T  # rows = filters, ascending eigenvalue
    
    # Keep the extreme components, which are the most discriminative
    n_half = n_components // 2
    csp_filters = np.vstack([all_filters[-n_half:], all_filters[:n_half]])
    
    return csp_filters
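Chained together, the helpers above form a complete offline feature pipeline. Below is a self-contained sketch on synthetic trials; plain trial-averaged covariances stand in for the Ledoit-Wolf estimator, and all signal values are random, so the numbers themselves are meaningless:

```python
import numpy as np
from scipy import signal

rng = np.random.default_rng(0)
fs = 250.0
# Synthetic motor-imagery trials: (n_trials, n_channels, n_samples)
X1 = rng.standard_normal((20, 8, 250))
X2 = 1.5 * rng.standard_normal((20, 8, 250))

# Bandpass to the mu/beta band (8-30 Hz) before spatial filtering
b, a = signal.butter(4, [8 / (fs / 2), 30 / (fs / 2)], btype="band")
X1, X2 = signal.filtfilt(b, a, X1), signal.filtfilt(b, a, X2)

# Per-class channel covariances, averaged over trials
cov1 = np.mean([x @ x.T / x.shape[1] for x in X1], axis=0)
cov2 = np.mean([x @ x.T / x.shape[1] for x in X2], axis=0)

# CSP: whiten by the composite covariance, then eigendecompose
evals_c, evecs_c = np.linalg.eigh(cov1 + cov2)
W = evecs_c @ np.diag(evals_c ** -0.5) @ evecs_c.T
_, U = np.linalg.eigh(W @ cov1 @ W)
filters = (W @ U).T                           # rows sorted by ascending eigenvalue
csp = np.vstack([filters[-3:], filters[:3]])  # 6 most discriminative filters

# Log-variance features for a single trial
feats = np.log(np.var(csp @ X1[0], axis=1) + 1e-10)
print(feats.shape)  # (6,)
```

The resulting feature vectors (one per trial) are what you would stack into the training matrix in the next step.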

2. Model Training

Training Workflow

# src/training.py
from nimbus_bci import NimbusLDA
import numpy as np
import pickle
from datetime import date
from sklearn.model_selection import train_test_split

def train_motor_imagery_model(data_file: str):
    """
    Complete training workflow for motor imagery BCI.
    
    Args:
        data_file: Path to preprocessed feature file
    
    Returns:
        trained_model: Trained NimbusLDA classifier
    """
    print("="*60)
    print("Motor Imagery Model Training")
    print("="*60)
    
    # 1. Load and preprocess data
    print("\n1. Loading data...")
    features, labels = load_features(data_file)
    # features: (n_trials, n_features)
    # labels: (n_trials,)
    
    # 2. Validate data
    print("2. Validating data...")
    check_data_quality(features, labels)
    
    # 3. Split into train/test
    print("3. Splitting train/test...")
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )
    
    # 4. Train model
    print("4. Training NimbusLDA model...")
    clf = NimbusLDA(mu_scale=3.0)
    clf.fit(X_train, y_train)
    
    # 5. Evaluate
    print("\n5. Evaluating on test set...")
    predictions = clf.predict(X_test)
    probabilities = clf.predict_proba(X_test)
    
    accuracy = np.mean(predictions == y_test)
    mean_conf = np.mean(np.max(probabilities, axis=1))
    
    print("\n" + "="*60)
    print("Training Results")
    print("="*60)
    print(f"Test Accuracy: {accuracy * 100:.1f}%")
    print(f"Mean Confidence: {mean_conf:.3f}")
    
    # 6. Save model
    save_path = f"data/models/motor_imagery_{date.today()}.pkl"
    with open(save_path, 'wb') as f:
        pickle.dump(clf, f)
    print(f"\n✓ Model saved to: {save_path}")
    
    return clf

def check_data_quality(features: np.ndarray, labels: np.ndarray):
    """Validate training data quality"""
    # Check for NaN/Inf
    if np.any(np.isnan(features)) or np.any(np.isinf(features)):
        raise ValueError("Features contain NaN or Inf values")
    
    # Check shapes
    n_trials, n_features = features.shape
    assert len(labels) == n_trials, "Labels must match number of trials"
    
    # Check label distribution
    n_classes = len(np.unique(labels))
    print(f"   {n_trials} trials, {n_features} features, {n_classes} classes")
    print(f"   Feature range: [{features.min():.3f}, {features.max():.3f}]")

3. Real-Time Inference

Streaming Application

# src/inference.py
from nimbus_bci import NimbusLDA, StreamingSession
from nimbus_bci.data import BCIMetadata
import numpy as np
import pickle
import time

def run_realtime_bci(model_path: str, duration_seconds: int = 60):
    """
    Run real-time BCI application.
    
    Args:
        model_path: Path to trained model file (.pkl)
        duration_seconds: Duration to run in seconds
    """
    print("="*60)
    print("Real-Time BCI Application")
    print("="*60)
    
    # 1. Load model
    print("\nLoading model...")
    with open(model_path, 'rb') as f:
        clf = pickle.load(f)
    print(f"✓ Model loaded: {model_path}")
    
    # 2. Configure streaming
    metadata = BCIMetadata(
        sampling_rate=250.0,
        paradigm="motor_imagery",
        feature_type="csp",
        n_features=16,
        n_classes=4,
        chunk_size=250  # 1-second chunks
    )
    
    # 3. Initialize streaming session
    print("Initializing streaming session...")
    session = StreamingSession(clf.model_, metadata)
    print("✓ Session initialized")
    
    # 4. Real-time loop
    print("\n" + "="*60)
    print(f"Starting real-time processing ({duration_seconds}s)")
    print("="*60)
    
    chunk_count = 0
    start_time = time.time()
    latencies = []
    confidences = []
    
    while (time.time() - start_time) < duration_seconds:
        # Acquire EEG chunk (replace with your hardware interface)
        raw_chunk = acquire_eeg_chunk()
        
        # Extract features; pass the CSP filters fitted during training
        # (load them alongside the model)
        feature_chunk = extract_csp_features(raw_chunk, csp_filters)  # (16, 250)
        
        # Time inference
        t_start = time.perf_counter()
        result = session.process_chunk(feature_chunk)
        latency_ms = (time.perf_counter() - t_start) * 1000
        
        chunk_count += 1
        latencies.append(latency_ms)
        confidences.append(result.confidence)
        
        # Execute BCI command
        if result.confidence > 0.75:
            execute_bci_command(result.prediction)
        
        # Report every 10 chunks
        if chunk_count % 10 == 0:
            recent_lat = latencies[-10:]
            recent_conf = confidences[-10:]
            print(f"\nChunk {chunk_count}:")
            print(f"  Mean latency: {np.mean(recent_lat):.1f} ms")
            print(f"  Mean confidence: {np.mean(recent_conf):.3f}")
        
        session.reset()
    
    # 5. Summary
    print("\n" + "="*60)
    print("Session Complete")
    print("="*60)
    print(f"Total chunks: {chunk_count}")
    print(f"Mean latency: {np.mean(latencies):.1f} ms")
    print(f"Mean confidence: {np.mean(confidences):.3f}")

def execute_bci_command(prediction: int):
    """Execute BCI command based on prediction"""
    commands = ["Left", "Right", "Forward", "Stop"]
    print(f"→ Command: {commands[prediction]}")
    
    # Your application logic here
    # e.g., control wheelchair, game, cursor, etc.

4. Testing

Test Suite

# tests/test_training.py
import pytest
import numpy as np
from nimbus_bci import NimbusLDA, NimbusGMM
from nimbus_bci.data import BCIData, BCIMetadata

def test_model_training():
    """Test basic model training"""
    # Create synthetic data
    X_train = np.random.randn(50, 8)
    y_train = np.random.randint(0, 2, 50)
    
    # Train model
    clf = NimbusLDA()
    clf.fit(X_train, y_train)
    
    # Verify model properties
    assert clf.n_features_in_ == 8
    assert len(clf.classes_) == 2

def test_batch_inference():
    """Test batch prediction"""
    # Train model
    clf = NimbusLDA()
    X_train = np.random.randn(30, 8)
    y_train = np.random.randint(0, 2, 30)
    clf.fit(X_train, y_train)
    
    # Test prediction
    X_test = np.random.randn(20, 8)
    predictions = clf.predict(X_test)
    probabilities = clf.predict_proba(X_test)
    
    assert len(predictions) == 20
    assert probabilities.shape == (20, 2)
    assert np.all((probabilities >= 0) & (probabilities <= 1))
    assert np.allclose(probabilities.sum(axis=1), 1.0)

def test_streaming_inference():
    """Test streaming inference"""
    # Train model
    clf = NimbusLDA()
    X_train = np.random.randn(30, 8)
    y_train = np.random.randint(0, 2, 30)
    clf.fit(X_train, y_train)
    
    # Create streaming session
    from nimbus_bci import StreamingSession
    from nimbus_bci.data import BCIMetadata
    
    metadata = BCIMetadata(
        sampling_rate=250.0,
        n_features=8,
        n_classes=2,
        chunk_size=100
    )
    
    session = StreamingSession(clf.model_, metadata)
    
    # Process chunk
    chunk = np.random.randn(8, 100)
    result = session.process_chunk(chunk)
    
    assert result.prediction in [0, 1]
    assert 0 <= result.confidence <= 1

def test_cross_validation():
    """Test cross-validation compatibility"""
    from sklearn.model_selection import cross_val_score
    
    clf = NimbusLDA()
    X = np.random.randn(100, 8)
    y = np.random.randint(0, 2, 100)
    
    # Run cross-validation
    scores = cross_val_score(clf, X, y, cv=5)
    
    assert len(scores) == 5
    assert all(0 <= score <= 1 for score in scores)

if __name__ == "__main__":
    pytest.main([__file__, "-v"])

Best Practices

Code Organization

Module Structure

# src/MyBCIProject.jl
module MyBCIProject

using NimbusSDK
using Statistics

include("preprocessing.jl")
include("training.jl")
include("inference.jl")
include("utils.jl")

export train_motor_imagery_model,
       run_realtime_bci,
       extract_csp_features

end  # module

Error Handling

function safe_inference(model, data::BCIData)
    try
        results = predict_batch(model, data)
        return results
    catch e
        if e isa DimensionMismatch
            @error "Feature dimension mismatch" expected=get_n_features(model) actual=size(data.features, 1)
        elseif e isa ArgumentError
            @error "Invalid argument" exception=e
        else
            @error "Inference failed" exception=e
        end
        return nothing
    end
end

Performance Monitoring

using Statistics

struct PerformanceMonitor
    latencies::Vector{Float64}
    confidences::Vector{Float64}
end

PerformanceMonitor() = PerformanceMonitor(Float64[], Float64[])

function record!(monitor::PerformanceMonitor, latency_ms::Float64, confidence::Float64)
    push!(monitor.latencies, latency_ms)
    push!(monitor.confidences, confidence)
end

function report(monitor::PerformanceMonitor)
    println("Performance Report:")
    println("  Mean latency: $(round(mean(monitor.latencies), digits=1)) ms")
    println("  95th percentile: $(round(quantile(monitor.latencies, 0.95), digits=1)) ms")
    println("  Mean confidence: $(round(mean(monitor.confidences), digits=3))")
end

Debugging Tips

Common Issues

Issue: First inference is slow
Cause: Julia’s JIT compilation
Solution: Run warmup inferences
# Warmup
dummy_data = randn(16, 250, 1)
for _ in 1:10
    predict_batch(model, BCIData(dummy_data, metadata))
end
# Now real inference will be fast

Issue: Dimension mismatch errors
Cause: Feature count doesn’t match the model
Solution: Verify dimensions
println("Model expects: $(get_n_features(model)) features")
println("Data has: $(size(data.features, 1)) features")

Issue: Low accuracy or confidence
Cause: Poor preprocessing or model calibration
Solution: Run diagnostics
report = diagnose_preprocessing(data)
println("Quality: $(report.quality_score)")
for warning in report.warnings
    println("  • $warning")
end

Support

Need help?
  • Email: [email protected]
  • Documentation: Browse comprehensive guides
  • GitHub: Report issues and contribute
Happy coding! 🚀 Build amazing BCI applications with NimbusSDK.