Basic BCI Examples
This section provides fundamental examples of using Nimbus for brain-computer interface applications. Examples are shown in both Python and Julia.Choose Your SDK:
- Python SDK: sklearn-compatible API with MNE integration → Python Quickstart
- Julia SDK: RxInfer.jl-based with pre-trained models → Julia Quickstart
Motor Imagery Classification
Motor imagery BCI allows users to control devices by imagining movements. This is one of the most common BCI paradigms.Basic 2-Class Motor Imagery
Classify left vs right hand motor imagery:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import mne
# Load EEG data
raw = mne.io.read_raw_gdf("motor_imagery.gdf", preload=True)
raw.filter(8, 30) # Mu + Beta bands
# Create epochs
events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract CSP features
csp_features, csp = extract_csp_features(epochs, n_components=8)
labels = epochs.events[:, 2]
# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)
# Evaluate
from sklearn.model_selection import cross_val_score
scores = cross_val_score(clf, csp_features, labels, cv=5)
print(f"Accuracy: {scores.mean():.2%} (+/- {scores.std():.2%})")
Copy
using NimbusSDK
# 1. Setup (one-time)
NimbusSDK.install_core("your-api-key")
# 2. Load pre-trained 2-class model
model = load_model(NimbusLDA, "motor_imagery_2class_v1")
# 3. Prepare your preprocessed CSP features
# Features must be CSP-transformed EEG (8-30 Hz bandpass)
# Shape: (n_features × n_samples × n_trials)
csp_features = load_your_csp_features() # (8, 250, 100) - 8 CSP features, 1s trials, 100 trials
labels = [1, 2, 1, 2, ...] # 1 = left hand, 2 = right hand
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 8,
n_classes = 2,
chunk_size = nothing
)
data = BCIData(csp_features, metadata, labels)
# 4. Run batch inference
results = predict_batch(model, data; iterations=10)
- Python: Uses MNE for preprocessing, sklearn-compatible API
- Julia: Uses pre-trained models with RxInfer.jl inference
Accuracy Tip
For 2-class motor imagery, expect 65-85% accuracy after proper CSP preprocessing and calibration.
4-Class Motor Imagery
Classify left hand, right hand, feet, and tongue movements:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import numpy as np
import mne
# Load and preprocess EEG
raw = mne.io.read_raw_gdf("motor_imagery_4class.gdf", preload=True)
raw.filter(8, 30)
# Create epochs for 4 classes
events = mne.find_events(raw)
event_id = {
'left_hand': 1,
'right_hand': 2,
'feet': 3,
'tongue': 4
}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract CSP features (more components for 4-class)
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]
# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)
# Predict
predictions = clf.predict(csp_features)
probabilities = clf.predict_proba(csp_features)
# Per-class analysis
for class_id in range(1, 5):
class_name = list(event_id.keys())[class_id - 1]
class_mask = labels == class_id
class_acc = np.mean(predictions[class_mask] == class_id)
print(f"{class_name} accuracy: {class_acc * 100:.1f}%")
# Overall metrics
accuracy = np.mean(predictions == labels)
print(f"\nOverall accuracy: {accuracy * 100:.1f}%")
print(f"Mean confidence: {np.mean(np.max(probabilities, axis=1)):.3f}")
Copy
using NimbusSDK
# Authenticate and load 4-class model
NimbusSDK.install_core("your-api-key")
model = load_model(NimbusLDA, "motor_imagery_4class_v1")
# Prepare 4-class data
# Classes: 1=left hand, 2=right hand, 3=feet, 4=tongue
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16, # More features for 4-class problem
n_classes = 4
)
data = BCIData(csp_features, metadata, labels)
# Run inference
results = predict_batch(model, data)
# Per-class analysis
for class_id in 1:4
class_mask = labels .== class_id
class_acc = sum(results.predictions[class_mask] .== class_id) / sum(class_mask)
println("Class $class_id accuracy: $(round(class_acc * 100, digits=1))%")
end
# Overall metrics
accuracy = sum(results.predictions .== labels) / length(labels)
itr = calculate_ITR(accuracy, 4, 4.0)
println("\nOverall accuracy: $(round(accuracy * 100, digits=1))%")
println("ITR: $(round(itr, digits=1)) bits/minute")
Training Custom Motor Imagery Model
Train a model on your own data:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA, NimbusQDA
from nimbus_bci.compat import extract_csp_features
from sklearn.model_selection import train_test_split
import numpy as np
import mne
import pickle
# Load and preprocess training data
# Minimum: 50 trials per class recommended
# Optimal: 200+ trials per class
raw = mne.io.read_raw_gdf("training_data.gdf", preload=True)
raw.filter(8, 30)
events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2, 'feet': 3, 'tongue': 4}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract features
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]
# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(
csp_features, labels, test_size=0.2, random_state=42, stratify=labels
)
# Train NimbusLDA model (faster, shared covariance)
clf_lda = NimbusLDA(mu_scale=3.0)
clf_lda.fit(X_train, y_train)
# Or train NimbusQDA model (more flexible, class-specific covariances)
clf_gmm = NimbusQDA(mu_scale=3.0)
clf_gmm.fit(X_train, y_train)
# Save trained model
with open("my_motor_imagery_lda.pkl", "wb") as f:
pickle.dump(clf_lda, f)
# Evaluate on test data
test_predictions = clf_lda.predict(X_test)
test_accuracy = np.mean(test_predictions == y_test)
print(f"Test accuracy: {test_accuracy * 100:.1f}%")
# Cross-validation for robust estimate
from sklearn.model_selection import cross_val_score
cv_scores = cross_val_score(clf_lda, csp_features, labels, cv=5)
print(f"CV accuracy: {cv_scores.mean() * 100:.1f}% (+/- {cv_scores.std() * 100:.1f}%)")
Copy
using NimbusSDK
NimbusSDK.install_core("your-api-key")
# Collect training data
# Minimum: 50 trials per class recommended
# Optimal: 200+ trials per class
train_features, train_labels = collect_training_data()
train_data = BCIData(
train_features,
BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4
),
train_labels
)
# Train NimbusLDA model (faster, shared covariance)
rxlda_model = train_model(
NimbusLDA,
train_data;
iterations = 50,
showprogress = true,
name = "my_motor_imagery",
description = "Custom 4-class motor imagery"
)
# Or train NimbusQDA model (more flexible, class-specific covariances)
rxgmm_model = train_model(
NimbusQDA,
train_data;
iterations = 50,
showprogress = true,
name = "my_motor_imagery_gmm"
)
# Save trained model
save_model(rxlda_model, "my_motor_imagery.jld2")
# Evaluate on test data
test_results = predict_batch(rxlda_model, test_data)
test_accuracy = sum(test_results.predictions .== test_labels) / length(test_labels)
println("Test accuracy: $(round(test_accuracy * 100, digits=1))%")
P300 Event-Related Potential Detection
P300 BCIs detect attention-related brain responses to visual stimuli.Binary P300 Classification
Detect target vs non-target stimuli:- Python
- Julia
Copy
from nimbus_bci import NimbusQDA # GMM often better for P300
import numpy as np
import mne
# Load P300 EEG data
raw = mne.io.read_raw_fif("p300_oddball.fif", preload=True)
raw.filter(0.5, 10) # P300 band
# Create epochs around stimulus
events = mne.find_events(raw)
event_id = {'target': 1, 'non-target': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=-0.2, tmax=0.8,
baseline=(-0.2, 0), preload=True)
# Extract ERP features (mean amplitude in time windows)
# Focus on P300 window (250-500ms)
data = epochs.get_data() # (n_epochs, n_channels, n_times)
labels = epochs.events[:, 2]
# Extract mean amplitude from key electrodes (Pz, Cz, Fz)
ch_names = ['Pz', 'Cz', 'Fz']
ch_indices = [epochs.ch_names.index(ch) for ch in ch_names]
# Time window: 250-500ms (P300 peak)
times = epochs.times
p300_window = (times >= 0.25) & (times <= 0.5)
# Extract features: mean amplitude per channel in P300 window
features = np.mean(data[:, ch_indices, :][:, :, p300_window], axis=2)
# Train classifier (GMM better for P300's overlapping distributions)
clf = NimbusQDA()
clf.fit(features, labels)
# Predict
predictions = clf.predict(features)
probabilities = clf.predict_proba(features)
# Analyze target detection
target_mask = labels == 1
target_detected = np.sum(predictions[target_mask] == 1)
target_total = np.sum(target_mask)
print(f"Target detection rate: {100 * target_detected / target_total:.1f}%")
print(f"Mean target confidence: {np.mean(probabilities[target_mask, 0]):.3f}")
Copy
using NimbusSDK
# Setup and load P300 model
NimbusSDK.install_core("your-api-key")
model = load_model(NimbusLDA, "p300_binary_v1")
# Prepare P300 ERP features
# Features extracted from 0.2-0.8s post-stimulus window
# Typical: Bandpower or ERP amplitudes from Pz, Cz, Fz
erp_features = load_p300_features() # (12, 150, 200) - 12 features, 0.6s @ 250Hz, 200 trials
labels = [1, 2, 2, 2, 1, 2, ...] # 1=target, 2=non-target
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :p300,
feature_type = :erp,
n_features = 12,
n_classes = 2
)
data = BCIData(erp_features, metadata, labels)
# Run inference
results = predict_batch(model, data)
# Analyze target detection
target_mask = labels .== 1
target_detected = sum(results.predictions[target_mask] .== 1)
target_total = sum(target_mask)
println("Target detection rate: $(round(100 * target_detected / target_total, digits=1))%")
println("Mean target confidence: $(round(mean(results.confidences[target_mask]), digits=3))")
P300 Speller Application
Implement a P300-based speller:- Python
- Julia
Copy
from nimbus_bci import NimbusQDA
import numpy as np
# Train P300 speller model
clf = NimbusQDA()
# Assume we have training data from calibration phase
clf.fit(X_calib, y_calib) # X_calib: ERP features, y_calib: target/non-target
# Spelling matrix (6x6 for 36 characters)
matrix = np.array([
['A', 'B', 'C', 'D', 'E', 'F'],
['G', 'H', 'I', 'J', 'K', 'L'],
['M', 'N', 'O', 'P', 'Q', 'R'],
['S', 'T', 'U', 'V', 'W', 'X'],
['Y', 'Z', '1', '2', '3', '4'],
['5', '6', '7', '8', '9', '0']
])
def spell_character(row_features, col_features):
"""
Spell one character using row/column paradigm.
Args:
row_features: ERP features from 6 row flashes (6, n_features)
col_features: ERP features from 6 column flashes (6, n_features)
Returns:
character, confidence
"""
# Predict for rows
row_probs = clf.predict_proba(row_features)[:, 1] # P(target)
target_row = np.argmax(row_probs)
# Predict for columns
col_probs = clf.predict_proba(col_features)[:, 1] # P(target)
target_col = np.argmax(col_probs)
# Get character at intersection
character = matrix[target_row, target_col]
confidence = (row_probs[target_row] + col_probs[target_col]) / 2
return character, confidence
# Spell a word
word = ""
for i in range(5): # Spell 5 characters
# Collect ERP features from row/column flashes
row_features = collect_row_erps() # Your acquisition function
col_features = collect_col_erps() # Your acquisition function
character, confidence = spell_character(row_features, col_features)
if confidence > 0.7:
word += character
print(f"Spelled: {character} (confidence: {confidence:.2f})")
else:
print("Low confidence - retry")
print(f"\nSpelled word: {word}")
Copy
using NimbusSDK
NimbusSDK.install_core("your-api-key")
model = load_model(NimbusLDA, "p300_speller_v1")
# Spelling matrix (6x6 for 36 characters)
matrix = reshape(collect('A':'Z') ∪ collect('0':'9'), 6, 6)
function spell_character(row_col_trials)
"""
Spell one character using row/column paradigm
row_col_trials: Dict with :rows and :cols EEG data
"""
# Analyze row flashes
row_data = BCIData(row_col_trials[:rows], metadata)
row_results = predict_batch(model, row_data)
target_row = argmax([mean(row_results.posteriors[1, i:i+5]) for i in 1:6:30])
# Analyze column flashes
col_data = BCIData(row_col_trials[:cols], metadata)
col_results = predict_batch(model, col_data)
target_col = argmax([mean(col_results.posteriors[1, i:i+5]) for i in 1:6:30])
# Get character at intersection
character = matrix[target_row, target_col]
confidence = (row_results.confidences[target_row] + col_results.confidences[target_col]) / 2
return (character=character, confidence=confidence)
end
# Spell a word
word = ""
for _ in 1:5 # Spell 5 characters
trials = collect_p300_trials() # Your acquisition function
result = spell_character(trials)
if result.confidence > 0.7
word *= result.character
println("Spelled: $(result.character) (confidence: $(round(result.confidence, digits=2)))")
else
println("Low confidence - retry")
end
end
println("\nSpelled word: $word")
Real-Time Streaming Examples
Streaming Motor Imagery Control
Real-time cursor control with streaming inference:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA, StreamingSession
from nimbus_bci.data import BCIMetadata
import numpy as np
# Train or load model
clf = NimbusLDA()
clf.fit(X_train, y_train)
# Configure streaming metadata
metadata = BCIMetadata(
sampling_rate=250.0,
paradigm="motor_imagery",
feature_type="csp",
n_features=16,
n_classes=4,
chunk_size=250 # 1-second chunks
)
# Initialize streaming session
session = StreamingSession(clf.model_, metadata)
# Process real-time EEG chunks
cursor_position = [0.0, 0.0] # [x, y]
movement_speed = 0.05
for chunk in eeg_stream: # Your real-time acquisition loop
# Process chunk (16 features × 250 samples)
chunk_result = session.process_chunk(chunk)
# Update cursor based on prediction
if chunk_result.confidence > 0.7:
if chunk_result.prediction == 0: # Left hand
cursor_position[0] -= movement_speed
elif chunk_result.prediction == 1: # Right hand
cursor_position[0] += movement_speed
elif chunk_result.prediction == 2: # Feet
cursor_position[1] -= movement_speed
elif chunk_result.prediction == 3: # Tongue
cursor_position[1] += movement_speed
update_cursor_display(cursor_position)
# Reset for next chunk
session.reset()
Copy
using NimbusSDK
# Setup
NimbusSDK.install_core("your-api-key")
model = load_model(NimbusLDA, "motor_imagery_4class_v1")
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4,
chunk_size = 250 # 1-second chunks
)
# Initialize streaming session
session = init_streaming(model, metadata)
# Process real-time EEG chunks
cursor_position = [0.0, 0.0] # [x, y]
movement_speed = 0.05
for chunk in eeg_stream
# Process chunk (16 features × 250 samples)
chunk_result = process_chunk(session, chunk)
# Update cursor based on prediction
if chunk_result.confidence > 0.7
if chunk_result.prediction == 1 # Left hand
cursor_position[1] -= movement_speed
elseif chunk_result.prediction == 2 # Right hand
cursor_position[1] += movement_speed
elseif chunk_result.prediction == 3 # Feet
cursor_position[2] -= movement_speed
elseif chunk_result.prediction == 4 # Tongue
cursor_position[2] += movement_speed
end
update_cursor_display(cursor_position)
end
end
Adaptive Threshold Control
Adjust control sensitivity based on performance:- Python
- Julia
Copy
from nimbus_bci import StreamingSession
from collections import deque
import numpy as np
# Setup streaming
session = StreamingSession(clf.model_, metadata)
# Performance tracker
class OnlinePerformanceTracker:
def __init__(self, window_size=20):
self.window_size = window_size
self.predictions = deque(maxlen=window_size)
self.ground_truths = deque(maxlen=window_size)
def update(self, prediction, ground_truth):
self.predictions.append(prediction)
self.ground_truths.append(ground_truth)
if len(self.predictions) > 0:
accuracy = np.mean(np.array(self.predictions) == np.array(self.ground_truths))
return accuracy
return 0.0
tracker = OnlinePerformanceTracker(window_size=20)
# Adaptive parameters
confidence_threshold = 0.7
update_interval = 10
trial_count = 0
for trial in trials:
trial_count += 1
# Process trial chunks
for chunk in collect_trial_chunks():
result = session.process_chunk(chunk)
# Execute command if confidence exceeds threshold
if result.confidence > confidence_threshold:
execute_command(result.prediction)
final_result = session.finalize_trial()
# Update tracker with ground truth (if available)
if ground_truth is not None:
accuracy = tracker.update(final_result.prediction, ground_truth)
# Adapt threshold every 10 trials
if trial_count % update_interval == 0:
if accuracy > 0.85:
# High accuracy - can lower threshold for faster response
confidence_threshold = max(0.6, confidence_threshold * 0.95)
print(f"Lowering threshold to {confidence_threshold:.2f}")
elif accuracy < 0.65:
# Low accuracy - raise threshold for reliability
confidence_threshold = min(0.85, confidence_threshold * 1.05)
print(f"Raising threshold to {confidence_threshold:.2f}")
session.reset()
Copy
using NimbusSDK
# Setup streaming
session = init_streaming(model, metadata)
tracker = OnlinePerformanceTracker(window_size=20)
# Adaptive parameters
confidence_threshold = 0.7
update_interval = 10
trial_count = 0
for trial in trials
trial_count += 1
# Process trial
for chunk in collect_trial_chunks()
result = process_chunk(session, chunk)
# Execute command if confidence exceeds threshold
if result.confidence > confidence_threshold
execute_command(result.prediction)
end
end
final_result = finalize_trial(session)
# Update tracker with ground truth (if available)
if !isnothing(ground_truth)
metrics = update_and_report!(tracker, final_result.prediction, ground_truth, final_result.confidence)
# Adapt threshold every 10 trials
if trial_count % update_interval == 0
if metrics.accuracy > 0.85
# High accuracy - can lower threshold for faster response
confidence_threshold = max(0.6, confidence_threshold * 0.95)
println("Lowering threshold to $(round(confidence_threshold, digits=2))")
elseif metrics.accuracy < 0.65
# Low accuracy - raise threshold for reliability
confidence_threshold = min(0.85, confidence_threshold * 1.05)
println("Raising threshold to $(round(confidence_threshold, digits=2))")
end
end
end
end
Subject-Specific Calibration
Quick Calibration for New Users
Personalize a model with minimal calibration data:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA
from sklearn.model_selection import train_test_split
import numpy as np
import pickle
# Load or use a pre-trained baseline model
# For Python SDK, we can use online learning (partial_fit)
baseline_clf = NimbusLDA()
# Train baseline on large dataset
baseline_clf.fit(X_baseline, y_baseline)
# Collect quick calibration (10-20 trials per class)
print("Collecting calibration data...")
X_calib, y_calib = collect_calibration_trials(trials_per_class=15)
# Split for validation
X_calib_train, X_val, y_calib_train, y_val = train_test_split(
X_calib, y_calib, test_size=0.3, stratify=y_calib, random_state=42
)
# Personalize model using online learning
print("Calibrating model...")
personalized_clf = NimbusLDA()
personalized_clf.fit(X_baseline, y_baseline) # Start with baseline
# Fine-tune on calibration data
for _ in range(5): # Multiple passes through calibration
personalized_clf.partial_fit(X_calib_train, y_calib_train)
# Save personalized model
with open("subject_123_personalized.pkl", "wb") as f:
pickle.dump(personalized_clf, f)
# Test improvement
print("\nTesting on validation data...")
baseline_pred = baseline_clf.predict(X_val)
personalized_pred = personalized_clf.predict(X_val)
baseline_acc = np.mean(baseline_pred == y_val)
personalized_acc = np.mean(personalized_pred == y_val)
print(f"Baseline accuracy: {baseline_acc * 100:.1f}%")
print(f"Personalized accuracy: {personalized_acc * 100:.1f}%")
print(f"Improvement: +{(personalized_acc - baseline_acc) * 100:.1f}%")
Copy
using NimbusSDK
NimbusSDK.install_core("your-api-key")
# Load baseline model
baseline_model = load_model(NimbusLDA, "motor_imagery_baseline_v1")
# Collect quick calibration (10-20 trials per class)
println("Collecting calibration data...")
calib_features, calib_labels = collect_calibration_trials(trials_per_class=15)
calib_data = BCIData(
calib_features,
BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4
),
calib_labels
)
# Calibrate model (much faster than training from scratch)
println("Calibrating model...")
personalized_model = calibrate_model(
baseline_model,
calib_data;
iterations = 20 # Fewer iterations needed
)
# Save personalized model
save_model(personalized_model, "subject_123_personalized.jld2")
# Test improvement
println("\nTesting on validation data...")
baseline_results = predict_batch(baseline_model, validation_data)
personalized_results = predict_batch(personalized_model, validation_data)
baseline_acc = sum(baseline_results.predictions .== val_labels) / length(val_labels)
personalized_acc = sum(personalized_results.predictions .== val_labels) / length(val_labels)
println("Baseline accuracy: $(round(baseline_acc * 100, digits=1))%")
println("Personalized accuracy: $(round(personalized_acc * 100, digits=1))%")
println("Improvement: $(round((personalized_acc - baseline_acc) * 100, digits=1))%")
Quality Assessment and Diagnostics
Identify Poor Quality Trials
- Python
- Julia
Copy
from nimbus_bci import NimbusLDA
import numpy as np
# Run inference
predictions = clf.predict(X)
probabilities = clf.predict_proba(X)
confidences = np.max(probabilities, axis=1)
# Identify low-confidence trials
low_conf_threshold = 0.65
low_conf_indices = np.where(confidences < low_conf_threshold)[0]
print(f"Low confidence trials: {len(low_conf_indices)}/{len(predictions)}")
print(f"Indices: {low_conf_indices}")
# Analyze patterns in low-confidence trials
if len(low_conf_indices) > 0:
print("\nLow confidence trial analysis:")
for class_id, class_name in enumerate(["Left", "Right", "Feet", "Tongue"]):
class_low_conf = np.sum(y[low_conf_indices] == class_id)
print(f" {class_name}: {class_low_conf} trials")
# Overall quality assessment
mean_confidence = np.mean(confidences)
print(f"\nMean confidence: {mean_confidence:.2f}")
if mean_confidence > 0.8:
print("Recommendation: High quality - proceed with deployment")
elif mean_confidence > 0.65:
print("Recommendation: Acceptable quality - monitor performance")
else:
print("Recommendation: Poor quality - recalibrate or retrain")
Copy
using NimbusSDK
# Run inference
results = predict_batch(model, data)
# Identify low-confidence trials
low_conf_threshold = 0.65
low_conf_indices = findall(results.confidences .< low_conf_threshold)
println("Low confidence trials: $(length(low_conf_indices))/$(length(results.predictions))")
println("Indices: $low_conf_indices")
# Analyze patterns in low-confidence trials
if !isempty(low_conf_indices)
println("\nLow confidence trial analysis:")
for (class_id, class_name) in enumerate(["Left", "Right", "Feet", "Tongue"])
class_low_conf = sum(labels[low_conf_indices] .== class_id)
println(" $class_name: $class_low_conf trials")
end
end
# Overall quality assessment
quality = assess_trial_quality(results)
println("\nOverall quality score: $(round(quality.overall_score, digits=2))")
println("Recommendation: $(quality.recommendation)")
Preprocessing Quality Check
- Python
- Julia
Copy
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# Check for common preprocessing issues
def diagnose_preprocessing(X, y):
"""
Diagnose common preprocessing quality issues.
Args:
X: Feature matrix (n_samples, n_features)
y: Labels (n_samples,)
Returns:
Dictionary with quality metrics and recommendations
"""
errors = []
warnings = []
recommendations = []
# Check for NaN/Inf
if np.any(np.isnan(X)) or np.any(np.isinf(X)):
errors.append("NaN or Inf values detected in features")
# Check feature variance
variances = np.var(X, axis=0)
low_var_features = np.sum(variances < 1e-6)
if low_var_features > 0:
warnings.append(f"{low_var_features} features have very low variance")
recommendations.append("Consider removing low-variance features")
# Check feature scale
means = np.mean(X, axis=0)
stds = np.std(X, axis=0)
if np.max(stds) / np.min(stds[stds > 0]) > 100:
warnings.append("Features have very different scales")
recommendations.append("Apply feature normalization (z-score or min-max)")
# Feature discriminability (using LDA)
lda = LinearDiscriminantAnalysis()
lda.fit(X, y)
feature_scores = np.abs(lda.coef_).mean(axis=0)
# Quality score
quality_score = 1.0
quality_score -= 0.5 * (len(errors) > 0)
quality_score -= 0.1 * len(warnings)
return {
'errors': errors,
'warnings': warnings,
'recommendations': recommendations,
'feature_scores': feature_scores,
'quality_score': quality_score
}
# Run diagnosis
report = diagnose_preprocessing(X, y)
print("Preprocessing Quality Report")
print("=" * 50)
print(f"Overall score: {report['quality_score'] * 100:.1f}%")
if report['errors']:
print("\n⚠️ ERRORS:")
for error in report['errors']:
print(f" • {error}")
if report['warnings']:
print("\n⚠️ WARNINGS:")
for warning in report['warnings']:
print(f" • {warning}")
if report['recommendations']:
print("\n💡 RECOMMENDATIONS:")
for rec in report['recommendations']:
print(f" • {rec}")
# Top discriminative features
print("\nTop discriminative features:")
top_features = np.argsort(report['feature_scores'])[-5:][::-1]
for rank, feat_idx in enumerate(top_features, 1):
print(f" {rank}. Feature {feat_idx} (score: {report['feature_scores'][feat_idx]:.3f})")
Copy
using NimbusSDK
# Diagnose preprocessing quality
report = diagnose_preprocessing(data)
println("Preprocessing Quality Report")
println("="^50)
println("Overall score: $(round(report.quality_score * 100, digits=1))%")
if !isempty(report.errors)
println("\n⚠️ ERRORS:")
for error in report.errors
println(" • $error")
end
end
if !isempty(report.warnings)
println("\n⚠️ WARNINGS:")
for warning in report.warnings
println(" • $warning")
end
end
if !isempty(report.recommendations)
println("\n💡 RECOMMENDATIONS:")
for rec in report.recommendations
println(" • $rec")
end
end
# Feature discriminability analysis
if hasfield(typeof(report), :feature_scores)
println("\nTop discriminative features:")
top_features = sortperm(report.feature_scores, rev=true)[1:5]
for (rank, feat_idx) in enumerate(top_features)
println(" $rank. Feature $feat_idx (score: $(round(report.feature_scores[feat_idx], digits=3)))")
end
end
Performance Comparison
NimbusLDA vs NimbusQDA
Compare the two models:- Python
- Julia
Copy
from nimbus_bci import NimbusLDA, NimbusQDA
import numpy as np
# Train both models on same data
print("Training NimbusLDA...")
clf_lda = NimbusLDA()
clf_lda.fit(X_train, y_train)
print("\nTraining NimbusQDA...")
clf_gmm = NimbusQDA()
clf_gmm.fit(X_train, y_train)
# Test both models
print("\n" + "=" * 50)
print("Model Comparison")
print("=" * 50)
for clf, name in [(clf_lda, "NimbusLDA"), (clf_gmm, "NimbusQDA")]:
predictions = clf.predict(X_test)
probabilities = clf.predict_proba(X_test)
accuracy = np.mean(predictions == y_test)
mean_conf = np.mean(np.max(probabilities, axis=1))
print(f"\n{name}:")
print(f" Accuracy: {accuracy * 100:.1f}%")
print(f" Mean confidence: {mean_conf:.3f}")
# Per-class performance
from sklearn.metrics import classification_report
print("\n Per-class metrics:")
print(classification_report(y_test, predictions,
target_names=["Left", "Right", "Feet", "Tongue"],
zero_division=0))
print("\nRecommendation:")
print(" • NimbusLDA: Faster, good for well-separated classes")
print(" • NimbusQDA: More flexible, better for overlapping distributions")
Copy
using NimbusSDK
NimbusSDK.install_core("your-api-key")
# Train both models on same data
println("Training NimbusLDA...")
rxlda = train_model(NimbusLDA, train_data; iterations=50, showprogress=true)
println("\nTraining NimbusQDA...")
rxgmm = train_model(NimbusQDA, train_data; iterations=50, showprogress=true)
# Test both models
println("\n" * "="^50)
println("Model Comparison")
println("="^50)
for (model, name) in [(rxlda, "NimbusLDA"), (rxgmm, "NimbusQDA")]
results = predict_batch(model, test_data)
accuracy = sum(results.predictions .== test_labels) / length(test_labels)
mean_conf = mean(results.confidences)
itr = calculate_ITR(accuracy, 4, 4.0)
println("\n$name:")
println(" Accuracy: $(round(accuracy * 100, digits=1))%")
println(" Mean confidence: $(round(mean_conf, digits=3))")
println(" ITR: $(round(itr, digits=1)) bits/min")
end
println("\nRecommendation:")
println(" • NimbusLDA: Faster, good for well-separated classes")
println(" • NimbusQDA: More flexible, better for overlapping distributions")
NimbusSTS for Non-Stationary Data
Compare adaptive vs static models on drifting data:- Python
Copy
from nimbus_bci import NimbusLDA, NimbusSTS
import numpy as np
def generate_drifting_data(n_samples, n_features, n_classes=4, drift_rate=0.03):
"""Generate data with temporal drift (simulates fatigue/electrode changes)."""
X, y = [], []
means = np.random.randn(n_classes, n_features) * 2.0
for t in range(n_samples):
means += np.random.randn(n_classes, n_features) * drift_rate # Drift!
label = t % n_classes
x = np.random.randn(n_features) + means[label]
X.append(x)
y.append(label)
return np.array(X), np.array(y)
# Generate non-stationary data
print("Generating non-stationary data with drift...")
X_train, y_train = generate_drifting_data(200, 16, n_classes=4, drift_rate=0.03)
X_test, y_test = generate_drifting_data(100, 16, n_classes=4, drift_rate=0.03)
# Static model (NimbusLDA)
print("\nTraining static model (NimbusLDA)...")
clf_static = NimbusLDA()
clf_static.fit(X_train, y_train)
acc_static = clf_static.score(X_test, y_test)
# Adaptive model (NimbusSTS)
print("Training adaptive model (NimbusSTS)...")
clf_sts = NimbusSTS(
transition_cov=0.05, # Moderate drift tracking
num_steps=100
)
clf_sts.fit(X_train, y_train)
# Evaluate with state propagation
preds_sts = []
for x in X_test:
clf_sts.propagate_state() # Advance time
pred = clf_sts.predict(x.reshape(1, -1))[0]
preds_sts.append(pred)
acc_sts = np.mean(preds_sts == y_test)
# Results
print("\n" + "=" * 50)
print("Non-Stationary Data Performance")
print("=" * 50)
print(f"\nNimbusLDA (static): {acc_static * 100:.1f}%")
print(f"NimbusSTS (adaptive): {acc_sts * 100:.1f}%")
print(f"Improvement: {(acc_sts - acc_static) * 100:+.1f}%")
print("\nWhen to use NimbusSTS:")
print(" ✓ Long sessions (>30 minutes)")
print(" ✓ Cross-day experiments")
print(" ✓ Observable drift or fatigue effects")
print(" ✓ Adaptive BCI with online learning")
State Inspection and Monitoring
Monitor how NimbusSTS adapts over time:- Python
Copy
from nimbus_bci import NimbusSTS
import numpy as np
import matplotlib.pyplot as plt
# Train model
clf = NimbusSTS(transition_cov=0.05)
clf.fit(X_train, y_train)
# Track state evolution
state_history = []
uncertainty_history = []
predictions = []
for x in X_test:
# Propagate state
clf.propagate_state()
# Get state before prediction
z_mean, z_cov = clf.get_latent_state()
state_history.append(z_mean.copy())
uncertainty_history.append(np.trace(z_cov))
# Predict
pred = clf.predict(x.reshape(1, -1))[0]
predictions.append(pred)
# Visualize state evolution
state_history = np.array(state_history)
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# State components
axes[0].plot(state_history)
axes[0].set_xlabel('Sample')
axes[0].set_ylabel('Latent State Components')
axes[0].set_title('State Evolution Over Time')
axes[0].grid(True)
# Uncertainty
axes[1].plot(uncertainty_history)
axes[1].set_xlabel('Sample')
axes[1].set_ylabel('Uncertainty (trace of cov)')
axes[1].set_title('State Uncertainty')
axes[1].grid(True)
# Accuracy over time (windowed)
window_size = 10
acc_over_time = []
for i in range(len(predictions) - window_size):
window_acc = np.mean(predictions[i:i+window_size] == y_test[i:i+window_size])
acc_over_time.append(window_acc)
axes[2].plot(acc_over_time)
axes[2].axhline(y=0.25, color='r', linestyle='--', label='Chance (25%)')
axes[2].set_xlabel('Sample')
axes[2].set_ylabel(f'Accuracy (window={window_size})')
axes[2].set_title('Accuracy Over Time')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.show()
print(f"Final state mean: {z_mean}")
print(f"Final uncertainty: {np.trace(z_cov):.3f}")
Next Steps
Advanced Examples
More sophisticated BCI applications
Basic Examples
Complete working code examples
NimbusLDA Model
Detailed NimbusLDA documentation
NimbusQDA Model
Detailed NimbusQDA documentation
NimbusSTS Model
Adaptive model for non-stationary data
Batch Processing
Offline batch inference guide
Streaming Inference
Real-time streaming guide
All examples use actual NimbusSDK.jl functions and the real NimbusLDA/NimbusQDA models. No mocked or simulated code.