Basic BCI Examples
This section provides fundamental examples of using Nimbus for brain-computer interface applications. Examples are shown in both Python and Julia.
Choose Your SDK:
- Python SDK: sklearn-compatible API with MNE integration → Python Quickstart
- Julia SDK: RxInfer.jl-based with pre-trained models → Julia Quickstart
Motor Imagery Classification
Motor imagery BCI allows users to control devices by imagining movements. This is one of the most common BCI paradigms.
Basic 2-Class Motor Imagery
Classify left vs right hand motor imagery:
Python:
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import mne
# Load EEG data
raw = mne.io.read_raw_gdf("motor_imagery.gdf", preload=True)
raw.filter(8, 30) # Mu + Beta bands
# Create epochs
events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract CSP features
csp_features, csp = extract_csp_features(epochs, n_components=8)
labels = epochs.events[:, 2]
# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)
# Evaluate
from sklearn.model_selection import cross_val_score
scores = cross_val_score(clf, csp_features, labels, cv=5)
print(f"Accuracy: {scores.mean():.2%} (+/- {scores.std():.2%})")
Julia:
using NimbusSDK
# 1. Setup (one-time)
NimbusSDK.install_core("your-api-key")
# 2. Load pre-trained 2-class model
model = load_model(RxLDAModel, "motor_imagery_2class_v1")
# 3. Prepare your preprocessed CSP features
# Features must be CSP-transformed EEG (8-30 Hz bandpass)
# Shape: (n_features × n_samples × n_trials)
csp_features = load_your_csp_features() # (8, 250, 100) - 8 CSP features, 1s trials, 100 trials
labels = [1, 2, 1, 2, ...] # 1 = left hand, 2 = right hand
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 8,
n_classes = 2,
chunk_size = nothing
)
data = BCIData(csp_features, metadata, labels)
# 4. Run batch inference
results = predict_batch(model, data; iterations=10)
- Python: Uses MNE for preprocessing, sklearn-compatible API
- Julia: Uses pre-trained models with RxInfer.jl inference
Accuracy Tip
For 2-class motor imagery, expect 65-85% accuracy after proper CSP preprocessing and calibration.
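The `extract_csp_features` helper wraps MNE's CSP implementation. If you prefer to control the spatial filtering yourself, a minimal sketch using `mne.decoding.CSP` directly (assuming the `epochs` object from the Python snippet above; the helper's exact defaults may differ, so treat this as illustrative):
from mne.decoding import CSP
# Epoch data as (n_epochs, n_channels, n_times) plus integer labels
X = epochs.get_data()
y = epochs.events[:, 2]
# Fit CSP and return the log-variance of each spatially filtered component
csp = CSP(n_components=8, log=True)
csp_features = csp.fit_transform(X, y)  # shape: (n_epochs, 8)
# csp_features can then be passed to NimbusLDA exactly as in the example above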
4-Class Motor Imagery
Classify left hand, right hand, feet, and tongue movements:
Python:
from nimbus_bci import NimbusLDA
from nimbus_bci.compat import extract_csp_features
import numpy as np
import mne
# Load and preprocess EEG
raw = mne.io.read_raw_gdf("motor_imagery_4class.gdf", preload=True)
raw.filter(8, 30)
# Create epochs for 4 classes
events = mne.find_events(raw)
event_id = {
'left_hand': 1,
'right_hand': 2,
'feet': 3,
'tongue': 4
}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract CSP features (more components for 4-class)
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]
# Train classifier
clf = NimbusLDA()
clf.fit(csp_features, labels)
# Predict
predictions = clf.predict(csp_features)
probabilities = clf.predict_proba(csp_features)
# Per-class analysis
for class_id in range(1, 5):
class_name = list(event_id.keys())[class_id - 1]
class_mask = labels == class_id
class_acc = np.mean(predictions[class_mask] == class_id)
print(f"{class_name} accuracy: {class_acc * 100:.1f}%")
# Overall metrics
accuracy = np.mean(predictions == labels)
print(f"\nOverall accuracy: {accuracy * 100:.1f}%")
print(f"Mean confidence: {np.mean(np.max(probabilities, axis=1)):.3f}")
Julia:
using NimbusSDK
# Authenticate and load 4-class model
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
# Prepare 4-class data
# Classes: 1=left hand, 2=right hand, 3=feet, 4=tongue
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16, # More features for 4-class problem
n_classes = 4
)
data = BCIData(csp_features, metadata, labels)
# Run inference
results = predict_batch(model, data)
# Per-class analysis
for class_id in 1:4
class_mask = labels .== class_id
class_acc = sum(results.predictions[class_mask] .== class_id) / sum(class_mask)
println("Class $class_id accuracy: $(round(class_acc * 100, digits=1))%")
end
# Overall metrics
accuracy = sum(results.predictions .== labels) / length(labels)
itr = calculate_ITR(accuracy, 4, 4.0)
println("\nOverall accuracy: $(round(accuracy * 100, digits=1))%")
println("ITR: $(round(itr, digits=1)) bits/minute")
Training Custom Motor Imagery Model
Train a model on your own data:
Python:
from nimbus_bci import NimbusLDA, NimbusGMM
from nimbus_bci.compat import extract_csp_features
from sklearn.model_selection import train_test_split
import numpy as np
import mne
import pickle
# Load and preprocess training data
# Minimum: 50 trials per class recommended
# Optimal: 200+ trials per class
raw = mne.io.read_raw_gdf("training_data.gdf", preload=True)
raw.filter(8, 30)
events = mne.find_events(raw)
event_id = {'left_hand': 1, 'right_hand': 2, 'feet': 3, 'tongue': 4}
epochs = mne.Epochs(raw, events, event_id, tmin=0, tmax=4, preload=True)
# Extract features
csp_features, csp = extract_csp_features(epochs, n_components=16)
labels = epochs.events[:, 2]
# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(
csp_features, labels, test_size=0.2, random_state=42, stratify=labels
)
# Train NimbusLDA model (faster, shared covariance)
clf_lda = NimbusLDA(mu_scale=3.0)
clf_lda.fit(X_train, y_train)
# Or train NimbusGMM model (more flexible, class-specific covariances)
clf_gmm = NimbusGMM(mu_scale=3.0)
clf_gmm.fit(X_train, y_train)
# Save trained model
with open("my_motor_imagery_lda.pkl", "wb") as f:
pickle.dump(clf_lda, f)
# Evaluate on test data
test_predictions = clf_lda.predict(X_test)
test_accuracy = np.mean(test_predictions == y_test)
print(f"Test accuracy: {test_accuracy * 100:.1f}%")
# Cross-validation for robust estimate
from sklearn.model_selection import cross_val_score
cv_scores = cross_val_score(clf_lda, csp_features, labels, cv=5)
print(f"CV accuracy: {cv_scores.mean() * 100:.1f}% (+/- {cv_scores.std() * 100:.1f}%)")
Julia:
using NimbusSDK
NimbusSDK.install_core("your-api-key")
# Collect training data
# Minimum: 50 trials per class recommended
# Optimal: 200+ trials per class
train_features, train_labels = collect_training_data()
train_data = BCIData(
train_features,
BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4
),
train_labels
)
# Train RxLDA model (faster, shared covariance)
rxlda_model = train_model(
RxLDAModel,
train_data;
iterations = 50,
showprogress = true,
name = "my_motor_imagery",
description = "Custom 4-class motor imagery"
)
# Or train RxGMM model (more flexible, class-specific covariances)
rxgmm_model = train_model(
RxGMMModel,
train_data;
iterations = 50,
showprogress = true,
name = "my_motor_imagery_gmm"
)
# Save trained model
save_model(rxlda_model, "my_motor_imagery.jld2")
# Evaluate on test data
test_results = predict_batch(rxlda_model, test_data)
test_accuracy = sum(test_results.predictions .== test_labels) / length(test_labels)
println("Test accuracy: $(round(test_accuracy * 100, digits=1))%")
P300 Event-Related Potential Detection
P300 BCIs detect attention-related brain responses to visual stimuli.
Binary P300 Classification
Detect target vs non-target stimuli:
Python:
from nimbus_bci import NimbusGMM # GMM often better for P300
import numpy as np
import mne
# Load P300 EEG data
raw = mne.io.read_raw_fif("p300_oddball.fif", preload=True)
raw.filter(0.5, 10) # P300 band
# Create epochs around stimulus
events = mne.find_events(raw)
event_id = {'target': 1, 'non-target': 2}
epochs = mne.Epochs(raw, events, event_id, tmin=-0.2, tmax=0.8,
baseline=(-0.2, 0), preload=True)
# Extract ERP features (mean amplitude in time windows)
# Focus on P300 window (250-500ms)
data = epochs.get_data() # (n_epochs, n_channels, n_times)
labels = epochs.events[:, 2]
# Extract mean amplitude from key electrodes (Pz, Cz, Fz)
ch_names = ['Pz', 'Cz', 'Fz']
ch_indices = [epochs.ch_names.index(ch) for ch in ch_names]
# Time window: 250-500ms (P300 peak)
times = epochs.times
p300_window = (times >= 0.25) & (times <= 0.5)
# Extract features: mean amplitude per channel in P300 window
features = np.mean(data[:, ch_indices, :][:, :, p300_window], axis=2)
# Train classifier (GMM better for P300's overlapping distributions)
clf = NimbusGMM()
clf.fit(features, labels)
# Predict
predictions = clf.predict(features)
probabilities = clf.predict_proba(features)
# Analyze target detection
target_mask = labels == 1
target_detected = np.sum(predictions[target_mask] == 1)
target_total = np.sum(target_mask)
print(f"Target detection rate: {100 * target_detected / target_total:.1f}%")
print(f"Mean target confidence: {np.mean(probabilities[target_mask, 0]):.3f}")
Julia:
using NimbusSDK
using Statistics  # for mean
# Setup and load P300 model
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "p300_binary_v1")
# Prepare P300 ERP features
# Features extracted from 0.2-0.8s post-stimulus window
# Typical: Bandpower or ERP amplitudes from Pz, Cz, Fz
erp_features = load_p300_features() # (12, 150, 200) - 12 features, 0.6s @ 250Hz, 200 trials
labels = [1, 2, 2, 2, 1, 2, ...] # 1=target, 2=non-target
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :p300,
feature_type = :erp,
n_features = 12,
n_classes = 2
)
data = BCIData(erp_features, metadata, labels)
# Run inference
results = predict_batch(model, data)
# Analyze target detection
target_mask = labels .== 1
target_detected = sum(results.predictions[target_mask] .== 1)
target_total = sum(target_mask)
println("Target detection rate: $(round(100 * target_detected / target_total, digits=1))%")
println("Mean target confidence: $(round(mean(results.confidences[target_mask]), digits=3))")
P300 Speller Application
Implement a P300-based speller:
Python:
from nimbus_bci import NimbusGMM
import numpy as np
# Train P300 speller model
clf = NimbusGMM()
# Assume we have training data from calibration phase
clf.fit(X_calib, y_calib) # X_calib: ERP features, y_calib: target/non-target
# Spelling matrix (6x6 for 36 characters)
matrix = np.array([
['A', 'B', 'C', 'D', 'E', 'F'],
['G', 'H', 'I', 'J', 'K', 'L'],
['M', 'N', 'O', 'P', 'Q', 'R'],
['S', 'T', 'U', 'V', 'W', 'X'],
['Y', 'Z', '1', '2', '3', '4'],
['5', '6', '7', '8', '9', '0']
])
def spell_character(row_features, col_features):
"""
Spell one character using row/column paradigm.
Args:
row_features: ERP features from 6 row flashes (6, n_features)
col_features: ERP features from 6 column flashes (6, n_features)
Returns:
character, confidence
"""
# Predict for rows
row_probs = clf.predict_proba(row_features)[:, 1] # P(target)
target_row = np.argmax(row_probs)
# Predict for columns
col_probs = clf.predict_proba(col_features)[:, 1] # P(target)
target_col = np.argmax(col_probs)
# Get character at intersection
character = matrix[target_row, target_col]
confidence = (row_probs[target_row] + col_probs[target_col]) / 2
return character, confidence
# Spell a word
word = ""
for i in range(5): # Spell 5 characters
# Collect ERP features from row/column flashes
row_features = collect_row_erps() # Your acquisition function
col_features = collect_col_erps() # Your acquisition function
character, confidence = spell_character(row_features, col_features)
if confidence > 0.7:
word += character
print(f"Spelled: {character} (confidence: {confidence:.2f})")
else:
print("Low confidence - retry")
print(f"\nSpelled word: {word}")
Julia:
using NimbusSDK
using Statistics  # for mean
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "p300_speller_v1")
# Spelling matrix (6x6 for 36 characters), row-major so 'A'..'F' fill the first row
matrix = permutedims(reshape(vcat('A':'Z', '0':'9'), 6, 6))
# ERP metadata, matching the binary P300 example above
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :p300,
    feature_type = :erp,
    n_features = 12,
    n_classes = 2
)
"""
    spell_character(row_col_trials)

Spell one character using the row/column paradigm.
`row_col_trials` is a Dict with `:rows` and `:cols` ERP feature data,
one entry per individual flash.
"""
function spell_character(row_col_trials)
    # Score row flashes: 30 flashes = 6 rows × 5 repetitions, grouped per row
    row_data = BCIData(row_col_trials[:rows], metadata)
    row_results = predict_batch(model, row_data)
    row_scores = [mean(row_results.posteriors[1, i:i+4]) for i in 1:5:30]  # P(target) per row
    target_row = argmax(row_scores)
    # Score column flashes
    col_data = BCIData(row_col_trials[:cols], metadata)
    col_results = predict_batch(model, col_data)
    col_scores = [mean(col_results.posteriors[1, i:i+4]) for i in 1:5:30]  # P(target) per column
    target_col = argmax(col_scores)
    # Character at the row/column intersection
    character = matrix[target_row, target_col]
    confidence = (row_scores[target_row] + col_scores[target_col]) / 2
    return (character=character, confidence=confidence)
end
# Spell a word
word = ""
for _ in 1:5 # Spell 5 characters
trials = collect_p300_trials() # Your acquisition function
result = spell_character(trials)
if result.confidence > 0.7
word *= result.character
println("Spelled: $(result.character) (confidence: $(round(result.confidence, digits=2)))")
else
println("Low confidence - retry")
end
end
println("\nSpelled word: $word")
Real-Time Streaming Examples
Streaming Motor Imagery Control
Real-time cursor control with streaming inference:
Python:
from nimbus_bci import NimbusLDA, StreamingSession
from nimbus_bci.data import BCIMetadata
import numpy as np
# Train or load model
clf = NimbusLDA()
clf.fit(X_train, y_train)
# Configure streaming metadata
metadata = BCIMetadata(
sampling_rate=250.0,
paradigm="motor_imagery",
feature_type="csp",
n_features=16,
n_classes=4,
chunk_size=250 # 1-second chunks
)
# Initialize streaming session
session = StreamingSession(clf.model_, metadata)
# Process real-time EEG chunks
cursor_position = [0.0, 0.0] # [x, y]
movement_speed = 0.05
for chunk in eeg_stream: # Your real-time acquisition loop
# Process chunk (16 features × 250 samples)
chunk_result = session.process_chunk(chunk)
# Update cursor based on prediction
if chunk_result.confidence > 0.7:
if chunk_result.prediction == 0: # Left hand
cursor_position[0] -= movement_speed
elif chunk_result.prediction == 1: # Right hand
cursor_position[0] += movement_speed
elif chunk_result.prediction == 2: # Feet
cursor_position[1] -= movement_speed
elif chunk_result.prediction == 3: # Tongue
cursor_position[1] += movement_speed
update_cursor_display(cursor_position)
# Reset for next chunk
session.reset()
Julia:
using NimbusSDK
# Setup
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
metadata = BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4,
chunk_size = 250 # 1-second chunks
)
# Initialize streaming session
session = init_streaming(model, metadata)
# Process real-time EEG chunks
cursor_position = [0.0, 0.0] # [x, y]
movement_speed = 0.05
for chunk in eeg_stream
# Process chunk (16 features × 250 samples)
chunk_result = process_chunk(session, chunk)
# Update cursor based on prediction
if chunk_result.confidence > 0.7
if chunk_result.prediction == 1 # Left hand
cursor_position[1] -= movement_speed
elseif chunk_result.prediction == 2 # Right hand
cursor_position[1] += movement_speed
elseif chunk_result.prediction == 3 # Feet
cursor_position[2] -= movement_speed
elseif chunk_result.prediction == 4 # Tongue
cursor_position[2] += movement_speed
end
update_cursor_display(cursor_position)
end
end
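`process_chunk` expects fixed-size chunks (here 16 features × 250 samples), while acquisition drivers often deliver blocks of varying length. Below is a small Python buffer that regroups incoming feature blocks into exact one-second chunks; `session` and `eeg_stream` are assumed from the Python streaming example above, and the class itself is just a sketch:
import numpy as np
class ChunkBuffer:
    """Accumulates (n_features, n_new_samples) blocks and yields fixed-size chunks."""
    def __init__(self, n_features, chunk_size):
        self.chunk_size = chunk_size
        self.buffer = np.empty((n_features, 0))
    def push(self, block):
        # Append the new samples, then emit as many full chunks as are available
        self.buffer = np.concatenate([self.buffer, block], axis=1)
        while self.buffer.shape[1] >= self.chunk_size:
            chunk, self.buffer = (self.buffer[:, :self.chunk_size],
                                  self.buffer[:, self.chunk_size:])
            yield chunk
buf = ChunkBuffer(n_features=16, chunk_size=250)
for block in eeg_stream:              # blocks of arbitrary length
    for chunk in buf.push(block):     # exact 250-sample chunks
        result = session.process_chunk(chunk)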
Adaptive Threshold Control
Adjust control sensitivity based on performance:
Python:
from nimbus_bci import StreamingSession
from collections import deque
import numpy as np
# Setup streaming
session = StreamingSession(clf.model_, metadata)
# Performance tracker
class OnlinePerformanceTracker:
def __init__(self, window_size=20):
self.window_size = window_size
self.predictions = deque(maxlen=window_size)
self.ground_truths = deque(maxlen=window_size)
def update(self, prediction, ground_truth):
self.predictions.append(prediction)
self.ground_truths.append(ground_truth)
if len(self.predictions) > 0:
accuracy = np.mean(np.array(self.predictions) == np.array(self.ground_truths))
return accuracy
return 0.0
tracker = OnlinePerformanceTracker(window_size=20)
# Adaptive parameters
confidence_threshold = 0.7
update_interval = 10
trial_count = 0
for trial in trials:
trial_count += 1
# Process trial chunks
for chunk in collect_trial_chunks():
result = session.process_chunk(chunk)
# Execute command if confidence exceeds threshold
if result.confidence > confidence_threshold:
execute_command(result.prediction)
final_result = session.finalize_trial()
# Update tracker with ground truth (if available)
if ground_truth is not None:
accuracy = tracker.update(final_result.prediction, ground_truth)
# Adapt threshold every 10 trials
if trial_count % update_interval == 0:
if accuracy > 0.85:
# High accuracy - can lower threshold for faster response
confidence_threshold = max(0.6, confidence_threshold * 0.95)
print(f"Lowering threshold to {confidence_threshold:.2f}")
elif accuracy < 0.65:
# Low accuracy - raise threshold for reliability
confidence_threshold = min(0.85, confidence_threshold * 1.05)
print(f"Raising threshold to {confidence_threshold:.2f}")
session.reset()
Julia:
using NimbusSDK
# Setup streaming
session = init_streaming(model, metadata)
tracker = OnlinePerformanceTracker(window_size=20)
# Adaptive parameters
confidence_threshold = 0.7
update_interval = 10
trial_count = 0
for trial in trials
trial_count += 1
# Process trial
for chunk in collect_trial_chunks()
result = process_chunk(session, chunk)
# Execute command if confidence exceeds threshold
if result.confidence > confidence_threshold
execute_command(result.prediction)
end
end
final_result = finalize_trial(session)
# Update tracker with ground truth (if available)
if !isnothing(ground_truth)
metrics = update_and_report!(tracker, final_result.prediction, ground_truth, final_result.confidence)
# Adapt threshold every 10 trials
if trial_count % update_interval == 0
if metrics.accuracy > 0.85
# High accuracy - can lower threshold for faster response
confidence_threshold = max(0.6, confidence_threshold * 0.95)
println("Lowering threshold to $(round(confidence_threshold, digits=2))")
elseif metrics.accuracy < 0.65
# Low accuracy - raise threshold for reliability
confidence_threshold = min(0.85, confidence_threshold * 1.05)
println("Raising threshold to $(round(confidence_threshold, digits=2))")
end
end
end
end
Subject-Specific Calibration
Quick Calibration for New Users
Personalize a model with minimal calibration data:
Python:
from nimbus_bci import NimbusLDA
from sklearn.model_selection import train_test_split
import numpy as np
import pickle
# Load or use a pre-trained baseline model
# For Python SDK, we can use online learning (partial_fit)
baseline_clf = NimbusLDA()
# Train baseline on large dataset
baseline_clf.fit(X_baseline, y_baseline)
# Collect quick calibration (10-20 trials per class)
print("Collecting calibration data...")
X_calib, y_calib = collect_calibration_trials(trials_per_class=15)
# Split for validation
X_calib_train, X_val, y_calib_train, y_val = train_test_split(
X_calib, y_calib, test_size=0.3, stratify=y_calib, random_state=42
)
# Personalize model using online learning
print("Calibrating model...")
personalized_clf = NimbusLDA()
personalized_clf.fit(X_baseline, y_baseline) # Start with baseline
# Fine-tune on calibration data
for _ in range(5): # Multiple passes through calibration
personalized_clf.partial_fit(X_calib_train, y_calib_train)
# Save personalized model
with open("subject_123_personalized.pkl", "wb") as f:
pickle.dump(personalized_clf, f)
# Test improvement
print("\nTesting on validation data...")
baseline_pred = baseline_clf.predict(X_val)
personalized_pred = personalized_clf.predict(X_val)
baseline_acc = np.mean(baseline_pred == y_val)
personalized_acc = np.mean(personalized_pred == y_val)
print(f"Baseline accuracy: {baseline_acc * 100:.1f}%")
print(f"Personalized accuracy: {personalized_acc * 100:.1f}%")
print(f"Improvement: +{(personalized_acc - baseline_acc) * 100:.1f}%")
Julia:
using NimbusSDK
NimbusSDK.install_core("your-api-key")
# Load baseline model
baseline_model = load_model(RxLDAModel, "motor_imagery_baseline_v1")
# Collect quick calibration (10-20 trials per class)
println("Collecting calibration data...")
calib_features, calib_labels = collect_calibration_trials(trials_per_class=15)
calib_data = BCIData(
calib_features,
BCIMetadata(
sampling_rate = 250.0,
paradigm = :motor_imagery,
feature_type = :csp,
n_features = 16,
n_classes = 4
),
calib_labels
)
# Calibrate model (much faster than training from scratch)
println("Calibrating model...")
personalized_model = calibrate_model(
baseline_model,
calib_data;
iterations = 20 # Fewer iterations needed
)
# Save personalized model
save_model(personalized_model, "subject_123_personalized.jld2")
# Test improvement
println("\nTesting on validation data...")
baseline_results = predict_batch(baseline_model, validation_data)
personalized_results = predict_batch(personalized_model, validation_data)
baseline_acc = sum(baseline_results.predictions .== val_labels) / length(val_labels)
personalized_acc = sum(personalized_results.predictions .== val_labels) / length(val_labels)
println("Baseline accuracy: $(round(baseline_acc * 100, digits=1))%")
println("Personalized accuracy: $(round(personalized_acc * 100, digits=1))%")
println("Improvement: $(round((personalized_acc - baseline_acc) * 100, digits=1))%")
Quality Assessment and Diagnostics
Identify Poor Quality Trials
Python:
from nimbus_bci import NimbusLDA
import numpy as np
# Run inference
predictions = clf.predict(X)
probabilities = clf.predict_proba(X)
confidences = np.max(probabilities, axis=1)
# Identify low-confidence trials
low_conf_threshold = 0.65
low_conf_indices = np.where(confidences < low_conf_threshold)[0]
print(f"Low confidence trials: {len(low_conf_indices)}/{len(predictions)}")
print(f"Indices: {low_conf_indices}")
# Analyze patterns in low-confidence trials
if len(low_conf_indices) > 0:
print("\nLow confidence trial analysis:")
for class_id, class_name in enumerate(["Left", "Right", "Feet", "Tongue"], start=1):
class_low_conf = np.sum(y[low_conf_indices] == class_id)
print(f" {class_name}: {class_low_conf} trials")
# Overall quality assessment
mean_confidence = np.mean(confidences)
print(f"\nMean confidence: {mean_confidence:.2f}")
if mean_confidence > 0.8:
print("Recommendation: High quality - proceed with deployment")
elif mean_confidence > 0.65:
print("Recommendation: Acceptable quality - monitor performance")
else:
print("Recommendation: Poor quality - recalibrate or retrain")
Julia:
using NimbusSDK
# Run inference
results = predict_batch(model, data)
# Identify low-confidence trials
low_conf_threshold = 0.65
low_conf_indices = findall(results.confidences .< low_conf_threshold)
println("Low confidence trials: $(length(low_conf_indices))/$(length(results.predictions))")
println("Indices: $low_conf_indices")
# Analyze patterns in low-confidence trials
if !isempty(low_conf_indices)
println("\nLow confidence trial analysis:")
for (class_id, class_name) in enumerate(["Left", "Right", "Feet", "Tongue"])
class_low_conf = sum(labels[low_conf_indices] .== class_id)
println(" $class_name: $class_low_conf trials")
end
end
# Overall quality assessment
quality = assess_trial_quality(results)
println("\nOverall quality score: $(round(quality.overall_score, digits=2))")
println("Recommendation: $(quality.recommendation)")
Preprocessing Quality Check
Python:
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# Check for common preprocessing issues
def diagnose_preprocessing(X, y):
"""
Diagnose common preprocessing quality issues.
Args:
X: Feature matrix (n_samples, n_features)
y: Labels (n_samples,)
Returns:
Dictionary with quality metrics and recommendations
"""
errors = []
warnings = []
recommendations = []
# Check for NaN/Inf
if np.any(np.isnan(X)) or np.any(np.isinf(X)):
errors.append("NaN or Inf values detected in features")
# Check feature variance
variances = np.var(X, axis=0)
low_var_features = np.sum(variances < 1e-6)
if low_var_features > 0:
warnings.append(f"{low_var_features} features have very low variance")
recommendations.append("Consider removing low-variance features")
# Check feature scale
means = np.mean(X, axis=0)
stds = np.std(X, axis=0)
if np.max(stds) / np.min(stds[stds > 0]) > 100:
warnings.append("Features have very different scales")
recommendations.append("Apply feature normalization (z-score or min-max)")
# Feature discriminability (using LDA)
lda = LinearDiscriminantAnalysis()
lda.fit(X, y)
feature_scores = np.abs(lda.coef_).mean(axis=0)
# Quality score
quality_score = 1.0
quality_score -= 0.5 * (len(errors) > 0)
quality_score -= 0.1 * len(warnings)
return {
'errors': errors,
'warnings': warnings,
'recommendations': recommendations,
'feature_scores': feature_scores,
'quality_score': quality_score
}
# Run diagnosis
report = diagnose_preprocessing(X, y)
print("Preprocessing Quality Report")
print("=" * 50)
print(f"Overall score: {report['quality_score'] * 100:.1f}%")
if report['errors']:
print("\n⚠️ ERRORS:")
for error in report['errors']:
print(f" • {error}")
if report['warnings']:
print("\n⚠️ WARNINGS:")
for warning in report['warnings']:
print(f" • {warning}")
if report['recommendations']:
print("\n💡 RECOMMENDATIONS:")
for rec in report['recommendations']:
print(f" • {rec}")
# Top discriminative features
print("\nTop discriminative features:")
top_features = np.argsort(report['feature_scores'])[-5:][::-1]
for rank, feat_idx in enumerate(top_features, 1):
print(f" {rank}. Feature {feat_idx} (score: {report['feature_scores'][feat_idx]:.3f})")
Julia:
using NimbusSDK
# Diagnose preprocessing quality
report = diagnose_preprocessing(data)
println("Preprocessing Quality Report")
println("="^50)
println("Overall score: $(round(report.quality_score * 100, digits=1))%")
if !isempty(report.errors)
println("\n⚠️ ERRORS:")
for error in report.errors
println(" • $error")
end
end
if !isempty(report.warnings)
println("\n⚠️ WARNINGS:")
for warning in report.warnings
println(" • $warning")
end
end
if !isempty(report.recommendations)
println("\n💡 RECOMMENDATIONS:")
for rec in report.recommendations
println(" • $rec")
end
end
# Feature discriminability analysis
if hasfield(typeof(report), :feature_scores)
println("\nTop discriminative features:")
top_features = sortperm(report.feature_scores, rev=true)[1:5]
for (rank, feat_idx) in enumerate(top_features)
println(" $rank. Feature $feat_idx (score: $(round(report.feature_scores[feat_idx], digits=3)))")
end
end
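If the diagnosis recommends feature normalization, the simplest Python-side remedy is a z-score step fitted on the training folds only. Because the Nimbus classifiers expose the sklearn estimator API, they should compose with a standard Pipeline, though it is worth verifying against your SDK version; a sketch assuming feature matrix `X` and labels `y`:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score
from nimbus_bci import NimbusLDA
# Z-score each feature using statistics estimated inside each training fold
pipeline = make_pipeline(StandardScaler(), NimbusLDA())
scores = cross_val_score(pipeline, X, y, cv=5)
print(f"CV accuracy with normalized features: {scores.mean():.1%} (+/- {scores.std():.1%})")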
Performance Comparison
RxLDA vs RxGMM
Compare the two models:
Python:
from nimbus_bci import NimbusLDA, NimbusGMM
import numpy as np
# Train both models on same data
print("Training NimbusLDA...")
clf_lda = NimbusLDA()
clf_lda.fit(X_train, y_train)
print("\nTraining NimbusGMM...")
clf_gmm = NimbusGMM()
clf_gmm.fit(X_train, y_train)
# Test both models
print("\n" + "=" * 50)
print("Model Comparison")
print("=" * 50)
for clf, name in [(clf_lda, "NimbusLDA"), (clf_gmm, "NimbusGMM")]:
predictions = clf.predict(X_test)
probabilities = clf.predict_proba(X_test)
accuracy = np.mean(predictions == y_test)
mean_conf = np.mean(np.max(probabilities, axis=1))
print(f"\n{name}:")
print(f" Accuracy: {accuracy * 100:.1f}%")
print(f" Mean confidence: {mean_conf:.3f}")
# Per-class performance
from sklearn.metrics import classification_report
print("\n Per-class metrics:")
print(classification_report(y_test, predictions,
target_names=["Left", "Right", "Feet", "Tongue"],
zero_division=0))
print("\nRecommendation:")
print(" • NimbusLDA: Faster, good for well-separated classes")
print(" • NimbusGMM: More flexible, better for overlapping distributions")
Julia:
using NimbusSDK
using Statistics  # for mean
NimbusSDK.install_core("your-api-key")
# Train both models on same data
println("Training RxLDA...")
rxlda = train_model(RxLDAModel, train_data; iterations=50, showprogress=true)
println("\nTraining RxGMM...")
rxgmm = train_model(RxGMMModel, train_data; iterations=50, showprogress=true)
# Test both models
println("\n" * "="^50)
println("Model Comparison")
println("="^50)
for (model, name) in [(rxlda, "RxLDA"), (rxgmm, "RxGMM")]
results = predict_batch(model, test_data)
accuracy = sum(results.predictions .== test_labels) / length(test_labels)
mean_conf = mean(results.confidences)
itr = calculate_ITR(accuracy, 4, 4.0)
println("\n$name:")
println(" Accuracy: $(round(accuracy * 100, digits=1))%")
println(" Mean confidence: $(round(mean_conf, digits=3))")
println(" ITR: $(round(itr, digits=1)) bits/min")
end
println("\nRecommendation:")
println(" • RxLDA: Faster, good for well-separated classes")
println(" • RxGMM: More flexible, better for overlapping distributions")
Next Steps
Advanced Examples
More sophisticated BCI applications
Basic Examples
Complete working code examples
RxLDA Model
Detailed RxLDA documentation
RxGMM Model
Detailed RxGMM documentation
Batch Processing
Offline batch inference guide
Streaming Inference
Real-time streaming guide
All examples use actual NimbusSDK.jl functions and the real RxLDA/RxGMM models. No mocked or simulated code.