Streaming Inference for BCI

Streaming inference is essential for real-time BCI applications. NimbusSDK’s streaming mode processes neural data chunk-by-chunk as it arrives, enabling responsive brain-computer interfaces with sub-25ms latency per chunk.

Streaming vs Batch Processing

Batch Processing

Process complete trials offline:
# Batch: Process all trials at once
results = predict_batch(model, all_trials_data)
Best for:
  • Offline analysis
  • Model training/validation
  • Research studies
  • Multiple trials available upfront

Streaming Processing

Process chunks as they arrive in real-time:
# Streaming: Process chunks incrementally
session = init_streaming(model, metadata)
for chunk in eeg_stream
    chunk_result = process_chunk(session, chunk)
    # Immediate response to user
end
final_result = finalize_trial(session)
Best for:
  • Real-time BCI control
  • Online feedback
  • Interactive applications
  • Gaming and communication

Setting Up Streaming Inference

Basic Streaming Setup

using NimbusSDK

# 1. Authenticate and load model
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

# 2. Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks at 250 Hz
)

# 3. Initialize streaming session
session = init_streaming(model, metadata)
println("✓ Streaming session initialized")

# 4. Process chunks as they arrive
for chunk in your_eeg_stream()
    # chunk shape: (n_features × chunk_size)
    chunk_result = process_chunk(session, chunk)
    
    println("Chunk prediction: $(chunk_result.prediction)")
    println("Confidence: $(round(chunk_result.confidence, digits=3))")
    
    # Provide immediate feedback
    if chunk_result.confidence > 0.75
        provide_feedback(chunk_result.prediction)
    end
end

# 5. Finalize trial with aggregated result
final_result = finalize_trial(session; method=:weighted_vote)
println("\n✓ Final prediction: $(final_result.prediction)")
println("Final confidence: $(round(final_result.confidence, digits=3))")

Chunk Size Selection

Choose chunk size based on your application:
# Fast updates (0.5 second chunks)
chunk_size = 125  # At 250 Hz sampling rate
# Pro: More responsive, frequent updates
# Con: Less data per prediction, potentially lower confidence

# Standard (1 second chunks)
chunk_size = 250  
# Pro: Good balance of responsiveness and accuracy
# Con: none for most applications - recommended default

# Longer chunks (2 second chunks)
chunk_size = 500
# Pro: More data, higher confidence
# Con: Less responsive, slower feedback
Start with 1-second chunks (chunk_size = sampling_rate). Adjust based on your accuracy/latency requirements.
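If you prefer to derive chunk_size programmatically, the rule of thumb above is easy to encode. A minimal sketch; chunk_size_for is a hypothetical helper, not part of NimbusSDK:

# Derive chunk_size from sampling rate and desired update interval
# Hypothetical helper - not an SDK function
function chunk_size_for(sampling_rate::Float64, update_interval_s::Float64)
    return round(Int, sampling_rate * update_interval_s)
end

chunk_size_for(250.0, 1.0)   # 250 - standard 1 s chunks
chunk_size_for(250.0, 0.5)   # 125 - fast updates
chunk_size_for(250.0, 2.0)   # 500 - longer chunks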

Aggregation Methods

Weighted Vote

Weight chunks by confidence (recommended):
# Aggregate with weighted voting
final_result = finalize_trial(session; method=:weighted_vote)
High-confidence chunks contribute more to the final decision. Best for most applications.

Majority Vote

Simple majority across chunks:
# Aggregate with majority voting
final_result = finalize_trial(session; method=:majority_vote)
Each chunk has equal weight. Use when all chunks have similar quality.

Latest Chunk

Use only the most recent chunk:
# Use only latest chunk
final_result = finalize_trial(session; method=:latest)
Fastest response, but ignores trial history. Use for ultra-responsive applications.
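To build intuition for how the three methods differ, here is a small self-contained sketch of each aggregation rule applied to per-chunk (prediction, confidence) pairs. This illustrates the underlying idea only; it is not NimbusSDK's internal implementation:

using StatsBase  # for mode

# Illustrative per-chunk results - not the SDK internals
chunks = [(prediction = 1, confidence = 0.6),
          (prediction = 2, confidence = 0.9),
          (prediction = 2, confidence = 0.8)]

# :weighted_vote - sum confidences per class, pick the heaviest class
weights = Dict{Int,Float64}()
for c in chunks
    weights[c.prediction] = get(weights, c.prediction, 0.0) + c.confidence
end
weighted_winner = argmax(weights)                        # => 2

# :majority_vote - most frequent prediction, all chunks equal
majority_winner = mode([c.prediction for c in chunks])   # => 2

# :latest - only the most recent chunk counts
latest_winner = chunks[end].prediction                   # => 2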

Real-time BCI Example

Motor Imagery Control

using NimbusSDK

# Setup
NimbusSDK.install_core("your-api-key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250
)

# Real-time processing loop
trial_count = 0
while is_bci_active()
    trial_count += 1
    println("\n=== Trial $trial_count ===")
    
    # Fresh session per trial keeps state isolated (see Best Practices)
    session = init_streaming(model, metadata)
    
    # Collect trial chunks
    chunk_count = 0
    for chunk in collect_trial_chunks()  # Your acquisition function
        chunk_count += 1
        
        # Process chunk
        chunk_result = process_chunk(session, chunk)
        
        # Real-time feedback
        println("Chunk $chunk_count: $(chunk_result.prediction) (conf: $(round(chunk_result.confidence, digits=2)))")
        
        # Early stopping if very confident
        if chunk_result.confidence > 0.95 && chunk_count >= 2
            println("✓ High confidence - early stop")
            break
        end
    end
    
    # Finalize trial
    final_result = finalize_trial(session; method=:weighted_vote)
    
    println("\n✓ Trial complete")
    println("  Prediction: $(final_result.prediction)")
    println("  Confidence: $(round(final_result.confidence, digits=3))")
    
    # Execute command
    execute_motor_command(final_result.prediction)
    
    # Quality check
    if should_reject_trial(final_result.confidence, 0.7)
        println("⚠️  Low confidence - suggest recalibration")
    end
end

P300 Speller

using NimbusSDK

# P300 detection with streaming
model = load_model(RxLDAModel, "p300_detection_v1")

metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :p300,
    feature_type = :erp,
    n_features = 12,
    n_classes = 2,  # Target vs non-target
    chunk_size = 200  # 0.8 seconds post-stimulus
)

# Process stimulus presentations
for letter_row in spelling_matrix
    session = init_streaming(model, metadata)
    
    for stimulus in letter_row
        # Present stimulus
        flash_stimulus(stimulus)
        
        # Collect and process EEG response
        chunk = collect_post_stimulus_chunk()  # 0.8s after stimulus
        result = process_chunk(session, chunk)
        
        if result.prediction == 2 && result.confidence > 0.8  # class 2 = target
            # P300 detected - this is the target
            println("✓ Target detected: $stimulus")
            break
        end
    end
end

Performance Monitoring

Real-time Metrics

Track streaming performance:
using NimbusSDK

# Initialize performance tracker
tracker = OnlinePerformanceTracker(window_size=20)

# Process stream with tracking
for chunk in eeg_stream
    start_time = time()
    
    result = process_chunk(session, chunk)
    
    latency_ms = (time() - start_time) * 1000
    
    # Update tracker once ground truth is known (e.g., from your experiment protocol)
    if !isnothing(ground_truth)
        final = finalize_trial(session)
        metrics = update_and_report!(tracker, final.prediction, ground_truth, final.confidence)
        
        println("Running accuracy: $(round(metrics.accuracy * 100, digits=1))%")
        println("Chunk latency: $(round(latency_ms, digits=1)) ms")
    end
end

Quality Monitoring

Monitor signal quality during streaming:
# Track confidence trends
using Statistics  # for mean

confidence_history = Float64[]

for chunk in eeg_stream
    result = process_chunk(session, chunk)
    push!(confidence_history, result.confidence)
    
    # Check for degrading quality
    if length(confidence_history) >= 5
        recent_conf = mean(confidence_history[end-4:end])
        if recent_conf < 0.6
            @warn "Signal quality degrading"
            println("Recent confidence: $(round(recent_conf, digits=2))")
            println("Recommendation: Check electrode contact or reduce movement")
        end
    end
end

Advanced Streaming Features

Adaptive Confidence Thresholds

# Adjust thresholds based on performance
base_threshold = 0.7
current_accuracy = tracker.accuracy

if current_accuracy > 0.85
    # User performing well - can lower threshold for faster response
    threshold = base_threshold * 0.9
elseif current_accuracy < 0.65
    # User struggling - raise threshold for more reliable decisions
    threshold = base_threshold * 1.2
else
    threshold = base_threshold
end

# Apply threshold
if final_result.confidence > threshold
    execute_command(final_result.prediction)
else
    request_retry()
end

Multi-trial Context

# Track recent trial history for context
using StatsBase  # for mode

struct TrialHistory
    predictions::Vector{Int}
    confidences::Vector{Float64}
    max_history::Int
end

function add_trial!(history::TrialHistory, pred::Int, conf::Float64)
    push!(history.predictions, pred)
    push!(history.confidences, conf)
    
    # Keep only recent trials
    if length(history.predictions) > history.max_history
        popfirst!(history.predictions)
        popfirst!(history.confidences)
    end
end

# Use history to detect anomalies
function is_anomaly(history::TrialHistory, new_pred::Int)
    if length(history.predictions) < 3
        return false
    end
    
    # Check if prediction differs from recent trend
    recent_mode = mode(history.predictions[end-2:end])
    return new_pred != recent_mode
end

# Example usage
history = TrialHistory(Int[], Float64[], 10)

for trial in trials
    # ... stream this trial's chunks with process_chunk, then:
    final_result = finalize_trial(session)
    
    if is_anomaly(history, final_result.prediction)
        println("⚠️  Anomalous prediction - requesting confirmation")
        show_confirmation_dialog(final_result.prediction)
    end
    
    add_trial!(history, final_result.prediction, final_result.confidence)
end

Best Practices

Session Management

# Always initialize new session for each trial
for trial_idx in 1:n_trials
    # New session = fresh state
    session = init_streaming(model, metadata)
    
    # Process trial
    for chunk in collect_trial()
        process_chunk(session, chunk)
    end
    
    # Finalize
    result = finalize_trial(session)
    
    # Session automatically cleaned up
end

Error Handling

using NimbusSDK

try
    session = init_streaming(model, metadata)
    
    for chunk in eeg_stream
        try
            result = process_chunk(session, chunk)
            handle_result(result)
        catch e
            @error "Chunk processing failed" exception=e
            # Continue to next chunk
            continue
        end
    end
    
    final_result = finalize_trial(session)
    
catch e
    @error "Streaming session failed" exception=e
    # Reinitialize and retry
end

Memory Management

# Streaming uses bounded memory - no accumulation
# Each session has fixed memory footprint

# Good: Process long sessions safely
for hour in 1:8
    for trial in collect_hour_trials(hour)  # your acquisition function
        session = init_streaming(model, metadata)
        # ... process trial
        # Memory freed after finalize_trial
    end
end

# Memory usage stays constant regardless of session length

Troubleshooting

Low Confidence Issues

# Diagnose low confidence
if mean(confidence_history) < 0.65
    println("Troubleshooting low confidence:")
    println("1. Check preprocessing quality")
    println("2. Verify chunk_size is appropriate")
    println("3. Ensure user is focused and relaxed")
    println("4. Check for electrode issues")
    println("5. Consider recalibration")
    
    # Run diagnostics
    diagnostic_data = collect_diagnostic_trial()
    report = diagnose_preprocessing(diagnostic_data)
    println("\nPreprocessing quality: $(round(report.quality_score * 100, digits=1))%")
end

Latency Issues

# Profile streaming latency
using Statistics  # mean, median, quantile

latencies = Float64[]

for chunk in Iterators.take(eeg_stream, 100)  # Test the first 100 chunks
    t_start = time()
    result = process_chunk(session, chunk)
    t_end = time()
    
    push!(latencies, (t_end - t_start) * 1000)  # Convert to ms
end

println("Latency statistics:")
println("  Mean: $(round(mean(latencies), digits=1)) ms")
println("  Median: $(round(median(latencies), digits=1)) ms")
println("  95th percentile: $(round(quantile(latencies, 0.95), digits=1)) ms")
println("  Max: $(round(maximum(latencies), digits=1)) ms")

# First chunk is slower due to JIT compilation
println("\nExcluding first chunk (JIT warmup):")
println("  Mean: $(round(mean(latencies[2:end]), digits=1)) ms")

Next Steps


Next: Configure your real-time BCI setup for optimal streaming performance.