Real-time Streaming API

Important: Streaming inference is performed locally by NimbusSDK.jl, not via API calls. This provides low-latency real-time processing without network overhead.

Overview

Streaming inference allows you to process EEG data in real-time, chunk-by-chunk, as it arrives from your BCI hardware. This is essential for:
  • Real-time BCI applications: Wheelchair control, gaming, assistive devices
  • Online feedback systems: Neurofeedback, training applications
  • Low-latency requirements: Systems requiring <100ms response times
  • Continuous monitoring: Long-duration BCI sessions

How Streaming Works

Unlike batch inference, which processes complete trials, streaming inference processes data incrementally:
EEG Hardware → Preprocessing → Feature Chunks → NimbusSDK → Predictions
                 (MNE/LSL)        (CSP/etc)      (Local)      (Real-time)

Architecture

  1. Hardware Acquisition: EEG amplifier streams data (e.g., via LSL, BrainFlow)
  2. Preprocessing: Apply filters and feature extraction in chunks
  3. SDK Streaming: Process features chunk-by-chunk with process_chunk()
  4. Aggregation: Combine chunk predictions with finalize_trial()
Key Benefit: All processing happens locally - no API calls during inference.

Quick Start

Initialize Streaming Session

using NimbusSDK

# 1. Authenticate (one-time, can cache offline)
NimbusSDK.install_core("nbci_live_your_key")

# 2. Load model
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

# 3. Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks at 250 Hz
)

# 4. Initialize streaming session
session = init_streaming(model, metadata)

Process Chunks in Real-Time

# Process chunks as they arrive
for chunk in eeg_feature_stream
    # chunk: (n_features × chunk_size) array
    result = process_chunk(session, chunk)
    
    println("Chunk prediction: $(result.prediction)")
    println("Confidence: $(round(result.confidence, digits=3))")
    println("Posterior: $(result.posterior)")
end

Finalize Trial

After processing all chunks for a trial, aggregate the results:
# Aggregate chunk predictions
final_result = finalize_trial(session; method=:weighted_vote)

println("Final prediction: $(final_result.prediction)")
println("Final confidence: $(round(final_result.confidence, digits=3))")

# Check if trial quality is acceptable
if should_reject_trial(final_result.confidence, 0.7)
    println("⚠️  Low confidence - trial rejected")
else
    println("✓ High confidence - trial accepted")
end

Complete Example

Motor Imagery Streaming

using NimbusSDK

# Setup
NimbusSDK.install_core("nbci_live_your_key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")

metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks
)

session = init_streaming(model, metadata)

# Real-time trial processing
function process_trial(csp_chunks)
    # Process each chunk as it arrives
    for chunk in csp_chunks
        chunk_result = process_chunk(session, chunk)
        
        # Real-time feedback
        @info "Chunk $(length(session.chunk_history))" prediction=chunk_result.prediction confidence=chunk_result.confidence
        
        # Optional: early stopping (see the sketch after this example)
        if chunk_result.confidence > 0.95
            @info "High confidence reached early - consider stopping"
        end
    end
    
    # Finalize with weighted voting
    final = finalize_trial(session; method=:weighted_vote)
    
    return final
end

# Simulate streaming from BCI system
trial_chunks = [
    randn(16, 250),  # Chunk 1: 0-1s
    randn(16, 250),  # Chunk 2: 1-2s
    randn(16, 250),  # Chunk 3: 2-3s
    randn(16, 250)   # Chunk 4: 3-4s
]

final_result = process_trial(trial_chunks)

println("\nFinal Result:")
println("  Prediction: $(final_result.prediction)")
println("  Confidence: $(round(final_result.confidence, digits=3))")
println("  Quality: $(should_reject_trial(final_result.confidence, 0.7) ? "Rejected" : "Accepted")")

Streaming Functions

init_streaming()

Initialize a streaming session for a model.
init_streaming(model, metadata::BCIMetadata) -> StreamingSession
Parameters:
  • model: Loaded RxLDA or RxGMM model
  • metadata: BCIMetadata with chunk_size specified
Returns: StreamingSession object

process_chunk()

Process a single chunk of features.
process_chunk(session::StreamingSession, chunk::Matrix{Float64}; iterations::Int=10) -> StreamingResult
Parameters:
  • session: Active streaming session
  • chunk: Feature matrix (n_features × chunk_size)
  • iterations: Number of RxInfer iterations (default: 10)
Returns:
struct StreamingResult
    prediction::Int           # Predicted class
    confidence::Float64       # Confidence (max posterior)
    posterior::Vector{Float64} # Full posterior distribution
end

finalize_trial()

Aggregate chunk predictions into final trial result.
finalize_trial(session::StreamingSession; method::Symbol=:weighted_vote) -> StreamingResult
Aggregation Methods:
  • :weighted_vote: Weight predictions by confidence (recommended)
  • :majority_vote: Simple majority vote across chunks
  • :last_chunk: Use only the last chunk’s prediction
  • :max_confidence: Use chunk with highest confidence
Parameters:
  • session: Streaming session with processed chunks
  • method: Aggregation strategy
Returns: Final aggregated StreamingResult

Chunk Size Selection

Choose chunk size based on your paradigm and latency requirements:
| Paradigm | Recommended Chunk Size | Rationale |
| --- | --- | --- |
| Motor Imagery | 250-500 samples (1-2 s at 250 Hz) | Balance between latency and accuracy |
| P300 | 100-200 samples (0.4-0.8 s at 250 Hz) | Short epochs for rapid serial visual presentation |
| SSVEP | 500-1000 samples (2-4 s at 250 Hz) | Longer windows for frequency analysis |
Trade-offs:
  • Smaller chunks: Lower latency, more frequent updates, lower per-chunk accuracy
  • Larger chunks: Higher latency, fewer updates, higher per-chunk accuracy
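
Because chunk_size is specified in samples, a target update interval converts directly. A small worked example, using the same 250 Hz setup as above:

sampling_rate = 250.0        # Hz, must match your BCIMetadata
update_interval = 1.0        # desired time between predictions, in seconds
chunk_size = round(Int, sampling_rate * update_interval)  # 250 samples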

Aggregation Strategies

Weighted Vote

Weight each chunk’s prediction by its confidence:
final = finalize_trial(session; method=:weighted_vote)
Best for: Most applications, especially with variable confidence across chunks
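
For intuition, weighted voting behaves roughly like the sketch below, which combines per-chunk posteriors scaled by their confidences. This is illustrative only; the SDK's internal implementation may differ, and the input layout is assumed:

# Illustrative sketch, not the SDK internals.
# posteriors:  one posterior vector per processed chunk
# confidences: the corresponding max-posterior values
function weighted_vote_sketch(posteriors::Vector{Vector{Float64}},
                              confidences::Vector{Float64})
    combined = sum(c .* p for (c, p) in zip(confidences, posteriors))
    combined ./= sum(combined)  # renormalize to a probability vector
    return (prediction = argmax(combined),
            confidence = maximum(combined),
            posterior = combined)
end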

Majority Vote

Simple democratic vote across chunks:
final = finalize_trial(session; method=:majority_vote)
Best for: Consistent predictions across chunks

Max Confidence

Use prediction from most confident chunk:
final = finalize_trial(session; method=:max_confidence)
Best for: When one chunk is clearly more reliable

Last Chunk

Use only the most recent chunk:
final = finalize_trial(session; method=:last_chunk)
Best for: Real-time systems where the latest information is most relevant

Quality Assessment

Confidence Thresholds

Set confidence thresholds to reject low-quality trials:
confidence_threshold = 0.7

if should_reject_trial(final_result.confidence, confidence_threshold)
    @warn "Trial rejected - confidence too low" confidence=final_result.confidence
    # Ask user to repeat trial
else
    @info "Trial accepted" confidence=final_result.confidence
    # Use prediction
end

Per-Chunk Quality Monitoring

Monitor quality during streaming:
for chunk in chunks
    result = process_chunk(session, chunk)
    
    if result.confidence < 0.5
        @warn "Low confidence chunk detected" chunk_idx=length(session.chunk_history)
    end
end

Integration with Hardware

Lab Streaming Layer (LSL)

using NimbusSDK
# Assuming you have LSL.jl or PyCall with pylsl

function stream_from_lsl(stream_name, session)
    # Connect to LSL stream (pseudocode; adapt to your LSL binding's API)
    inlet = LSL.resolve_stream("name", stream_name)
    
    buffer = Vector{Vector{Float64}}()  # accumulates per-sample feature vectors
    chunk_samples = session.metadata.chunk_size
    
    while true
        # Pull sample from LSL
        sample, timestamp = inlet.pull_sample()
        
        # Apply preprocessing (filter, CSP, etc.);
        # preprocess_sample is a user-supplied placeholder
        features = preprocess_sample(sample)
        
        push!(buffer, features)
        
        # Process when chunk is ready
        if length(buffer) >= chunk_samples
            chunk = reduce(hcat, buffer[1:chunk_samples])  # n_features × chunk_size
            result = process_chunk(session, chunk)
            
            println("Real-time prediction: $(result.prediction)")
            
            # Slide window
            buffer = buffer[(chunk_samples+1):end]
        end
    end
end

BrainFlow

using NimbusSDK
# Assuming BrainFlow.jl or PyCall with brainflow

function stream_from_brainflow(board, session)
    while true
        # Get data from BrainFlow board
        data = BrainFlow.get_current_board_data(board)
        
        # Apply preprocessing (apply_filters and apply_csp are
        # user-supplied placeholders for your filtering and CSP steps)
        filtered = apply_filters(data)
        csp_features = apply_csp(filtered)
        
        # Process chunk
        result = process_chunk(session, csp_features)
        
        # Real-time feedback (display_prediction is a user-supplied placeholder)
        display_prediction(result.prediction, result.confidence)
        
        sleep(1.0)  # 1-second chunks
    end
end

Performance Optimization

Minimize Latency

# Use fewer iterations for faster processing
result = process_chunk(session, chunk; iterations=5)  # ~10-20ms
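
To verify latency on your own hardware, you can time process_chunk directly. A sketch using a throwaway session so the dummy chunks don't pollute a real trial (Statistics is from Julia's standard library):

using Statistics  # for mean

bench_session = init_streaming(model, metadata)  # throwaway session for timing
dummy_chunk = randn(16, 250)
process_chunk(bench_session, dummy_chunk)        # warm-up so JIT compilation isn't timed

latencies = [@elapsed(process_chunk(bench_session, dummy_chunk; iterations=5))
             for _ in 1:20]
println("Mean chunk latency: $(round(mean(latencies) * 1000, digits=1)) ms")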

Precompute Models

# Load and compile model before streaming starts
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
session = init_streaming(model, metadata)

# Warm-up: process a dummy chunk to trigger JIT compilation
dummy_chunk = randn(16, 250)
process_chunk(session, dummy_chunk)

# Re-initialize so the warm-up chunk doesn't count toward a real trial
# (assuming chunk history persists in the session until finalize_trial)
session = init_streaming(model, metadata)

# Now ready for real-time processing

Parallel Processing

using Base.Threads

# Process multiple streams in parallel: one stateful session per stream,
# never shared across threads
results = Vector{StreamingResult}(undef, length(channel_sessions))
@threads for i in eachindex(channel_sessions)
    results[i] = process_chunk(channel_sessions[i], chunks[i])
end

Typical Latencies

| Operation | Latency | Notes |
| --- | --- | --- |
| Chunk processing | 10-30 ms | RxInfer inference (10 iterations) |
| Feature extraction | 5-20 ms | CSP transform, bandpower |
| Trial finalization | <5 ms | Aggregate chunk results |
| Total (end-to-end) | <100 ms | Including preprocessing |
All latencies are for local processing. No network calls are made during streaming.

Comparison: Streaming vs Batch

| Aspect | Streaming | Batch |
| --- | --- | --- |
| Latency | Low (<100 ms per chunk) | Higher (full trial) |
| Use Case | Real-time BCI | Offline analysis |
| Feedback | Continuous | After trial completion |
| Memory | Low (per-chunk) | Higher (full trials) |
| Accuracy | Aggregated over chunks | Single prediction |

Support

For streaming and real-time processing questions: