Streaming inference allows you to process EEG data as it arrives, making predictions on partial trials before the full trial is complete. This is essential for real-time BCI applications where low latency is critical. Supported models: NimbusLDA, NimbusQDA, NimbusSoftmax, and NimbusSTS. Key features:
Chunk-by-chunk processing: Process data as it streams in
# Aggregate all chunk-level predictions into one trial-level decision.
final = session.finalize_trial(method="weighted_vote")

# Report the aggregated result alongside the per-chunk history.
print(f"\nFinal Prediction: class {final.prediction}")
print(f"Final Confidence: {final.confidence:.2%}")
print(f"Chunk predictions: {final.chunk_predictions}")
from nimbus_bci import NimbusLDA, StreamingSession
from nimbus_bci.data import BCIMetadata
import numpy as np

# 1. Train model
clf = NimbusLDA()
clf.fit(X_train, y_train)

# 2. Setup streaming
metadata = BCIMetadata(
    sampling_rate=250.0,
    paradigm="motor_imagery",
    feature_type="csp",
    n_features=16,
    n_classes=4,
    chunk_size=125,  # 500ms chunks
    temporal_aggregation="logvar",
)
session = StreamingSession(clf.model_, metadata)

# 3. Process multiple trials
n_trials = 10
n_chunks_per_trial = 4

for trial_idx in range(n_trials):
    print(f"\n=== Trial {trial_idx + 1} ===")

    # Process chunks for this trial
    for chunk_idx in range(n_chunks_per_trial):
        # Get chunk from EEG stream (shape: n_features x chunk_size)
        chunk = get_next_chunk()  # Your streaming function
        result = session.process_chunk(chunk)
        print(f"Chunk {chunk_idx + 1}: class {result.prediction} "
              f"({result.confidence:.2%})")

    # Finalize trial
    final = session.finalize_trial(method="weighted_vote")
    print(f"Final: class {final.prediction} ({final.confidence:.2%})")

    # Check if we should reject this trial
    if final.confidence < 0.7:
        print("⚠️ Low confidence - trial rejected")

    # Reset for next trial
    session.reset()
from nimbus_bci import should_reject_trial

# Process trial chunk by chunk, then finalize.
for chunk in trial_chunks:
    result = session.process_chunk(chunk)
final = session.finalize_trial()

# Quality gate: act on the prediction only if confidence is acceptable.
if should_reject_trial(final.confidence, threshold=0.7):
    print("Trial rejected - low confidence")
    # Ask user to repeat or skip
else:
    # Use prediction
    execute_command(final.prediction)
# Instead of processing one chunk at a time
for chunk in chunks:
    result = session.process_chunk(chunk)

# Process in batches (if your application allows slight delay)
batch_size = 4
for start in range(0, len(chunks), batch_size):
    batch = chunks[start:start + batch_size]
    results = [session.process_chunk(c) for c in batch]
from nimbus_bci import StreamingSession

session = StreamingSession(clf.model_, metadata)

try:
    while streaming:
        chunk = acquire_eeg_chunk()

        # Validate chunk shape before handing it to the session.
        if chunk.shape != (n_features, chunk_size):
            print(f"Invalid chunk shape: {chunk.shape}")
            continue

        # Check for artifacts
        if has_artifacts(chunk):
            print("Artifact detected - skipping chunk")
            continue

        result = session.process_chunk(chunk)

        # Check for NaN — an invalid confidence means internal state is bad.
        if np.isnan(result.confidence):
            print("Invalid result - resetting session")
            session.reset()
            continue

        # Process result
        handle_result(result)
except KeyboardInterrupt:
    print("Streaming stopped by user")
finally:
    # Always leave the session clean, even on error or interrupt.
    session.reset()
    cleanup()
Unlike static models, NimbusSTS maintains a latent state that evolves over time. The StreamingSessionSTS class automatically handles state propagation between chunks.
Combine streaming inference with online adaptation:
from nimbus_bci import NimbusSTS
from nimbus_bci.inference import StreamingSessionSTS
from nimbus_bci.data import BCIMetadata

# Train initial model
clf = NimbusSTS(transition_cov=0.05, learning_rate=0.1)
clf.fit(X_calibration, y_calibration)

# Setup streaming
metadata = BCIMetadata(
    sampling_rate=250.0,
    paradigm="motor_imagery",
    feature_type="csp",
    n_features=16,
    n_classes=4,
    chunk_size=125,
    temporal_aggregation="logvar",
)
session = StreamingSessionSTS(clf, metadata)

# Online session with delayed feedback
for trial_data in online_trials:
    # Process chunks for this trial
    for chunk in trial_data.chunks:
        result = session.process_chunk(chunk)
        print(f"Chunk prediction: {result.prediction}")

    # Get final prediction
    final = session.finalize_trial()

    # Execute BCI command
    execute_command(final.prediction)

    # Wait for user feedback
    true_label = wait_for_feedback()

    # Update model with feedback (adapts to user)
    # Aggregate chunks back to trial level
    trial_features = aggregate_chunks(trial_data.chunks)
    clf.partial_fit(trial_features.reshape(1, -1), [true_label])

    # Reset session for next trial
    session.reset()
from nimbus_bci import NimbusSTS
from nimbus_bci.inference import StreamingSessionSTS
import matplotlib.pyplot as plt

clf = NimbusSTS(transition_cov=0.05)
clf.fit(X_train, y_train)
session = StreamingSessionSTS(clf, metadata)

# Track state evolution
state_history = []
uncertainty_history = []

for trial_idx in range(n_trials):
    for chunk in trial_chunks[trial_idx]:
        result = session.process_chunk(chunk)

        # Get current state — NOTE(review): assumes get_latent_state()
        # returns (mean vector, covariance matrix); confirm against the API.
        z_mean, z_cov = clf.get_latent_state()
        state_history.append(z_mean.copy())
        uncertainty_history.append(np.trace(z_cov))

    session.finalize_trial()
    session.reset()

# Visualize state evolution
state_history = np.array(state_history)

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(state_history)
plt.xlabel('Chunk')
plt.ylabel('Latent State')
plt.title('State Evolution Over Time')
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(uncertainty_history)
plt.xlabel('Chunk')
plt.ylabel('Uncertainty (trace of cov)')
plt.title('State Uncertainty')
plt.grid(True)

plt.tight_layout()
plt.show()
from nimbus_bci import NimbusSTS
from nimbus_bci.inference import StreamingSessionSTS
from nimbus_bci.data import BCIMetadata
import numpy as np

# Initial calibration
clf = NimbusSTS(
    transition_cov=0.05,  # Moderate drift
    learning_rate=0.1,
    num_steps=100,
    verbose=False,
)
clf.fit(X_calibration, y_calibration)

# Setup streaming
metadata = BCIMetadata(
    sampling_rate=250.0,
    paradigm="motor_imagery",
    feature_type="csp",
    n_features=16,
    n_classes=4,
    chunk_size=125,
    temporal_aggregation="logvar",
)
session = StreamingSessionSTS(clf, metadata)

# Long session (e.g., 60 minutes)
accuracy_window = []
window_size = 20

for trial_idx, trial in enumerate(long_session):
    # Process trial chunks
    for chunk in trial.chunks:
        result = session.process_chunk(chunk)

    # Get final prediction
    final = session.finalize_trial()
    prediction = final.prediction

    # Execute command
    bci_command = execute_action(prediction)

    # Get feedback after action completes
    true_label = get_feedback(bci_command)

    # Track accuracy over a sliding window of recent trials
    correct = (prediction == true_label)
    accuracy_window.append(correct)
    if len(accuracy_window) > window_size:
        accuracy_window.pop(0)
    recent_acc = np.mean(accuracy_window)

    # Update model (online learning)
    trial_features = aggregate_trial(trial.chunks)
    clf.partial_fit(trial_features.reshape(1, -1), [true_label])

    # Monitor performance
    if (trial_idx + 1) % 10 == 0:
        print(f"Trial {trial_idx + 1}: Recent accuracy = {recent_acc:.1%}")

        # Get state info
        z_mean, z_cov = clf.get_latent_state()
        uncertainty = np.trace(z_cov)
        print(f" State uncertainty: {uncertainty:.3f}")

    # Reset for next trial
    session.reset()

print("\nSession complete!")
print(f"Final accuracy (last {window_size} trials): {recent_acc:.1%}")

# Save adapted model for next session
z_final, P_final = clf.get_latent_state()

import pickle

with open("adapted_model_day1.pkl", "wb") as f:
    pickle.dump({
        'model': clf,
        'state': (z_final, P_final),
        'metadata': metadata,
    }, f)
Pro Tip: For long BCI sessions, monitor the state uncertainty. If it grows too large, consider a brief recalibration (5-10 trials) to re-anchor the model.