Skip to content

Advanced SSE Usage

This guide covers advanced features and techniques for using the Bupple AI Engine Server-Sent Events (SSE) capabilities.

Custom Event Types

Sending Different Event Types

php
use Bupple\Engine\Facades\Engine;

public function stream()
{
    return response()->stream(function () {
        // Send start event
        Engine::sse()->send([
            'type' => 'start',
            'message' => 'Starting stream'
        ], 'start');

        // Send content chunks
        $stream = Engine::engine()->stream($messages);
        foreach ($stream as $chunk) {
            Engine::sse()->send($chunk, 'content');
        }

        // Send completion event
        Engine::sse()->send([
            'type' => 'complete',
            'message' => 'Stream completed'
        ], 'complete');
    }, 200, [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'X-Accel-Buffering' => 'no',
    ]);
}

Client-Side Event Handling

javascript
const eventSource = new EventSource('/api/stream');

eventSource.addEventListener('start', (event) => {
    console.log('Stream started:', JSON.parse(event.data));
    // Initialize UI components
});

eventSource.addEventListener('content', (event) => {
    const data = JSON.parse(event.data);
    // Process content chunks
    appendContent(data.content);
});

eventSource.addEventListener('complete', (event) => {
    console.log('Stream completed:', JSON.parse(event.data));
    eventSource.close();
    // Finalize UI updates
});

eventSource.addEventListener('error', (event) => {
    console.error('Stream error:', event);
    eventSource.close();
    // Handle error in UI
});

Progress Tracking

Server-Side Progress Updates

php
public function streamWithProgress()
{
    return response()->stream(function () {
        $totalChunks = 10;
        $processedChunks = 0;

        $stream = Engine::engine()->stream($messages);
        foreach ($stream as $chunk) {
            $processedChunks++;
            $progress = ($processedChunks / $totalChunks) * 100;

            // Send content
            Engine::sse()->send($chunk, 'content');

            // Send progress update
            Engine::sse()->send([
                'progress' => $progress,
                'processed' => $processedChunks,
                'total' => $totalChunks
            ], 'progress');
        }
    }, 200, [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'X-Accel-Buffering' => 'no',
    ]);
}

Client-Side Progress Handling

javascript
let progressBar = document.querySelector('.progress-bar');

eventSource.addEventListener('progress', (event) => {
    const data = JSON.parse(event.data);
    progressBar.style.width = `${data.progress}%`;
    progressBar.setAttribute('aria-valuenow', data.progress);
    
    // Update progress text
    document.querySelector('.progress-text').textContent = 
        `Processing: ${data.processed}/${data.total}`;
});

Connection Management

Server-Side Connection Handling

php
public function streamWithConnectionManagement()
{
    return response()->stream(function () {
        // Set initial connection timeout
        set_time_limit(0);
        ini_set('max_execution_time', 0);

        // Send keepalive every 30 seconds
        $lastKeepalive = time();
        $keepaliveInterval = 30;

        $stream = Engine::engine()->stream($messages);
        foreach ($stream as $chunk) {
            // Send content
            Engine::sse()->send($chunk, 'content');

            // Check if keepalive needed
            if (time() - $lastKeepalive >= $keepaliveInterval) {
                Engine::sse()->send(['type' => 'keepalive'], 'keepalive');
                $lastKeepalive = time();
            }

            // Check connection status
            if (connection_aborted()) {
                // Clean up and exit
                break;
            }
        }
    }, 200, [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'X-Accel-Buffering' => 'no',
    ]);
}

Client-Side Connection Management

javascript
class SSEManager {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            reconnectAttempts: 3,
            reconnectDelay: 1000,
            ...options
        };
        this.attemptCount = 0;
        this.connect();
    }

    connect() {
        this.eventSource = new EventSource(this.url);
        this.setupEventListeners();
    }

    setupEventListeners() {
        this.eventSource.addEventListener('open', () => {
            console.log('Connection established');
            this.attemptCount = 0;
        });

        this.eventSource.addEventListener('error', (error) => {
            console.error('Connection error:', error);
            this.handleError();
        });

        this.eventSource.addEventListener('keepalive', () => {
            console.log('Keepalive received');
        });
    }

    handleError() {
        if (this.attemptCount < this.options.reconnectAttempts) {
            this.attemptCount++;
            console.log(`Reconnecting... Attempt ${this.attemptCount}`);
            
            setTimeout(() => {
                this.eventSource.close();
                this.connect();
            }, this.options.reconnectDelay);
        } else {
            console.error('Max reconnection attempts reached');
            this.eventSource.close();
        }
    }

    close() {
        if (this.eventSource) {
            this.eventSource.close();
        }
    }
}

// Usage
const sseManager = new SSEManager('/api/stream', {
    reconnectAttempts: 5,
    reconnectDelay: 2000
});

Advanced Error Handling

Server-Side Error Handling

php
public function streamWithErrorHandling()
{
    return response()->stream(function () {
        try {
            $stream = Engine::engine()->stream($messages);
            
            foreach ($stream as $chunk) {
                try {
                    // Process and send chunk
                    $processedChunk = $this->processChunk($chunk);
                    Engine::sse()->send($processedChunk, 'content');
                } catch (\Exception $e) {
                    // Handle chunk processing error
                    Engine::sse()->send([
                        'error' => 'Chunk processing failed',
                        'message' => $e->getMessage(),
                        'chunk_id' => $chunk['id'] ?? null
                    ], 'chunk_error');
                    
                    // Continue with next chunk
                    continue;
                }
            }
        } catch (\Exception $e) {
            // Handle stream error
            Engine::sse()->send([
                'error' => 'Stream processing failed',
                'message' => $e->getMessage()
            ], 'stream_error');
            
            // Attempt recovery or graceful shutdown
            $this->handleStreamError($e);
        } finally {
            // Clean up resources
            $this->cleanup();
        }
    }, 200, [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'X-Accel-Buffering' => 'no',
    ]);
}

Client-Side Error Handling

javascript
class SSEErrorHandler {
    constructor(eventSource) {
        this.eventSource = eventSource;
        this.setupErrorHandlers();
    }

    setupErrorHandlers() {
        this.eventSource.addEventListener('chunk_error', (event) => {
            const error = JSON.parse(event.data);
            console.warn('Chunk processing error:', error);
            this.handleChunkError(error);
        });

        this.eventSource.addEventListener('stream_error', (event) => {
            const error = JSON.parse(event.data);
            console.error('Stream error:', error);
            this.handleStreamError(error);
        });

        this.eventSource.addEventListener('error', (event) => {
            console.error('Connection error:', event);
            this.handleConnectionError(event);
        });
    }

    handleChunkError(error) {
        // Update UI to show chunk error
        this.showError(`Chunk processing failed: ${error.message}`);
        
        // Optionally retry chunk
        if (error.chunk_id) {
            this.retryChunk(error.chunk_id);
        }
    }

    handleStreamError(error) {
        // Update UI to show stream error
        this.showError(`Stream failed: ${error.message}`);
        
        // Close connection and cleanup
        this.eventSource.close();
        this.cleanup();
    }

    handleConnectionError(event) {
        // Update UI to show connection error
        this.showError('Connection lost');
        
        // Attempt to reconnect
        this.reconnect();
    }

    showError(message) {
        // Update UI with error message
        const errorContainer = document.querySelector('.error-container');
        errorContainer.textContent = message;
        errorContainer.style.display = 'block';
    }

    cleanup() {
        // Perform cleanup tasks
    }

    reconnect() {
        // Implement reconnection logic
    }

    retryChunk(chunkId) {
        // Implement chunk retry logic
    }
}

// Usage
const eventSource = new EventSource('/api/stream');
const errorHandler = new SSEErrorHandler(eventSource);

Next Steps

For more advanced topics, check out:

Released under the MIT License.