Real-time Streaming API
Important: Streaming inference is performed locally by NimbusSDK.jl, not via API calls. This provides low-latency real-time processing without network overhead.
Overview
Streaming inference allows you to process EEG data in real-time, chunk-by-chunk, as it arrives from your BCI hardware. This is essential for:
- Real-time BCI applications: Wheelchair control, gaming, assistive devices
- Online feedback systems: Neurofeedback, training applications
- Low-latency requirements: Systems requiring <100ms response times
- Continuous monitoring: Long-duration BCI sessions
How Streaming Works
Unlike batch inference, which processes complete trials, streaming inference processes data incrementally:
EEG Hardware (MNE/LSL) → Preprocessing (CSP, etc.) → Feature Chunks → NimbusSDK (local) → Predictions (real-time)
Architecture
- Hardware Acquisition: EEG amplifier streams data (e.g., via LSL, BrainFlow)
- Preprocessing: Apply filters and feature extraction in chunks
- SDK Streaming: Process features chunk-by-chunk with process_chunk()
- Aggregation: Combine chunk predictions with finalize_trial()
Key Benefit: All processing happens locally - no API calls during inference.
Quick Start
Initialize Streaming Session
using NimbusSDK
# 1. Authenticate (one-time, can cache offline)
NimbusSDK.install_core("nbci_live_your_key")
# 2. Load model
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
# 3. Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks at 250 Hz
)
# 4. Initialize streaming session
session = init_streaming(model, metadata)
Process Chunks in Real-Time
# Process chunks as they arrive
for chunk in eeg_feature_stream
    # chunk: (n_features × chunk_size) array
    result = process_chunk(session, chunk)
    println("Chunk prediction: $(result.prediction)")
    println("Confidence: $(round(result.confidence, digits=3))")
    println("Posterior: $(result.posterior)")
end
Finalize Trial
After processing all chunks for a trial, aggregate the results:
# Aggregate chunk predictions
final_result = finalize_trial(session; method=:weighted_vote)
println("Final prediction: $(final_result.prediction)")
println("Final confidence: $(round(final_result.confidence, digits=3))")
# Check if trial quality is acceptable
if should_reject_trial(final_result.confidence, 0.7)
    println("⚠️ Low confidence - trial rejected")
else
    println("✓ High confidence - trial accepted")
end
Complete Example
Motor Imagery Streaming
using NimbusSDK
# Setup
NimbusSDK.install_core("nbci_live_your_key")
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1 second chunks
)
session = init_streaming(model, metadata)
# Real-time trial processing
function process_trial(csp_chunks)
    # Process each chunk as it arrives
    for chunk in csp_chunks
        chunk_result = process_chunk(session, chunk)
        # Real-time feedback
        @info "Chunk $(length(session.chunk_history))" prediction=chunk_result.prediction confidence=chunk_result.confidence
        # Optional: Early stopping
        if chunk_result.confidence > 0.95
            @info "High confidence reached early - consider stopping"
        end
    end
    # Finalize with weighted voting
    final = finalize_trial(session; method=:weighted_vote)
    return final
end
# Simulate streaming from BCI system
trial_chunks = [
    randn(16, 250),  # Chunk 1: 0-1s
    randn(16, 250),  # Chunk 2: 1-2s
    randn(16, 250),  # Chunk 3: 2-3s
    randn(16, 250)   # Chunk 4: 3-4s
]
final_result = process_trial(trial_chunks)
println("\nFinal Result:")
println(" Prediction: $(final_result.prediction)")
println(" Confidence: $(round(final_result.confidence, digits=3))")
println(" Quality: $(should_reject_trial(final_result.confidence, 0.7) ? "Rejected" : "Accepted")")
Streaming Functions
init_streaming()
Initialize a streaming session for a model.
init_streaming(model, metadata::BCIMetadata) -> StreamingSession
Parameters:
- model: Loaded RxLDA or RxGMM model
- metadata: BCIMetadata with chunk_size specified
Returns: StreamingSession object
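The later examples on this page read two fields from the returned session. A minimal sketch of that usage, assuming chunk_history and metadata are public fields of StreamingSession (as those examples imply):
session = init_streaming(model, metadata)
# Fields assumed from the examples on this page (not an exhaustive list):
session.metadata.chunk_size    # samples per chunk, as configured in BCIMetadata
length(session.chunk_history)  # number of chunks processed so far in the current trial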
process_chunk()
Process a single chunk of features.
process_chunk(session::StreamingSession, chunk::Matrix{Float64}; iterations::Int=10) -> StreamingResult
Parameters:
- session: Active streaming session
- chunk: Feature matrix (n_features × chunk_size)
- iterations: Number of RxInfer iterations (default: 10)
Returns:
struct StreamingResult
    prediction::Int             # Predicted class
    confidence::Float64         # Confidence (max posterior)
    posterior::Vector{Float64}  # Full posterior distribution
end
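A quick sketch of how these fields relate, assuming posterior is ordered by class index:
result = process_chunk(session, chunk)
# prediction is the class with the largest posterior; confidence is that maximum value
@assert result.prediction == argmax(result.posterior)
@assert result.confidence ≈ maximum(result.posterior)
@assert sum(result.posterior) ≈ 1.0  # posterior is a probability distribution over the classes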
finalize_trial()
Aggregate chunk predictions into final trial result.
finalize_trial(session::StreamingSession; method::Symbol=:weighted_vote) -> StreamingResult
Aggregation Methods:
- :weighted_vote: Weight predictions by confidence (recommended)
- :majority_vote: Simple majority vote across chunks
- :last_chunk: Use only the last chunk’s prediction
- :max_confidence: Use chunk with highest confidence
Parameters:
- session: Streaming session with processed chunks
- method: Aggregation strategy
Returns: Final aggregated StreamingResult
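For intuition, :weighted_vote can be thought of as summing each chunk's posterior scaled by that chunk's confidence and then picking the best class. A rough illustrative sketch (not the SDK's internal implementation):
# Illustrative only: approximate a confidence-weighted vote over per-chunk results
function weighted_vote_sketch(chunk_results::Vector{StreamingResult})
    scores = zeros(length(first(chunk_results).posterior))
    for r in chunk_results
        scores .+= r.confidence .* r.posterior  # weight each chunk by its confidence
    end
    scores ./= sum(scores)                      # renormalize to a probability distribution
    return (prediction = argmax(scores), confidence = maximum(scores), posterior = scores)
end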
Chunk Size Selection
Choose chunk size based on your paradigm and latency requirements:
| Paradigm | Recommended Chunk Size | Rationale |
|---|---|---|
| Motor Imagery | 250-500 samples (1-2s at 250 Hz) | Balance between latency and accuracy |
| P300 | 100-200 samples (0.4-0.8s at 250 Hz) | Short epochs for rapid serial visual presentation |
| SSVEP | 500-1000 samples (2-4s at 250 Hz) | Longer windows for frequency analysis |
Trade-offs:
- Smaller chunks: Lower latency, more frequent updates, lower per-chunk accuracy
- Larger chunks: Higher latency, fewer updates, higher per-chunk accuracy
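Because chunk_size is expressed in samples, it is simply the desired analysis window length multiplied by the sampling rate. For example, a 1.5-second window at 250 Hz:
sampling_rate = 250.0   # Hz
window_seconds = 1.5    # desired analysis window (latency budget)
chunk_size = round(Int, sampling_rate * window_seconds)  # 375 samples
metadata = BCIMetadata(
    sampling_rate = sampling_rate,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = chunk_size
)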
Aggregation Strategies
Weighted Vote (Recommended)
Weight each chunk’s prediction by its confidence:
final = finalize_trial(session; method=:weighted_vote)
Best for: Most applications, especially with variable confidence across chunks
Majority Vote
Simple democratic vote across chunks:
final = finalize_trial(session; method=:majority_vote)
Best for: Consistent predictions across chunks
Max Confidence
Use prediction from most confident chunk:
final = finalize_trial(session; method=:max_confidence)
Best for: When one chunk is clearly more reliable
Last Chunk
Use only the most recent chunk:
final = finalize_trial(session; method=:last_chunk)
Best for: Real-time systems where latest information is most relevant
Quality Assessment
Confidence Thresholds
Set confidence thresholds to reject low-quality trials:
confidence_threshold = 0.7
if should_reject_trial(final_result.confidence, confidence_threshold)
    @warn "Trial rejected - confidence too low" confidence=final_result.confidence
    # Ask user to repeat trial
else
    @info "Trial accepted" confidence=final_result.confidence
    # Use prediction
end
Per-Chunk Quality Monitoring
Monitor quality during streaming:
for chunk in chunks
    result = process_chunk(session, chunk)
    if result.confidence < 0.5
        @warn "Low confidence chunk detected" chunk_idx=length(session.chunk_history)
    end
end
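Building on the per-chunk check above, a common pattern is to abort a trial after several consecutive low-confidence chunks rather than waiting for finalization. A sketch with illustrative thresholds:
let low_streak = 0
    for chunk in chunks
        result = process_chunk(session, chunk)
        # Count consecutive low-confidence chunks; reset on any confident chunk
        low_streak = result.confidence < 0.5 ? low_streak + 1 : 0
        if low_streak >= 3
            @warn "Three consecutive low-confidence chunks - aborting this trial"
            break
        end
    end
end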
Integration with Hardware
Lab Streaming Layer (LSL)
using NimbusSDK
# Assuming you have LSL.jl or PyCall with pylsl
function stream_from_lsl(stream_name, model, session)
    # Connect to LSL stream (pseudocode)
    inlet = LSL.resolve_stream("name", stream_name)
    buffer = []
    chunk_samples = session.metadata.chunk_size
    while true
        # Pull sample from LSL
        sample, timestamp = inlet.pull_sample()
        # Apply preprocessing (filter, CSP, etc.)
        features = preprocess_sample(sample)
        push!(buffer, features)
        # Process when chunk is ready
        if length(buffer) >= chunk_samples
            chunk = hcat(buffer[1:chunk_samples]...)
            result = process_chunk(session, chunk)
            println("Real-time prediction: $(result.prediction)")
            # Slide window
            buffer = buffer[(chunk_samples+1):end]
        end
    end
end
BrainFlow
using NimbusSDK
# Assuming BrainFlow.jl or PyCall with brainflow
function stream_from_brainflow(board, model, session)
    while true
        # Get data from BrainFlow board
        data = BrainFlow.get_current_board_data(board)
        # Apply preprocessing
        filtered = apply_filters(data)
        csp_features = apply_csp(filtered)
        # Process chunk
        result = process_chunk(session, csp_features)
        # Real-time feedback
        display_prediction(result.prediction, result.confidence)
        sleep(1.0)  # 1-second chunks
    end
end
Minimize Latency
# Use fewer iterations for faster processing
result = process_chunk(session, chunk; iterations=5) # ~10-20ms
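To verify that a given iteration count fits your latency budget, time process_chunk directly after a warm-up call (the first call includes compilation; see Precompute Models below):
process_chunk(session, randn(16, 250))  # warm-up (JIT compilation)
elapsed = @elapsed process_chunk(session, randn(16, 250); iterations=5)
println("Per-chunk latency: $(round(elapsed * 1000, digits=1)) ms")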
Precompute Models
# Load and compile model before streaming starts
model = load_model(RxLDAModel, "motor_imagery_4class_v1")
session = init_streaming(model, metadata)
# Warm-up: process dummy chunk to compile functions
dummy_chunk = randn(16, 250)
process_chunk(session, dummy_chunk)
# Now ready for real-time processing
Parallel Processing
using Base.Threads
# Process multiple independent sessions in parallel, pairing each session with its chunk
results = Vector{StreamingResult}(undef, length(channel_sessions))
@threads for i in eachindex(channel_sessions)
    results[i] = process_chunk(channel_sessions[i], chunks[i])
end
Typical Latencies
| Operation | Latency | Notes |
|---|---|---|
| Chunk processing | 10-30ms | RxInfer inference (10 iterations) |
| Feature extraction | 5-20ms | CSP transform, bandpower |
| Trial finalization | <5ms | Aggregate chunk results |
| Total (end-to-end) | <100ms | Including preprocessing |
All latencies are for local processing. No network calls are made during streaming.
Comparison: Streaming vs Batch
| Aspect | Streaming | Batch |
|---|---|---|
| Latency | Low (<100ms per chunk) | Higher (full trial) |
| Use Case | Real-time BCI | Offline analysis |
| Feedback | Continuous | After trial completion |
| Memory | Low (per-chunk) | Higher (full trials) |
| Accuracy | Aggregated over chunks | Single prediction |
Next Steps
Support
For streaming and real-time processing questions: