Advanced BCI Applications
This section demonstrates sophisticated applications of Nimbus for complex BCI scenarios. Examples are shown in both Python and Julia.

Python SDK: See Streaming Inference and sklearn Integration for Python-specific advanced patterns.
Cross-Subject Model Training
Train models that generalize across multiple subjects.

Multi-Subject Training Dataset
Python:
from nimbus_bci import NimbusLDA
from nimbus_bci import estimate_normalization_params, apply_normalization
import numpy as np
import pickle

# Collect data from multiple subjects
subjects = ["S001", "S002", "S003", "S004", "S005"]
all_features = []
all_labels = []

for subject_id in subjects:
    print(f"Loading subject {subject_id}...")
    subj_features, subj_labels = load_subject_data(subject_id)
    all_features.append(subj_features)
    all_labels.append(subj_labels)

# Concatenate all subject data
X = np.vstack(all_features)
y = np.concatenate(all_labels)

print("\nCombined dataset:")
print(f"  Total trials: {X.shape[0]}")
print(f"  Features: {X.shape[1]}")
print(f"  Subjects: {len(subjects)}")

# Normalize across all subjects
norm_params = estimate_normalization_params(X, method="zscore")
X_norm = apply_normalization(X, norm_params)

# Train cross-subject model
clf = NimbusLDA(mu_scale=5.0)  # Higher regularization for cross-subject
clf.fit(X_norm, y)
print("Cross-subject model trained")

# Save the baseline and its normalization parameters for the transfer step below
with open("cross_subject_model.pkl", "wb") as f:
    pickle.dump(clf, f)
with open("cross_subject_norm_params.pkl", "wb") as f:
    pickle.dump(norm_params, f)
Julia:
using NimbusSDK

NimbusSDK.install_core("your-api-key")

# Collect data from multiple subjects
subjects = ["S001", "S002", "S003", "S004", "S005"]
all_features = []
all_labels = []

for subject_id in subjects
    println("Loading subject $subject_id...")
    subj_features, subj_labels = load_subject_data(subject_id)
    push!(all_features, subj_features)
    push!(all_labels, subj_labels)
end

# Concatenate all subject data (features are n_features × n_samples × n_trials)
combined_features = cat(all_features..., dims=3)
combined_labels = vcat(all_labels...)

println("\nCombined dataset:")
println("  Total trials: $(size(combined_features, 3))")
println("  Features: $(size(combined_features, 1))")
println("  Subjects: $(length(subjects))")

# Train cross-subject model on the combined dataset
train_data = BCIData(
    combined_features,
    BCIMetadata(
        sampling_rate = 250.0,
        paradigm = :motor_imagery,
        feature_type = :csp,
        n_features = 16,
        n_classes = 4
    ),
    combined_labels
)

# Fit on train_data (see the Julia SDK reference for the training entry point)
Transfer to New Subject
Use the cross-subject model as a baseline for new subjects.

Python:
from nimbus_bci import NimbusLDA
import numpy as np
import pickle

# Load cross-subject baseline
with open("cross_subject_model.pkl", "rb") as f:
    baseline_clf = pickle.load(f)

# Collect minimal calibration from new subject (Subject 006)
X_calib, y_calib = collect_calibration_trials(
    subject_id="S006",
    trials_per_class=12  # Only 12 trials per class needed!
)

# Personalize using online learning, starting from the loaded baseline
personalized_clf = baseline_clf

# Fine-tune on new subject's calibration data
for _ in range(10):  # Multiple passes for adaptation
    personalized_clf.partial_fit(X_calib, y_calib)

# Test on new subject
X_test_new, y_test_new = load_subject_test_data("S006")
predictions = personalized_clf.predict(X_test_new)
accuracy = np.mean(predictions == y_test_new)

print(f"New subject accuracy: {accuracy * 100:.1f}%")
print("(With only 48 calibration trials total!)")
Julia:
using NimbusSDK

# Load cross-subject baseline
baseline = load_model(RxLDAModel, "cross_subject_model.jld2")

# Collect minimal calibration from new subject (Subject 006)
new_subj_calib = collect_calibration_trials(
    subject_id="S006",
    trials_per_class=12  # Only 12 trials per class needed!
)

# Calibrate for new subject
personalized = calibrate_model(
    baseline,
    new_subj_calib;
    iterations = 20
)

# Test on new subject
test_data_new = load_subject_test_data("S006")
results = predict_batch(personalized, test_data_new)
accuracy = sum(results.predictions .== test_data_new.labels) / length(test_data_new.labels)

println("New subject accuracy: $(round(accuracy * 100, digits=1))%")
println("(With only 48 calibration trials total!)")
Hybrid BCI: Combining Multiple Paradigms
Combine motor imagery and P300 for robust control.

Dual-Paradigm System

Python:
from nimbus_bci import NimbusLDA, NimbusGMM
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pickle

# Load both models
with open("motor_imagery_4class.pkl", "rb") as f:
    mi_clf = pickle.load(f)
with open("p300_binary.pkl", "rb") as f:
    p300_clf = pickle.load(f)

@dataclass
class HybridBCIResult:
    mi_prediction: int
    mi_confidence: float
    p300_confirmed: bool
    p300_confidence: float
    final_action: Optional[int]

def hybrid_inference(mi_features, p300_features):
    """
    Combined motor imagery + P300 for high-confidence control.

    Args:
        mi_features: Motor imagery features (1, n_features)
        p300_features: P300 ERP features (1, n_features)

    Returns:
        HybridBCIResult with combined inference
    """
    # Motor imagery: Determine intended direction
    mi_pred = mi_clf.predict(mi_features)[0]
    mi_probs = mi_clf.predict_proba(mi_features)[0]
    mi_conf = np.max(mi_probs)

    # P300: Confirm intention
    p300_pred = p300_clf.predict(p300_features)[0]
    p300_probs = p300_clf.predict_proba(p300_features)[0]
    p300_confirmed = (p300_pred == 1)  # Target detected
    p300_conf = p300_probs[1] if p300_confirmed else p300_probs[0]

    # Decision logic: Require both high confidence
    final_action = None
    if mi_conf > 0.75 and p300_confirmed and p300_conf > 0.75:
        final_action = mi_pred

    return HybridBCIResult(
        mi_prediction=mi_pred,
        mi_confidence=mi_conf,
        p300_confirmed=p300_confirmed,
        p300_confidence=p300_conf,
        final_action=final_action
    )

# Example usage: Wheelchair control
def hybrid_wheelchair_control():
    while is_active():
        # Collect motor imagery trial (direction intent)
        mi_features = collect_mi_trial()
        # Collect P300 confirmation trial
        p300_features = collect_p300_confirmation()

        result = hybrid_inference(mi_features, p300_features)

        if result.final_action is not None:
            print(f"✓ Confirmed action: {result.final_action}")
            execute_wheelchair_command(result.final_action)
        else:
            print("⚠️ Low confidence - no action")
            print(f"  MI: {result.mi_confidence:.2f}")
            print(f"  P300: {result.p300_confirmed} ({result.p300_confidence:.2f})")
Julia:
using NimbusSDK

NimbusSDK.install_core("your-api-key")

# Load both models
mi_model = load_model(RxLDAModel, "motor_imagery_4class_v1")  # Bayesian LDA
p300_model = load_model(RxLDAModel, "p300_binary_v1")         # Bayesian LDA

struct HybridBCIResult
    mi_prediction::Int
    mi_confidence::Float64
    p300_confirmed::Bool
    p300_confidence::Float64
    final_action::Union{Int, Nothing}
end

"""
Combined motor imagery + P300 for high-confidence control.
"""
function hybrid_inference(mi_trial, p300_trial)
    # Motor imagery: Determine intended direction
    mi_results = predict_batch(mi_model, mi_trial)
    mi_pred = mi_results.predictions[1]
    mi_conf = mi_results.confidences[1]

    # P300: Confirm intention
    p300_results = predict_batch(p300_model, p300_trial)
    p300_confirmed = p300_results.predictions[1] == 1  # Target detected
    p300_conf = p300_results.confidences[1]

    # Decision logic: Require both high confidence
    final_action = nothing
    if mi_conf > 0.75 && p300_confirmed && p300_conf > 0.75
        final_action = mi_pred
    end

    return HybridBCIResult(
        mi_pred,
        mi_conf,
        p300_confirmed,
        p300_conf,
        final_action
    )
end

# Example usage: Wheelchair control
function hybrid_wheelchair_control()
    while is_active()
        # Collect motor imagery trial (direction intent)
        mi_trial = collect_mi_trial()
        # Collect P300 confirmation trial
        p300_trial = collect_p300_confirmation()

        result = hybrid_inference(mi_trial, p300_trial)

        if !isnothing(result.final_action)
            println("✓ Confirmed action: $(result.final_action)")
            execute_wheelchair_command(result.final_action)
        else
            println("⚠️ Low confidence - no action")
            println("  MI: $(round(result.mi_confidence, digits=2))")
            println("  P300: $(result.p300_confirmed) ($(round(result.p300_confidence, digits=2)))")
        end
    end
end
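The decision rule above is a conservative AND gate: both modalities must independently clear 0.75. A softer alternative is probabilistic fusion, scaling the motor-imagery posterior by the P300 target probability and thresholding the product, which lets one very confident modality compensate for a moderately confident one. A minimal Python sketch (the fused_inference helper and the 0.6 threshold are illustrative, not part of either SDK):

import numpy as np

def fused_inference(mi_probs, p300_target_prob, threshold=0.6):
    """Fuse MI class posteriors with the P300 target probability.

    mi_probs: (n_classes,) class posterior from the MI classifier
    p300_target_prob: scalar P(target) from the P300 classifier
    """
    # Scale each MI class posterior by the confirmation probability; the
    # fused score approximates P(class AND confirmed) if the two paradigms
    # are treated as independent evidence sources
    fused = mi_probs * p300_target_prob
    best = int(np.argmax(fused))
    return (best, fused[best]) if fused[best] > threshold else (None, fused[best])

# Example: MI says class 2 with 0.85, P300 says target with 0.80 -> fused 0.68
action, score = fused_inference(np.array([0.05, 0.05, 0.85, 0.05]), 0.80)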
Continuous Control with Smoothing
Implement smooth continuous control for cursors and prosthetics.

Confidence-Weighted Moving Average

Python:
from nimbus_bci import StreamingSession
from collections import deque
import numpy as np

# Setup streaming (clf and metadata come from an earlier training step)
session = StreamingSession(clf.model_, metadata)

# Smoothing filter class
class MovingAverageFilter:
    def __init__(self, window_size=5, n_classes=4):
        self.predictions = deque(maxlen=window_size)
        self.confidences = deque(maxlen=window_size)
        self.window_size = window_size
        self.n_classes = n_classes

    def smooth_prediction(self, new_pred, new_conf):
        """Apply moving average smoothing with confidence weighting."""
        self.predictions.append(new_pred)
        self.confidences.append(new_conf)

        # Weighted voting by confidence
        class_scores = np.zeros(self.n_classes)
        for pred, conf in zip(self.predictions, self.confidences):
            class_scores[pred] += conf

        # Smoothed prediction
        smoothed_pred = np.argmax(class_scores)
        smoothed_conf = np.max(class_scores) / np.sum(class_scores)
        return smoothed_pred, smoothed_conf

# Real-time control with smoothing
smoother = MovingAverageFilter(window_size=5, n_classes=4)

for chunk in eeg_stream:
    # Get raw prediction
    raw_result = session.process_chunk(chunk)

    # Apply smoothing
    smoothed_pred, smoothed_conf = smoother.smooth_prediction(
        raw_result.prediction, raw_result.confidence
    )

    # Execute smoothed command
    if smoothed_conf > 0.7:
        execute_smooth_control(smoothed_pred, smoothed_conf)

session.reset()
Julia:
using NimbusSDK

# Setup streaming
session = init_streaming(model, metadata)

# Smoothing filter state
struct MovingAverageFilter
    predictions::Vector{Int}
    confidences::Vector{Float64}
    window_size::Int
end

function smooth_prediction(smoother::MovingAverageFilter, new_pred::Int, new_conf::Float64)
    push!(smoother.predictions, new_pred)
    push!(smoother.confidences, new_conf)

    # Keep only recent window
    if length(smoother.predictions) > smoother.window_size
        popfirst!(smoother.predictions)
        popfirst!(smoother.confidences)
    end

    # Weighted voting by confidence (4 classes)
    class_scores = zeros(4)
    for (pred, conf) in zip(smoother.predictions, smoother.confidences)
        class_scores[pred] += conf
    end

    # Smoothed prediction
    smoothed_pred = argmax(class_scores)
    smoothed_conf = maximum(class_scores) / sum(class_scores)
    return (prediction=smoothed_pred, confidence=smoothed_conf)
end

# Real-time control with smoothing
smoother = MovingAverageFilter(Int[], Float64[], 5)

for chunk in eeg_stream
    # Get raw prediction
    raw_result = process_chunk(session, chunk)

    # Apply smoothing
    smoothed = smooth_prediction(smoother, raw_result.prediction, raw_result.confidence)

    # Execute smoothed command
    if smoothed.confidence > 0.7
        execute_smooth_control(smoothed.prediction, smoothed.confidence)
    end
end
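The filter above weights every trial in the window equally; a true exponential moving average over the class-probability vector is a lighter-weight alternative that keeps no buffer and down-weights old trials smoothly. A minimal Python sketch (the EMAProbabilityFilter class and the 0.3 smoothing factor are illustrative):

import numpy as np

class EMAProbabilityFilter:
    """Exponentially smooth class probabilities across consecutive trials."""

    def __init__(self, n_classes=4, alpha=0.3):
        self.alpha = alpha  # Weight of the newest observation (0-1)
        self.smoothed = np.full(n_classes, 1.0 / n_classes)  # Uniform prior

    def update(self, probs):
        # Blend the new probability vector into the running average
        self.smoothed = self.alpha * np.asarray(probs) + (1 - self.alpha) * self.smoothed
        pred = int(np.argmax(self.smoothed))
        return pred, float(self.smoothed[pred])

# Usage: feed per-trial predict_proba output, act on the smoothed result
ema = EMAProbabilityFilter(n_classes=4, alpha=0.3)
pred, conf = ema.update([0.1, 0.7, 0.1, 0.1])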
Velocity-Based Control
Control velocity instead of discrete actions.

Python:
from nimbus_bci import StreamingSession
import numpy as np

def velocity_control(clf, metadata, eeg_stream):
    """Continuous velocity-based BCI control."""
    session = StreamingSession(clf.model_, metadata)

    velocity = np.array([0.0, 0.0])  # [vx, vy]
    max_velocity = 1.0
    acceleration = 0.1
    friction = 0.95

    for chunk in eeg_stream:
        result = session.process_chunk(chunk)

        if result.confidence > 0.7:
            # Map predictions to velocity changes
            if result.prediction == 0:    # Left
                velocity[0] -= acceleration * result.confidence
            elif result.prediction == 1:  # Right
                velocity[0] += acceleration * result.confidence
            elif result.prediction == 2:  # Up
                velocity[1] += acceleration * result.confidence
            elif result.prediction == 3:  # Down
                velocity[1] -= acceleration * result.confidence

        # Apply friction
        velocity *= friction

        # Clamp velocity
        velocity = np.clip(velocity, -max_velocity, max_velocity)

        # Update position
        update_cursor_velocity(velocity)

    session.reset()
Julia:
using NimbusSDK

function velocity_control(model, metadata, eeg_stream)
    session = init_streaming(model, metadata)

    velocity = [0.0, 0.0]  # [vx, vy]
    max_velocity = 1.0
    acceleration = 0.1
    friction = 0.95

    for chunk in eeg_stream
        result = process_chunk(session, chunk)

        if result.confidence > 0.7
            # Map predictions to velocity changes (classes are 1-indexed)
            if result.prediction == 1      # Left
                velocity[1] -= acceleration * result.confidence
            elseif result.prediction == 2  # Right
                velocity[1] += acceleration * result.confidence
            elseif result.prediction == 3  # Up
                velocity[2] += acceleration * result.confidence
            elseif result.prediction == 4  # Down
                velocity[2] -= acceleration * result.confidence
            end
        end

        # Apply friction
        velocity .*= friction

        # Clamp velocity
        velocity = clamp.(velocity, -max_velocity, max_velocity)

        # Update position
        update_cursor_velocity(velocity)
    end
end
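The control loop emits a velocity; the display layer still has to integrate it into a position at a fixed timestep and keep the cursor on screen. A minimal Python sketch of that integration step (the helper name, normalized bounds, and 50 ms tick are illustrative):

import numpy as np

def integrate_cursor(position, velocity, dt=0.05, bounds=(0.0, 1.0)):
    """Advance a normalized cursor position by one control tick."""
    # Euler integration: position moves by velocity * elapsed time
    position = position + velocity * dt
    # Keep the cursor inside the normalized screen rectangle
    return np.clip(position, bounds[0], bounds[1])

# Usage: called once per control tick with the current velocity
position = np.array([0.5, 0.5])  # Start at screen center
position = integrate_cursor(position, np.array([0.2, -0.1]))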
Adaptive Learning During Use
Update model parameters online based on implicit feedback.

Online Model Adaptation

Python:
from nimbus_bci import NimbusLDA
import numpy as np

# Start with baseline classifier
clf = NimbusLDA()
clf.fit(X_baseline, y_baseline)

# Collect online data with implicit labels
online_features = []
online_labels = []
adaptation_interval = 20  # Adapt every 20 trials

for trial_idx in range(1, 201):
    # Collect trial
    trial_features = collect_trial()  # (1, n_features)

    # Get prediction
    prediction = clf.predict(trial_features)[0]
    proba = clf.predict_proba(trial_features)[0]
    confidence = np.max(proba)

    # Implicit labeling: Assume high-confidence predictions are correct
    if confidence > 0.85:
        online_features.append(trial_features[0])
        online_labels.append(prediction)
        print(f"Trial {trial_idx}: Confident prediction added to adaptation set")

    # Periodic adaptation using partial_fit
    if trial_idx % adaptation_interval == 0 and len(online_labels) >= 10:
        print(f"\n=== Adapting model at trial {trial_idx} ===")

        # Create adaptation dataset
        X_adapt = np.array(online_features)
        y_adapt = np.array(online_labels)

        # Online learning: update model with new data
        for _ in range(5):  # Multiple passes for better adaptation
            clf.partial_fit(X_adapt, y_adapt)

        print(f"Model adapted with {len(online_labels)} trials")

        # Keep a sliding window of the last 10 trials
        if len(online_features) > 10:
            online_features = online_features[-10:]
            online_labels = online_labels[-10:]
Julia:
using NimbusSDK

# Start with baseline Bayesian LDA model
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

# Collect online data with implicit labels
online_features = []
online_labels = Int[]
adaptation_interval = 20  # Adapt every 20 trials

for trial_idx in 1:200
    # Collect trial
    trial_features = collect_trial()

    # Get prediction
    trial_data = BCIData(trial_features, metadata)
    result = predict_batch(model, trial_data)

    # Implicit labeling: Assume high-confidence predictions are correct
    if result.confidences[1] > 0.85
        push!(online_features, trial_features)
        push!(online_labels, result.predictions[1])
        println("Trial $trial_idx: Confident prediction added to adaptation set")
    end

    # Periodic adaptation
    if trial_idx % adaptation_interval == 0 && length(online_labels) >= 10
        println("\n=== Adapting model at trial $trial_idx ===")

        # Create adaptation dataset
        adapt_features = cat(online_features..., dims=3)
        adapt_data = BCIData(adapt_features, metadata, online_labels)

        # Calibrate with online data
        model = calibrate_model(model, adapt_data; iterations=15)
        println("Model adapted with $(length(online_labels)) trials")

        # Keep a sliding window of the last 10 trials
        online_features = online_features[max(1, end-9):end]
        online_labels = online_labels[max(1, end-9):end]
    end
end
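Implicit labeling carries a confirmation-bias risk: the model only ever adapts on trials it was already confident about, so a systematic error can be reinforced rather than corrected. One cheap guard is to require the adaptation buffer to stay reasonably class-balanced before adapting. A minimal Python sketch (the helper and the 3x imbalance ratio are illustrative):

from collections import Counter

def adaptation_set_is_balanced(labels, n_classes=4, max_ratio=3.0):
    """Reject adaptation sets dominated by a single class."""
    counts = Counter(labels)
    # Require every class to be present at least once
    if len(counts) < n_classes:
        return False
    # Require the most frequent class to stay within max_ratio of the rarest
    return max(counts.values()) / min(counts.values()) <= max_ratio

# Usage: gate the periodic adaptation step
if adaptation_set_is_balanced(online_labels):
    ...  # safe to adapt
else:
    print("Skipping adaptation: buffer is class-imbalanced")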
Multi-Session Experiment Design
Run systematic experiments across multiple sessions.

Experiment Framework

Python:
from nimbus_bci import NimbusLDA, NimbusGMM
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import numpy as np
import pickle

@dataclass
class ExperimentSession:
    session_id: str
    subject_id: str
    date: datetime
    model_type: str  # 'lda' or 'gmm'
    n_trials: int
    predictions: Optional[np.ndarray] = None
    probabilities: Optional[np.ndarray] = None
    accuracy: Optional[float] = None
    mean_confidence: Optional[float] = None

def run_experiment_session(
    subject_id: str,
    session_id: str,
    model_type: str,
    n_trials: int
):
    """Run a single experiment session."""
    print("\n" + "=" * 60)
    print(f"Experiment Session: {session_id}")
    print(f"Subject: {subject_id}")
    print(f"Model: {model_type}")
    print(f"Date: {datetime.now()}")
    print("=" * 60)

    # Load appropriate model
    if model_type == 'lda':
        with open("motor_imagery_4class_lda.pkl", "rb") as f:
            clf = pickle.load(f)
    else:
        with open("motor_imagery_4class_gmm.pkl", "rb") as f:
            clf = pickle.load(f)

    # Collect data
    print(f"\nCollecting {n_trials} trials...")
    X, y = collect_experiment_trials(n_trials)

    # Run inference
    print("Running inference...")
    predictions = clf.predict(X)
    probabilities = clf.predict_proba(X)

    # Calculate metrics
    accuracy = np.mean(predictions == y)
    mean_confidence = np.mean(np.max(probabilities, axis=1))

    print("\n" + "=" * 60)
    print("Session Results")
    print("=" * 60)
    print(f"Accuracy: {accuracy * 100:.1f}%")
    print(f"Mean confidence: {mean_confidence:.3f}")
    print("=" * 60)

    # Return session data
    return ExperimentSession(
        session_id=session_id,
        subject_id=subject_id,
        date=datetime.now(),
        model_type=model_type,
        n_trials=n_trials,
        predictions=predictions,
        probabilities=probabilities,
        accuracy=accuracy,
        mean_confidence=mean_confidence
    )
Julia:
using NimbusSDK
using Dates
using Statistics

struct ExperimentSession
    session_id::String
    subject_id::String
    date::DateTime
    model_type::Symbol  # :rxlda or :rxgmm
    n_trials::Int
    results::Union{Nothing, BatchResult}
    accuracy::Union{Nothing, Float64}
    itr::Union{Nothing, Float64}
end

function run_experiment_session(
    subject_id::String,
    session_id::String,
    model_type::Symbol,
    n_trials::Int
)
    println("\n" * "="^60)
    println("Experiment Session: $session_id")
    println("Subject: $subject_id")
    println("Model: $model_type")
    println("Date: $(Dates.now())")
    println("="^60)

    # Authenticate
    NimbusSDK.install_core("your-api-key")

    # Load appropriate model
    model = if model_type == :rxlda
        load_model(RxLDAModel, "motor_imagery_4class_v1")  # Bayesian LDA
    else
        load_model(RxGMMModel, "motor_imagery_4class_gmm_v1")  # Bayesian GMM
    end

    # Collect data
    println("\nCollecting $n_trials trials...")
    features, labels = collect_experiment_trials(n_trials)

    data = BCIData(
        features,
        BCIMetadata(
            sampling_rate = 250.0,
            paradigm = :motor_imagery,
            feature_type = :csp,
            n_features = 16,
            n_classes = 4
        ),
        labels
    )

    # Run inference
    println("Running inference...")
    results = predict_batch(model, data)

    # Calculate metrics
    accuracy = sum(results.predictions .== labels) / length(labels)
    itr = calculate_ITR(accuracy, 4, 4.0)

    println("\n" * "="^60)
    println("Session Results")
    println("="^60)
    println("Accuracy: $(round(accuracy * 100, digits=1))%")
    println("ITR: $(round(itr, digits=1)) bits/minute")
    println("Mean confidence: $(round(mean(results.confidences), digits=3))")
    println("="^60)

    # Return session data
    return ExperimentSession(
        session_id,
        subject_id,
        Dates.now(),
        model_type,
        n_trials,
        results,
        accuracy,
        itr
    )
end
Then run the full experiment across subjects.

Python:
import numpy as np

def run_multi_session_experiment():
    """Run experiments across multiple subjects and sessions."""
    subjects = ["S001", "S002", "S003"]
    sessions = []

    for subject in subjects:
        # Session 1: NimbusLDA baseline
        session1 = run_experiment_session(
            subject_id=subject,
            session_id=f"{subject}_session1_lda",
            model_type="lda",
            n_trials=80
        )
        sessions.append(session1)

        # Session 2: NimbusGMM comparison
        session2 = run_experiment_session(
            subject_id=subject,
            session_id=f"{subject}_session2_gmm",
            model_type="gmm",
            n_trials=80
        )
        sessions.append(session2)

    # Aggregate results
    print("\n\n" + "=" * 60)
    print("Experiment Summary")
    print("=" * 60)
    for session in sessions:
        print(f"\n{session.session_id}:")
        print(f"  Accuracy: {session.accuracy * 100:.1f}%")
        print(f"  Mean confidence: {session.mean_confidence:.3f}")

    # Statistical analysis
    lda_accs = [s.accuracy for s in sessions if s.model_type == 'lda']
    gmm_accs = [s.accuracy for s in sessions if s.model_type == 'gmm']

    print("\n" + "=" * 60)
    print("Model Comparison")
    print("=" * 60)
    print(f"NimbusLDA mean accuracy: {np.mean(lda_accs) * 100:.1f}% ± {np.std(lda_accs) * 100:.1f}%")
    print(f"NimbusGMM mean accuracy: {np.mean(gmm_accs) * 100:.1f}% ± {np.std(gmm_accs) * 100:.1f}%")

    return sessions

# Run the experiment
sessions = run_multi_session_experiment()
Julia:
using Statistics

# Run multi-session experiment
function run_multi_session_experiment()
    subjects = ["S001", "S002", "S003"]
    sessions = []

    for subject in subjects
        # Session 1: RxLDA baseline
        session1 = run_experiment_session(
            subject,
            "$(subject)_session1_rxlda",
            :rxlda,  # Bayesian LDA
            80
        )
        push!(sessions, session1)

        # Session 2: RxGMM comparison
        session2 = run_experiment_session(
            subject,
            "$(subject)_session2_rxgmm",
            :rxgmm,  # Bayesian GMM
            80
        )
        push!(sessions, session2)
    end

    # Aggregate results
    println("\n\n" * "="^60)
    println("Experiment Summary")
    println("="^60)
    for session in sessions
        println("\n$(session.session_id):")
        println("  Accuracy: $(round(session.accuracy * 100, digits=1))%")
        println("  ITR: $(round(session.itr, digits=1)) bits/min")
    end

    # Statistical analysis
    rxlda_accs = [s.accuracy for s in sessions if s.model_type == :rxlda]
    rxgmm_accs = [s.accuracy for s in sessions if s.model_type == :rxgmm]

    println("\n" * "="^60)
    println("Model Comparison")
    println("="^60)
    println("Bayesian LDA mean accuracy: $(round(mean(rxlda_accs) * 100, digits=1))% ± $(round(std(rxlda_accs) * 100, digits=1))%")
    println("Bayesian GMM mean accuracy: $(round(mean(rxgmm_accs) * 100, digits=1))% ± $(round(std(rxgmm_accs) * 100, digits=1))%")

    return sessions
end
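Because each subject contributes one LDA and one GMM session, the accuracies are paired, and mean ± std alone can hide a consistent per-subject difference. A paired test is the natural follow-up, though with only three subjects it is illustrative rather than conclusive. A minimal Python sketch using SciPy (an assumed extra dependency, reusing the lda_accs and gmm_accs lists from above):

from scipy import stats

# Paired per-subject accuracies: lda_accs[i] and gmm_accs[i] come from
# the same subject, so test the per-subject differences directly
t_stat, p_value = stats.ttest_rel(lda_accs, gmm_accs)

print(f"Paired t-test: t = {t_stat:.2f}, p = {p_value:.3f}")
if p_value < 0.05:
    print("Accuracy difference between models is statistically significant")
else:
    print("No significant accuracy difference at alpha = 0.05")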
Error Recovery and Robustness
Handle errors and maintain reliable operation.

Automatic Retry with Fallback

Python:
from nimbus_bci import NimbusLDA
from dataclasses import dataclass
from typing import Optional
import time
import numpy as np

@dataclass
class InferenceResult:
    success: bool
    predictions: Optional[np.ndarray]
    probabilities: Optional[np.ndarray]
    attempts: int

def robust_inference(clf, X, max_retries=3, confidence_threshold=0.65):
    """Inference with automatic retry and fallback."""
    for attempt in range(1, max_retries + 1):
        try:
            predictions = clf.predict(X)
            probabilities = clf.predict_proba(X)

            # Quality check
            mean_confidence = np.mean(np.max(probabilities, axis=1))
            if mean_confidence > confidence_threshold:
                return InferenceResult(
                    success=True,
                    predictions=predictions,
                    probabilities=probabilities,
                    attempts=attempt
                )
            else:
                print(f"⚠️ Low quality results (confidence: {mean_confidence:.3f}) on attempt {attempt}")
                if attempt < max_retries:
                    print("Retrying...")
                    continue
        except Exception as e:
            print(f"❌ Inference failed: {e} (attempt {attempt})")
            if attempt < max_retries:
                print("Retrying after error...")
                time.sleep(0.1)  # Brief pause before retry
                continue
            else:
                # All retries exhausted
                return InferenceResult(
                    success=False,
                    predictions=None,
                    probabilities=None,
                    attempts=attempt
                )

    # If we get here, all retries failed
    return InferenceResult(
        success=False,
        predictions=None,
        probabilities=None,
        attempts=max_retries
    )

# Usage
result = robust_inference(clf, X)

if result.success:
    print(f"✓ Inference successful after {result.attempts} attempt(s)")
    process_results(result.predictions)
else:
    print(f"⚠️ Inference failed after {result.attempts} attempts")
    handle_failure()
Julia:
using NimbusSDK

"""
Inference with automatic retry and fallback.
"""
function robust_inference(model, data; max_retries=3)
    for attempt in 1:max_retries
        try
            results = predict_batch(model, data)

            # Quality check
            quality = assess_trial_quality(results)
            if quality.confidence_acceptable
                return (success=true, results=results, attempts=attempt)
            else
                @warn "Low quality results" quality.overall_score attempt
                if attempt < max_retries
                    println("Retrying...")
                    continue
                end
            end
        catch e
            @error "Inference failed" exception=e attempt
            if attempt < max_retries
                println("Retrying after error...")
                sleep(0.1)  # Brief pause before retry
                continue
            else
                # All retries exhausted
                return (success=false, results=nothing, attempts=attempt)
            end
        end
    end

    # If we get here, all retries failed
    return (success=false, results=nothing, attempts=max_retries)
end

# Usage
result = robust_inference(model, data)

if result.success
    println("✓ Inference successful after $(result.attempts) attempt(s)")
    process_results(result.results)
else
    println("⚠️ Inference failed after $(result.attempts) attempts")
    handle_failure()
end
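The fixed 0.1 s pause suits transient glitches, but if failures stem from an overloaded downstream service, retrying at a constant rate keeps it overloaded. Exponential backoff is the usual refinement; a minimal Python sketch (the delay schedule and the try_inference wrapper are illustrative):

import time

def backoff_delays(base=0.1, factor=2.0, cap=2.0, max_retries=5):
    """Yield the pause before each retry: 0.1, 0.2, 0.4, 0.8, 1.6 seconds."""
    delay = base
    for _ in range(max_retries):
        yield min(delay, cap)
        delay *= factor

# Usage: sleep for the scheduled delay between attempts
for delay in backoff_delays():
    if try_inference():  # hypothetical wrapper around one inference attempt
        break
    time.sleep(delay)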
Session Health Monitoring
Python:
from nimbus_bci import NimbusLDA
from dataclasses import dataclass, field
from collections import deque
import numpy as np

@dataclass
class SessionHealth:
    trial_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    mean_confidence: float = 0.0
    recent_accuracies: list = field(default_factory=list)

def monitor_session_health(clf, eeg_stream, health_window=10):
    """Monitor BCI session health with real-time metrics."""
    health = SessionHealth()
    recent_acc_window = deque(maxlen=health_window)

    for trial in eeg_stream:
        health.trial_count += 1
        try:
            # Run inference
            X_trial = trial['features'].reshape(1, -1)
            y_trial = trial.get('label')

            prediction = clf.predict(X_trial)[0]
            proba = clf.predict_proba(X_trial)[0]
            confidence = np.max(proba)

            # Update health metrics (running mean of confidence)
            health.success_count += 1
            health.mean_confidence = (
                (health.mean_confidence * (health.success_count - 1) + confidence) /
                health.success_count
            )

            # Track recent accuracy
            if y_trial is not None:
                correct = (prediction == y_trial)
                recent_acc_window.append(float(correct))

            # Health checks
            if health.mean_confidence < 0.6:
                print(f"⚠️ Mean confidence dropping: {health.mean_confidence:.3f}")

            if len(recent_acc_window) >= health_window:
                recent_acc = np.mean(list(recent_acc_window))
                if recent_acc < 0.65:
                    print(f"⚠️ Recent accuracy low - consider recalibration: {recent_acc:.3f}")

        except Exception as e:
            health.failure_count += 1
            print(f"❌ Trial failed: {e} (trial #{health.trial_count})")
            if health.failure_count / health.trial_count > 0.1:
                raise RuntimeError("Session failure rate too high - aborting")

        # Periodic health report
        if health.trial_count % 20 == 0:
            print("\n=== Session Health Report ===")
            print(f"Trials: {health.trial_count}")
            print(f"Success rate: {100 * health.success_count / health.trial_count:.1f}%")
            print(f"Mean confidence: {health.mean_confidence:.3f}")
            if len(recent_acc_window) > 0:
                print(f"Recent accuracy: {np.mean(list(recent_acc_window)) * 100:.1f}%")
            print("=" * 30)

    return health
Julia:
using NimbusSDK
using Statistics

# Mutable so per-trial counters can be updated in place
mutable struct SessionHealth
    trial_count::Int
    success_count::Int
    failure_count::Int
    mean_confidence::Float64
    recent_accuracies::Vector{Float64}
end

function monitor_session_health(
    model,
    eeg_stream;
    health_window=10
)
    health = SessionHealth(0, 0, 0, 0.0, Float64[])

    for trial in eeg_stream
        health.trial_count += 1
        try
            # Run inference (`metadata` is assumed defined in the enclosing scope)
            data = BCIData(trial.features, metadata, trial.label)
            results = predict_batch(model, data)

            # Update health metrics (running mean of confidence)
            health.success_count += 1
            health.mean_confidence = (
                (health.mean_confidence * (health.success_count - 1) +
                 results.confidences[1]) / health.success_count
            )

            # Track recent accuracy
            if !isnothing(trial.label)
                correct = results.predictions[1] == trial.label
                push!(health.recent_accuracies, Float64(correct))
                if length(health.recent_accuracies) > health_window
                    popfirst!(health.recent_accuracies)
                end
            end

            # Health checks
            if health.mean_confidence < 0.6
                @warn "Mean confidence dropping" health.mean_confidence
            end

            if length(health.recent_accuracies) >= health_window
                recent_acc = mean(health.recent_accuracies)
                if recent_acc < 0.65
                    @warn "Recent accuracy low - consider recalibration" recent_acc
                end
            end
        catch e
            health.failure_count += 1
            @error "Trial failed" exception=e trial_num=health.trial_count
            if health.failure_count / health.trial_count > 0.1
                error("Session failure rate too high - aborting")
            end
        end

        # Periodic health report
        if health.trial_count % 20 == 0
            println("\n=== Session Health Report ===")
            println("Trials: $(health.trial_count)")
            println("Success rate: $(round(100 * health.success_count / health.trial_count, digits=1))%")
            println("Mean confidence: $(round(health.mean_confidence, digits=3))")
            if !isempty(health.recent_accuracies)
                println("Recent accuracy: $(round(mean(health.recent_accuracies) * 100, digits=1))%")
            end
            println("="^30)
        end
    end

    return health
end
Next Steps
Basic Examples
Fundamental BCI examples with complete working code
Industry Use Cases
Real-world applications
Julia SDK
Complete SDK reference
All examples use actual NimbusSDK.jl functions and real Bayesian LDA (RxLDA) and Bayesian GMM (RxGMM) models. These are production-ready patterns used in real BCI applications.