Streaming Inference for BCI
Streaming inference is essential for real-time BCI applications. NimbusSDK’s streaming mode processes neural data chunk-by-chunk as it arrives, enabling responsive brain-computer interfaces with sub-25ms latency per chunk.
Streaming vs Batch Processing
Batch Processing
Process complete trials offline (a fuller sketch follows the list below):
# Batch: Process all trials at once
results = predict_batch(model, all_trials_data)
Best for:
- Offline analysis
- Model training/validation
- Research studies
- Multiple trials available upfront
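For reference, an end-to-end batch call might look like the sketch below. The `n_features × n_samples × n_trials` array layout and the `load_my_trials` helper are illustrative assumptions, not documented API; check your model card for the exact expected shape:
# Sketch of a batch workflow; array layout and load_my_trials are illustrative assumptions
using NimbusSDK

NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

all_trials_data = load_my_trials()  # hypothetical loader, e.g. a 16 × 250 × 40 array
results = predict_batch(model, all_trials_data)

for (i, r) in enumerate(results)
    println("Trial $i: class $(r.prediction) (conf: $(round(r.confidence, digits=3)))")
end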
Streaming Processing
Process chunks as they arrive in real-time:
# Streaming: Process chunks incrementally
session = init_streaming(model, metadata)
for chunk in eeg_stream
    chunk_result = process_chunk(session, chunk)
    # Immediate response to user
end
final_result = finalize_trial(session)
Best for:
- Real-time BCI control
- Online feedback
- Interactive applications
- Gaming and communication
Setting Up Streaming Inference
Basic Streaming Setup
using NimbusSDK
# 1. Authenticate and load model
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
# 2. Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks at 250 Hz
)
# 3. Initialize streaming session
session = init_streaming(model, metadata)
println("✓ Streaming session initialized")
# 4. Process chunks as they arrive
for chunk in your_eeg_stream()
    # chunk shape: (n_features × chunk_size)
    chunk_result = process_chunk(session, chunk)
    println("Chunk prediction: $(chunk_result.prediction)")
    println("Confidence: $(round(chunk_result.confidence, digits=3))")

    # Provide immediate feedback
    if chunk_result.confidence > 0.75
        provide_feedback(chunk_result.prediction)
    end
end
# 5. Finalize trial with aggregated result
final_result = finalize_trial(session; method=:weighted_vote)
println("\n✓ Final prediction: $(final_result.prediction)")
println("Final confidence: $(round(final_result.confidence, digits=3))")
Chunk Size Selection
Choose chunk size based on your application:
# Fast updates (0.5 second chunks)
chunk_size = 125 # At 250 Hz sampling rate
# Pro: More responsive, frequent updates
# Con: Less data per prediction, potentially lower confidence
# Standard (1 second chunks)
chunk_size = 250
# Pro: Good balance of responsiveness and accuracy
# Con: none worth noting - this is the recommended default
# Longer chunks (2 second chunks)
chunk_size = 500
# Pro: More data, higher confidence
# Con: Less responsive, slower feedback
Start with 1-second chunks (chunk_size = sampling_rate). Adjust based on your accuracy/latency requirements.
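Since the conversion is just samples = seconds × sampling rate, a one-line helper keeps the two consistent (a sketch, not an SDK function):
# Derive chunk_size from a target window duration (illustrative helper)
window_seconds = 1.0                 # desired chunk duration
sampling_rate = 250.0                # must match your BCIMetadata
chunk_size = round(Int, window_seconds * sampling_rate)   # -> 250 samples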
Aggregation Methods
Weighted Vote
Weight chunks by confidence (recommended):
# Aggregate with weighted voting
final_result = finalize_trial(session; method=:weighted_vote)
High-confidence chunks contribute more to the final decision. Best for most applications.
Majority Vote
Simple majority across chunks:
# Aggregate with majority voting
final_result = finalize_trial(session; method=:majority_vote)
Each chunk has equal weight. Use when all chunks have similar quality.
Latest Chunk
Use only the most recent chunk:
# Use only latest chunk
final_result = finalize_trial(session; method=:latest)
Fastest response, but ignores trial history. Use for ultra-responsive applications.
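To make the trade-offs concrete, here is a rough sketch of how confidence-weighted and majority aggregation can be computed from per-chunk results. This illustrates the idea only; the SDK's internal aggregation may differ:
# Illustrative aggregation over per-chunk predictions (not the SDK's internal code)
function weighted_vote(preds::Vector{Int}, confs::Vector{Float64}, n_classes::Int)
    scores = zeros(n_classes)
    for (p, c) in zip(preds, confs)
        scores[p] += c               # high-confidence chunks count more
    end
    return argmax(scores)
end

function majority_vote(preds::Vector{Int}, n_classes::Int)
    counts = zeros(Int, n_classes)
    for p in preds
        counts[p] += 1               # every chunk counts equally
    end
    return argmax(counts)
end

weighted_vote([1, 1, 3], [0.4, 0.5, 0.95], 4)   # -> 3 (confidence dominates)
majority_vote([1, 1, 3], 4)                      # -> 1 (count dominates)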
Real-time BCI Example
Motor Imagery Control
using NimbusSDK
# Setup
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250
)
# Real-time processing loop
trial_count = 0
while is_bci_active()
    trial_count += 1
    println("\n=== Trial $trial_count ===")

    # Fresh session for each trial (see Best Practices below)
    session = init_streaming(model, metadata)

    # Collect trial chunks
    chunk_count = 0
    for chunk in collect_trial_chunks()  # Your acquisition function
        chunk_count += 1

        # Process chunk
        chunk_result = process_chunk(session, chunk)

        # Real-time feedback
        println("Chunk $chunk_count: $(chunk_result.prediction) (conf: $(round(chunk_result.confidence, digits=2)))")

        # Early stopping if very confident
        if chunk_result.confidence > 0.95 && chunk_count >= 2
            println("✓ High confidence - early stop")
            break
        end
    end

    # Finalize trial
    final_result = finalize_trial(session; method=:weighted_vote)
    println("\n✓ Trial complete")
    println("  Prediction: $(final_result.prediction)")
    println("  Confidence: $(round(final_result.confidence, digits=3))")

    # Execute command
    execute_motor_command(final_result.prediction)

    # Quality check
    if should_reject_trial(final_result.confidence, 0.7)
        println("⚠️ Low confidence - suggest recalibration")
    end
end
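The loop above calls should_reject_trial; if it isn't already defined in your codebase, a minimal version could be:
# Minimal helper assumed by the loop above
should_reject_trial(confidence::Float64, threshold::Float64) = confidence < threshold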
P300 Speller
using NimbusSDK
# P300 detection with streaming
model = load_model(RxLDAModel, "p300_detection_v1")
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :p300,
    feature_type = :erp,
    n_features = 12,
    n_classes = 2,    # Target vs non-target
    chunk_size = 200  # 0.8 seconds post-stimulus
)
# Process stimulus presentations
for letter_row in spelling_matrix
    session = init_streaming(model, metadata)

    for stimulus in letter_row
        # Present stimulus
        flash_stimulus(stimulus)

        # Collect and process EEG response
        chunk = collect_post_stimulus_chunk()  # 0.8s after stimulus
        result = process_chunk(session, chunk)

        if result.prediction == 2 && result.confidence > 0.8
            # P300 detected - this is the target
            println("✓ Target detected: $stimulus")
            break
        end
    end
end
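A common robustification, sketched below rather than taken from the SDK, is to score every stimulus in the row and choose the one with the highest target confidence instead of stopping at the first above threshold:
# Sketch: pick the best-scoring stimulus across the whole row
best_stimulus, best_conf = nothing, 0.0
for stimulus in letter_row
    flash_stimulus(stimulus)
    chunk = collect_post_stimulus_chunk()
    result = process_chunk(session, chunk)
    if result.prediction == 2 && result.confidence > best_conf
        best_stimulus, best_conf = stimulus, result.confidence
    end
end
println("✓ Target: $best_stimulus (conf: $(round(best_conf, digits=2)))")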
Real-time Metrics
Track streaming performance:
using NimbusSDK
# Initialize performance tracker
tracker = OnlinePerformanceTracker(window_size=20)
# Process stream with tracking
for chunk in eeg_stream
    start_time = time()
    result = process_chunk(session, chunk)
    latency_ms = (time() - start_time) * 1000
    println("Chunk latency: $(round(latency_ms, digits=1)) ms")
end

# Update tracker once per trial (requires ground truth, e.g. from a cued protocol)
final = finalize_trial(session)
if !isnothing(ground_truth)
    metrics = update_and_report!(tracker, final.prediction, ground_truth, final.confidence)
    println("Running accuracy: $(round(metrics.accuracy * 100, digits=1))%")
end
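If you want to see roughly what a windowed tracker computes, a minimal standalone version looks like this (illustrative only; OnlinePerformanceTracker's actual implementation may differ):
# Minimal rolling-window accuracy tracker (illustrative, not the SDK type)
struct RollingAccuracy
    hits::Vector{Bool}
    window::Int
end
RollingAccuracy(window::Int) = RollingAccuracy(Bool[], window)

function update!(t::RollingAccuracy, prediction, truth)
    push!(t.hits, prediction == truth)
    length(t.hits) > t.window && popfirst!(t.hits)   # keep only the last `window` trials
    return count(t.hits) / length(t.hits)            # running accuracy in [0, 1]
end

acc = update!(RollingAccuracy(20), 3, 3)   # -> 1.0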
Quality Monitoring
Monitor signal quality during streaming:
using Statistics  # for mean

# Track confidence trends
confidence_history = Float64[]
for chunk in eeg_stream
    result = process_chunk(session, chunk)
    push!(confidence_history, result.confidence)

    # Check for degrading quality over the last five chunks
    if length(confidence_history) >= 5
        recent_conf = mean(confidence_history[end-4:end])
        if recent_conf < 0.6
            @warn "Signal quality degrading"
            println("Recent confidence: $(round(recent_conf, digits=2))")
            println("Recommendation: Check electrode contact or reduce movement")
        end
    end
end
Advanced Streaming Features
Adaptive Confidence Thresholds
# Adjust thresholds based on performance
base_threshold = 0.7
current_accuracy = tracker.accuracy

if current_accuracy > 0.85
    # User performing well - can lower threshold for faster response
    threshold = base_threshold * 0.9
elseif current_accuracy < 0.65
    # User struggling - raise threshold for more reliable decisions
    threshold = base_threshold * 1.2
else
    threshold = base_threshold
end

# Apply threshold
if final_result.confidence > threshold
    execute_command(final_result.prediction)
else
    request_retry()
end
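The same logic packaged as a reusable function (the 0.85/0.65 cut-offs and multipliers are the illustrative values from above, not SDK defaults):
# Adaptive threshold as a function
function adaptive_threshold(base::Float64, accuracy::Float64)
    accuracy > 0.85 && return base * 0.9    # performing well: allow faster responses
    accuracy < 0.65 && return base * 1.2    # struggling: demand more confidence
    return base
end

threshold = adaptive_threshold(0.7, tracker.accuracy)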
Multi-trial Context
using StatsBase: mode  # external package providing `mode`

# Track recent trial history for context
struct TrialHistory
    predictions::Vector{Int}
    confidences::Vector{Float64}
    max_history::Int
end

function add_trial!(history::TrialHistory, pred::Int, conf::Float64)
    push!(history.predictions, pred)
    push!(history.confidences, conf)
    # Keep only recent trials
    if length(history.predictions) > history.max_history
        popfirst!(history.predictions)
        popfirst!(history.confidences)
    end
end

# Use history to detect anomalies
function is_anomaly(history::TrialHistory, new_pred::Int)
    if length(history.predictions) < 3
        return false
    end
    # Check if prediction differs from the recent trend
    recent_mode = mode(history.predictions[end-2:end])
    return new_pred != recent_mode
end

# Example usage
history = TrialHistory(Int[], Float64[], 10)
for trial in trials
    final_result = finalize_trial(session)
    if is_anomaly(history, final_result.prediction)
        println("⚠️ Anomalous prediction - requesting confirmation")
        show_confirmation_dialog(final_result.prediction)
    end
    add_trial!(history, final_result.prediction, final_result.confidence)
end
Best Practices
Session Management
# Always initialize a new session for each trial
for trial_idx in 1:n_trials
    # New session = fresh state
    session = init_streaming(model, metadata)

    # Process trial
    for chunk in collect_trial()
        process_chunk(session, chunk)
    end

    # Finalize
    result = finalize_trial(session)
    # Session automatically cleaned up
end
Error Handling
using NimbusSDK
try
    session = init_streaming(model, metadata)

    for chunk in eeg_stream
        try
            result = process_chunk(session, chunk)
            handle_result(result)
        catch e
            @error "Chunk processing failed" exception=e
            # Continue to next chunk
            continue
        end
    end

    final_result = finalize_trial(session)
catch e
    @error "Streaming session failed" exception=e
    # Reinitialize and retry
end
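One way to implement the "reinitialize and retry" step is a wrapper around the whole trial. This is a sketch: retry_streaming_trial is not an SDK function, and the retry count and backoff are arbitrary choices:
# Hypothetical retry wrapper around a full streaming trial
function retry_streaming_trial(model, metadata, chunks; max_retries=3)
    for attempt in 1:max_retries
        try
            session = init_streaming(model, metadata)
            for chunk in chunks
                process_chunk(session, chunk)
            end
            return finalize_trial(session)
        catch e
            @error "Attempt $attempt failed" exception=e
            attempt == max_retries && rethrow()
            sleep(0.5 * attempt)   # simple linear backoff before retrying
        end
    end
end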
Memory Management
# Streaming uses bounded memory - no accumulation
# Each session has a fixed memory footprint

# Good: Process long sessions safely
for hour in 1:8
    for trial in collect_hour_trials(hour)
        session = init_streaming(model, metadata)
        # ... process trial
        # Memory freed after finalize_trial
    end
end

# Memory usage stays constant regardless of session length
Troubleshooting
Low Confidence Issues
using Statistics  # for mean

# Diagnose low confidence
if mean(confidence_history) < 0.65
    println("Troubleshooting low confidence:")
    println("1. Check preprocessing quality")
    println("2. Verify chunk_size is appropriate")
    println("3. Ensure user is focused and relaxed")
    println("4. Check for electrode issues")
    println("5. Consider recalibration")

    # Run diagnostics
    diagnostic_data = collect_diagnostic_trial()
    report = diagnose_preprocessing(diagnostic_data)
    println("\nPreprocessing quality: $(round(report.quality_score * 100, digits=1))%")
end
Latency Issues
using Statistics  # for mean, median, quantile

# Profile streaming latency
latencies = Float64[]
for chunk in Iterators.take(eeg_stream, 100)  # Test first 100 chunks
    t_start = time()
    result = process_chunk(session, chunk)
    t_end = time()
    push!(latencies, (t_end - t_start) * 1000)  # Convert to ms
end

println("Latency statistics:")
println("  Mean: $(round(mean(latencies), digits=1)) ms")
println("  Median: $(round(median(latencies), digits=1)) ms")
println("  95th percentile: $(round(quantile(latencies, 0.95), digits=1)) ms")
println("  Max: $(round(maximum(latencies), digits=1)) ms")

# First chunk is slower due to JIT compilation
println("\nExcluding first chunk (JIT warmup):")
println("  Mean: $(round(mean(latencies[2:end]), digits=1)) ms")
Next Steps