AI Streaming with kSync

Learn how to build real-time AI streaming applications using kSync, enabling collaborative AI conversations with streaming responses and real-time synchronization.

Overview

This example demonstrates:
  • Real-time AI response streaming
  • Collaborative AI conversations
  • Conversation history management
  • Integration with multiple AI models
  • Streaming state management with React hooks
  • Optimistic updates for instant UX

AI Streaming Store

stores/aiStore.ts
import { 
  KSync, 
  InMemoryStorage, 
  WebSocketSyncClient 
} from 'ksync';

/**
 * A single chat message stored in a conversation.
 *
 * The `streaming`, `completed` and `metadata` fields are maintained by the
 * store's materializer while it replays `message_started`,
 * `message_completed` and `message_error` events.
 */
export interface Message {
  id: string;
  /** Id of the owning conversation; also the key into `AIState.messages`. */
  conversationId: string;
  role: 'user' | 'assistant' | 'system';
  /** Message text; `message_chunk` events append to it. */
  content: string;
  /** Creation time as produced by `Date.now()` in `AIService.sendMessage`. */
  timestamp: number;
  /** Id of the sending user; `AIService.sendMessage` sets it on user messages only. */
  userId?: string;
  /** Model name; `AIService.sendMessage` sets it on assistant messages from the conversation settings. */
  model?: string;
  // NOTE(review): `tokens` is never written by the code in this example — confirm whether it is still needed.
  tokens?: number;
  /** Set to `true` by `message_started`, and back to `false` by `message_completed` or `message_error`. */
  streaming?: boolean;
  /** Set to `false` by `message_started` and to `true` by `message_completed`. */
  completed?: boolean;
  /** Replaced wholesale with the `metadata` payload of the `message_completed` event. */
  metadata?: {
    temperature?: number;
    maxTokens?: number;
    model?: string;
    promptTokens?: number;
    completionTokens?: number;
  };
}

/**
 * A chat conversation together with the generation settings used for its
 * AI requests.
 */
export interface Conversation {
  id: string;
  title: string;
  /** User ids; `AIService.createConversation` seeds it with the creating user's id. */
  participants: string[];
  /** Model name chosen at creation time (`settings.model` is what requests actually use). */
  model: string;
  systemPrompt?: string;
  /** Creation time from `Date.now()`. */
  createdAt: number;
  /** Last-update time from `Date.now()`; `AIService.getAllConversations` sorts on it, newest first. */
  updatedAt: number;
  /** Per-conversation request settings forwarded to the AI endpoints. */
  settings: {
    temperature: number;
    maxTokens: number;
    model: string;
    /** When `false`, `AIService.sendMessage` uses the non-streaming endpoint. */
    streamingEnabled: boolean;
  };
}

/**
 * Materialized application state; the materializer in `createAIStore`
 * rebuilds it from scratch by replaying the event list.
 */
export interface AIState {
  /** Conversations keyed by conversation id. */
  conversations: Map<string, Conversation>;
  messages: Map<string, Message[]>; // conversationId -> messages
  activeStreams: Map<string, boolean>; // messageId -> isStreaming
  /** Payload of the most recent `user_connected` event, or `null` if none was seen. */
  currentUser: { id: string; name: string } | null;
}

/**
 * Finds a message inside the materialized state.
 *
 * @returns the stored message, or `undefined` when either the conversation
 *          or the message is unknown.
 */
function findStoredMessage(
  state: AIState,
  conversationId: string,
  messageId: string
): Message | undefined {
  return state.messages.get(conversationId)?.find(m => m.id === messageId);
}

/**
 * Creates the kSync store backing the AI chat and registers the materializer
 * that folds the event log into an `AIState`.
 *
 * @param userId Id of the local user.
 *               NOTE(review): currently unused — the auth token below is a
 *               hard-coded placeholder; confirm how it should be derived.
 * @returns the configured `KSync` instance.
 */
export const createAIStore = (userId: string) => {
  const ksync = new KSync<AIState>({
    storage: new InMemoryStorage(),
    syncClient: new WebSocketSyncClient('ws://localhost:8081', {
      auth: { type: 'jwt', token: 'user-token' }
    })
  });

  // The materializer rebuilds the whole state from the event list each time.
  ksync.materialize((events) => {
    const state: AIState = {
      conversations: new Map(),
      messages: new Map(),
      activeStreams: new Map(),
      currentUser: null
    };

    for (const event of events) {
      // Every case is wrapped in its own block so its `const`s do not leak
      // into the shared switch scope.
      switch (event.type) {
        case 'user_connected': {
          state.currentUser = event.data;
          break;
        }

        case 'conversation_created': {
          state.conversations.set(event.data.id, event.data);
          state.messages.set(event.data.id, []);
          break;
        }

        case 'conversation_updated': {
          const existing = state.conversations.get(event.data.id);
          if (existing) {
            state.conversations.set(event.data.id, {
              ...existing,
              ...event.data,
              // Updates carry a *partial* settings object; merge it so the
              // untouched settings are not dropped.
              settings: { ...existing.settings, ...event.data.settings }
            });
          }
          break;
        }

        case 'message_started': {
          const messages = state.messages.get(event.data.conversationId) || [];
          messages.push({
            ...event.data,
            streaming: true,
            completed: false
          });
          state.messages.set(event.data.conversationId, messages);
          state.activeStreams.set(event.data.id, true);
          break;
        }

        case 'message_chunk': {
          const target = findStoredMessage(
            state,
            event.data.conversationId,
            event.data.messageId
          );
          if (target) {
            target.content += event.data.chunk;
          }
          break;
        }

        case 'message_completed': {
          const target = findStoredMessage(
            state,
            event.data.conversationId,
            event.data.messageId
          );
          if (target) {
            target.streaming = false;
            target.completed = true;
            target.metadata = event.data.metadata;
          }
          state.activeStreams.delete(event.data.messageId);
          break;
        }

        case 'message_error': {
          const target = findStoredMessage(
            state,
            event.data.conversationId,
            event.data.messageId
          );
          if (target) {
            target.streaming = false;
            // The error is surfaced inline, after whatever text already arrived.
            target.content += `\n\n❌ Error: ${event.data.error}`;
          }
          state.activeStreams.delete(event.data.messageId);
          break;
        }
      }
    }

    return state;
  });

  return ksync;
};

AI Service Layer

services/aiService.ts
import { KSync } from 'ksync';
import { AIState, Message, Conversation } from '../stores/aiStore';

/**
 * Application-level API for the AI chat: creates conversations, sends user
 * messages and turns AI responses (streamed or not) into kSync events.
 *
 * All state is read from `ksync.state`; all changes go through `ksync.emit`.
 */
export class AIService {
  private static readonly DEFAULT_MODEL = 'gpt-3.5-turbo';

  constructor(private ksync: KSync<AIState>) {}

  /**
   * Creates a conversation owned by the current user and emits
   * `conversation_created`.
   *
   * @param title        Display title.
   * @param systemPrompt Optional system prompt sent ahead of the history on every AI request.
   * @param settings     Overrides for the default generation settings.
   * @returns the conversation object that was emitted.
   */
  async createConversation(
    title: string,
    systemPrompt?: string,
    settings?: Partial<Conversation['settings']>
  ): Promise<Conversation> {
    const now = Date.now();
    const conversation: Conversation = {
      id: generateId(),
      title,
      participants: [this.getCurrentUserId()],
      model: settings?.model || AIService.DEFAULT_MODEL,
      systemPrompt,
      createdAt: now,
      updatedAt: now,
      settings: {
        temperature: 0.7,
        maxTokens: 2048,
        model: AIService.DEFAULT_MODEL,
        streamingEnabled: true,
        ...settings
      }
    };

    await this.ksync.emit('conversation_created', conversation);
    return conversation;
  }

  /**
   * Records a user message, then requests the assistant's reply.
   *
   * AI-side failures are reported through a `message_error` event rather
   * than by rejecting.
   *
   * @param conversationId Target conversation.
   * @param content        User message text.
   * @param options        Pass `{ stream: false }` to force the non-streaming endpoint.
   * @throws Error when the conversation does not exist.
   */
  async sendMessage(
    conversationId: string,
    content: string,
    options?: { stream?: boolean }
  ): Promise<void> {
    const conversation = this.getConversation(conversationId);
    if (!conversation) {
      throw new Error('Conversation not found');
    }

    // The user message is complete as soon as it is created.
    const userMessage: Message = {
      id: generateId(),
      conversationId,
      role: 'user',
      content,
      timestamp: Date.now(),
      userId: this.getCurrentUserId()
    };

    await this.ksync.emit('message_started', userMessage);
    await this.ksync.emit('message_completed', {
      conversationId,
      messageId: userMessage.id,
      metadata: { promptTokens: this.countTokens(content) }
    });

    // Empty placeholder that the response is written into.
    const aiMessage: Message = {
      id: generateId(),
      conversationId,
      role: 'assistant',
      content: '',
      timestamp: Date.now(),
      model: conversation.settings.model,
      streaming: true
    };

    await this.ksync.emit('message_started', aiMessage);

    if (options?.stream !== false && conversation.settings.streamingEnabled) {
      await this.streamAIResponse(conversationId, aiMessage.id, conversation);
    } else {
      await this.getAIResponse(conversationId, aiMessage.id, conversation);
    }
  }

  /**
   * Builds the message list for an AI request: the optional system prompt
   * followed by the conversation history, excluding the still-empty
   * assistant placeholder the response will be written into.
   */
  private buildRequestMessages(
    conversation: Conversation,
    pendingMessageId: string
  ): Array<{ role: Message['role']; content: string }> {
    const history = this.getConversationMessages(conversation.id)
      .filter(m => m.id !== pendingMessageId)
      .map(m => ({ role: m.role, content: m.content }));

    if (!conversation.systemPrompt) {
      return history;
    }
    return [{ role: 'system', content: conversation.systemPrompt }, ...history];
  }

  /**
   * Interprets one line of the `data: …` stream.
   *
   * Lines that are not `data:` lines, are not valid JSON, or carry no text
   * yield `skip`.
   */
  private static parseStreamLine(
    rawLine: string
  ):
    | { kind: 'skip' }
    | { kind: 'done' }
    | { kind: 'chunk'; text: string }
    | { kind: 'error'; message: string } {
    const line = rawLine.trim(); // also strips the "\r" of CRLF line endings
    if (!line.startsWith('data:')) {
      return { kind: 'skip' };
    }

    const data = line.slice('data:'.length).trim();
    if (data === '[DONE]') {
      return { kind: 'done' };
    }

    let parsed: {
      error?: string | { message?: string } | null;
      choices?: Array<{ delta?: { content?: unknown } }>;
    } | null;
    try {
      parsed = JSON.parse(data);
    } catch {
      return { kind: 'skip' };
    }

    const rawError = parsed?.error;
    if (rawError !== undefined && rawError !== null) {
      if (typeof rawError === 'string') {
        return { kind: 'error', message: rawError };
      }
      return {
        kind: 'error',
        message: typeof rawError.message === 'string' ? rawError.message : JSON.stringify(rawError)
      };
    }

    const text = parsed?.choices?.[0]?.delta?.content;
    return typeof text === 'string' && text !== ''
      ? { kind: 'chunk', text }
      : { kind: 'skip' };
  }

  /** Normalizes a caught value into a message string. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Streams the assistant's reply, emitting one `message_chunk` per received
   * text fragment and a final `message_completed`.
   *
   * The message is completed on `[DONE]` *or* when the stream simply ends,
   * so a missing terminator cannot leave it stuck in the streaming state.
   * HTTP failures and `{ error }` payloads become `message_error`.
   */
  private async streamAIResponse(
    conversationId: string,
    messageId: string,
    conversation: Conversation
  ): Promise<void> {
    try {
      const response = await fetch('/api/ai/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: this.buildRequestMessages(conversation, messageId),
          model: conversation.settings.model,
          temperature: conversation.settings.temperature,
          max_tokens: conversation.settings.maxTokens,
          stream: true
        })
      });

      if (!response.ok) {
        throw new Error(`AI request failed with status ${response.status}`);
      }
      if (!response.body) {
        throw new Error('No response body');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let fullText = '';
      let finished = false;

      try {
        while (!finished) {
          const { value, done } = await reader.read();
          // On EOF, flush the decoder and treat the remaining buffer as a final line.
          buffer += done ? decoder.decode() : decoder.decode(value, { stream: true });
          const lines = buffer.split('\n');
          buffer = done ? '' : (lines.pop() ?? '');

          for (const line of lines) {
            const result = AIService.parseStreamLine(line);
            if (result.kind === 'done') {
              finished = true;
              break;
            }
            if (result.kind === 'error') {
              throw new Error(result.message);
            }
            if (result.kind === 'chunk') {
              fullText += result.text;
              await this.ksync.emit('message_chunk', {
                conversationId,
                messageId,
                chunk: result.text
              });
            }
          }

          if (done) {
            finished = true;
          }
        }
      } finally {
        // Stop the transfer on early exit; harmless once the stream has ended.
        await reader.cancel().catch(() => undefined);
      }

      await this.ksync.emit('message_completed', {
        conversationId,
        messageId,
        metadata: {
          model: conversation.settings.model,
          completionTokens: this.countTokens(fullText)
        }
      });
    } catch (error: unknown) {
      await this.ksync.emit('message_error', {
        conversationId,
        messageId,
        error: AIService.describeError(error)
      });
    }
  }

  /**
   * Fetches the whole reply in one request, then emits it as a single
   * `message_chunk` followed by `message_completed`. Failures (including
   * non-2xx responses) become `message_error`.
   */
  private async getAIResponse(
    conversationId: string,
    messageId: string,
    conversation: Conversation
  ): Promise<void> {
    try {
      const response = await fetch('/api/ai/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: this.buildRequestMessages(conversation, messageId),
          model: conversation.settings.model,
          temperature: conversation.settings.temperature,
          max_tokens: conversation.settings.maxTokens
        })
      });

      if (!response.ok) {
        throw new Error(`AI request failed with status ${response.status}`);
      }

      const data = await response.json();
      const content = data.choices?.[0]?.message?.content || '';

      await this.ksync.emit('message_chunk', {
        conversationId,
        messageId,
        chunk: content
      });

      await this.ksync.emit('message_completed', {
        conversationId,
        messageId,
        metadata: {
          model: conversation.settings.model,
          promptTokens: data.usage?.prompt_tokens,
          completionTokens: data.usage?.completion_tokens
        }
      });
    } catch (error: unknown) {
      await this.ksync.emit('message_error', {
        conversationId,
        messageId,
        error: AIService.describeError(error)
      });
    }
  }

  /**
   * Emits `conversation_updated` carrying only the changed settings and a
   * fresh `updatedAt`.
   */
  async updateConversationSettings(
    conversationId: string,
    settings: Partial<Conversation['settings']>
  ): Promise<void> {
    await this.ksync.emit('conversation_updated', {
      id: conversationId,
      settings,
      updatedAt: Date.now()
    });
  }

  /** Returns the conversation with the given id, if any. */
  getConversation(id: string): Conversation | undefined {
    return this.ksync.state.conversations.get(id);
  }

  /** Returns the messages of a conversation, or an empty array if unknown. */
  getConversationMessages(conversationId: string): Message[] {
    return this.ksync.state.messages.get(conversationId) || [];
  }

  /** Searches every conversation for a message with the given id. */
  getMessageById(messageId: string): Message | undefined {
    for (const messages of this.ksync.state.messages.values()) {
      const message = messages.find(m => m.id === messageId);
      if (message) return message;
    }
    return undefined;
  }

  /** Returns all conversations as a new array, most recently updated first. */
  getAllConversations(): Conversation[] {
    return Array.from(this.ksync.state.conversations.values())
      .sort((a, b) => b.updatedAt - a.updatedAt);
  }

  /** Whether the given message currently has an active stream. */
  isStreaming(messageId: string): boolean {
    return this.ksync.state.activeStreams.get(messageId) ?? false;
  }

  /** Id of the connected user, or `'anonymous'` when nobody is connected. */
  private getCurrentUserId(): string {
    return this.ksync.state.currentUser?.id || 'anonymous';
  }

  /** Rough token estimate: about four characters per token. */
  private countTokens(text: string): number {
    // Simple token estimation (real implementation would use tiktoken)
    return Math.ceil(text.length / 4);
  }
}

/**
 * Produces an id of the form `<ms timestamp>_<random base-36 suffix>`.
 * The suffix is at most seven characters long.
 */
function generateId(): string {
  const timestamp = Date.now();
  const randomSuffix = Math.random().toString(36).slice(2, 9);
  return [timestamp, randomSuffix].join('_');
}

React Components

components/AIChat.tsx
import React, { useState, useEffect, useRef } from 'react';
import { useKSync, useKSyncStream } from 'ksync/react';
import { createAIStore } from '../stores/aiStore';
import { AIService } from '../services/aiService';
import { MessageList } from './MessageList';
import { MessageInput } from './MessageInput';
import { ConversationList } from './ConversationList';
import { SettingsPanel } from './SettingsPanel';

interface AIChatProps {
  userId: string;
}

/**
 * Top-level chat screen: conversation sidebar, header with settings toggle,
 * message list and input box.
 *
 * The store and service are created once per mounted component; a later
 * change of `userId` does not recreate them.
 */
export const AIChat: React.FC<AIChatProps> = ({ userId }) => {
  const [aiStore] = useState(() => createAIStore(userId));
  const [aiService] = useState(() => new AIService(aiStore));
  const { ksync } = useKSync(aiStore);

  const [activeConversationId, setActiveConversationId] = useState<string | null>(null);
  const [showSettings, setShowSettings] = useState(false);

  // Announce the user on mount.
  useEffect(() => {
    Promise.resolve(
      ksync.emit('user_connected', { id: userId, name: `User ${userId}` })
    ).catch((err: unknown) => {
      console.error('Failed to announce user', err);
    });
  }, [ksync, userId]);

  // Select the most recent conversation, or create one if none exists.
  useEffect(() => {
    // Guards against calling setState after the component has unmounted.
    let cancelled = false;

    const [mostRecent] = aiService.getAllConversations();
    if (mostRecent) {
      setActiveConversationId(mostRecent.id);
    } else {
      aiService
        .createConversation('New Chat')
        .then(conv => {
          if (!cancelled) {
            setActiveConversationId(conv.id);
          }
        })
        .catch((err: unknown) => {
          console.error('Failed to create initial conversation', err);
        });
    }

    return () => {
      cancelled = true;
    };
  }, [aiService]);

  const activeConversation = activeConversationId
    ? aiService.getConversation(activeConversationId)
    : null;

  const activeMessages = activeConversationId
    ? aiService.getConversationMessages(activeConversationId)
    : [];

  const handleNewConversation = () => {
    aiService
      .createConversation('New Chat')
      .then(conv => setActiveConversationId(conv.id))
      .catch((err: unknown) => {
        console.error('Failed to create conversation', err);
      });
  };

  return (
    <div className="ai-chat">
      <div className="sidebar">
        <ConversationList
          conversations={aiService.getAllConversations()}
          activeId={activeConversationId}
          onSelect={setActiveConversationId}
          onNew={handleNewConversation}
        />
      </div>

      <div className="main-chat">
        {activeConversation && (
          <>
            <div className="chat-header">
              <h2>{activeConversation.title}</h2>
              <div className="header-actions">
                <button
                  onClick={() => setShowSettings(visible => !visible)}
                  className="settings-btn"
                >
                  ⚙️ Settings
                </button>
              </div>
            </div>

            {showSettings && (
              <SettingsPanel
                conversation={activeConversation}
                onUpdate={(settings) => {
                  aiService
                    .updateConversationSettings(activeConversation.id, settings)
                    .catch((err: unknown) => {
                      console.error('Failed to update settings', err);
                    });
                }}
                onClose={() => setShowSettings(false)}
              />
            )}

            <MessageList
              messages={activeMessages}
              aiService={aiService}
            />

            <MessageInput
              onSend={(content) => {
                // sendMessage rejects when the conversation is gone; AI-side
                // failures arrive as message_error events instead.
                aiService
                  .sendMessage(activeConversation.id, content)
                  .catch((err: unknown) => {
                    console.error('Failed to send message', err);
                  });
              }}
              disabled={activeMessages.some(m => m.streaming)}
            />
          </>
        )}
      </div>
    </div>
  );
};
components/MessageList.tsx
import React, { useEffect, useRef } from 'react';
import { Message } from '../stores/aiStore';
import { AIService } from '../services/aiService';
import { StreamingMessage } from './StreamingMessage';

interface MessageListProps {
  messages: Message[];
  aiService: AIService;
}

/**
 * Renders the messages of one conversation and scrolls to the newest one
 * whenever the number of messages changes.
 *
 * Messages flagged `streaming` are rendered through `StreamingMessage`;
 * all others through `formatContent`.
 */
export const MessageList: React.FC<MessageListProps> = ({ messages, aiService }) => {
  const messagesEndRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages.length]);

  return (
    <div className="message-list">
      {messages.map((message) => {
        const completionTokens = message.metadata?.completionTokens;

        return (
          <div key={message.id} className={`message ${message.role}`}>
            <div className="message-header">
              <span className="role">
                {message.role === 'user' ? '👤' : '🤖'}
                {message.role === 'assistant' && message.model && ` (${message.model})`}
              </span>
              <span className="timestamp">
                {new Date(message.timestamp).toLocaleTimeString()}
              </span>
              {/* Explicit null check: `0 && …` would render a stray "0". */}
              {completionTokens != null && (
                <span className="tokens">
                  {completionTokens} tokens
                </span>
              )}
            </div>

            <div className="message-content">
              {message.streaming ? (
                <StreamingMessage
                  message={message}
                  isStreaming={aiService.isStreaming(message.id)}
                />
              ) : (
                <div className="content">
                  {formatContent(message.content)}
                </div>
              )}
            </div>
          </div>
        );
      })}
      <div ref={messagesEndRef} />
    </div>
  );
};

/**
 * Renders message text with minimal markdown support: fenced code blocks
 * become `<pre><code>` elements, everything else is plain text.
 *
 * Splitting on a pattern with two capture groups yields a repeating
 * sequence of [text, language, code, text, …], so the position modulo 3
 * identifies what each segment is.
 */
function formatContent(content: string): React.ReactNode {
  const segments = content.split(/```(\w*)\n([\s\S]*?)```/);

  return segments.map((segment, position) => {
    switch (position % 3) {
      case 0:
        // Plain text between code blocks
        return <span key={position}>{segment}</span>;
      case 1:
        // Language tag; consumed by the code segment that follows it
        return null;
      default: {
        const language = segments[position - 1];
        return (
          <pre key={position} className={`code-block language-${language}`}>
            <code>{segment}</code>
          </pre>
        );
      }
    }
  });
}
components/StreamingMessage.tsx
import React, { useEffect, useState } from 'react';
import { Message } from '../stores/aiStore';

interface StreamingMessageProps {
  message: Message;
  isStreaming: boolean;
}

/**
 * Renders a message that is still being streamed: its current text, a
 * blinking cursor and a "thinking" indicator while `isStreaming` is true.
 */
export const StreamingMessage: React.FC<StreamingMessageProps> = ({
  message,
  isStreaming
}) => {
  const [cursor, setCursor] = useState(true);

  // Blink the cursor every 500 ms while streaming; hide it otherwise.
  useEffect(() => {
    if (!isStreaming) {
      setCursor(false);
      return;
    }

    const interval = setInterval(() => {
      setCursor(prev => !prev);
    }, 500);

    return () => clearInterval(interval);
  }, [isStreaming]);

  return (
    <div className="streaming-message">
      <div className="content">
        {/* Rendered straight from props: copying it into state via an effect
            only added an extra render and showed the text one render late. */}
        {message.content}
        {isStreaming && (
          <span className={`cursor ${cursor ? 'visible' : 'hidden'}`}>|</span>
        )}
      </div>

      {isStreaming && (
        <div className="streaming-indicator">
          <div className="dots">
            <span></span>
            <span></span>
            <span></span>
          </div>
          <span>AI is thinking...</span>
        </div>
      )}
    </div>
  );
};
components/MessageInput.tsx
import React, { useState, useRef, useCallback } from 'react';

interface MessageInputProps {
  onSend: (content: string) => void;
  disabled?: boolean;
}

/**
 * Auto-growing textarea with a send button.
 *
 * Enter sends the trimmed text, Shift+Enter inserts a newline. Nothing is
 * sent while `disabled` is set or when the text is blank.
 */
export const MessageInput: React.FC<MessageInputProps> = ({ onSend, disabled }) => {
  const [content, setContent] = useState('');
  const textareaRef = useRef<HTMLTextAreaElement>(null);

  // Shared by the form submit and the Enter shortcut.
  const submit = useCallback(() => {
    const text = content.trim();
    if (!text || disabled) return;

    onSend(text);
    setContent('');

    // Collapse the textarea back to a single row.
    if (textareaRef.current) {
      textareaRef.current.style.height = 'auto';
    }
  }, [content, disabled, onSend]);

  const handleSubmit = useCallback((e: React.FormEvent) => {
    e.preventDefault();
    submit();
  }, [submit]);

  const handleKeyDown = (e: React.KeyboardEvent<HTMLTextAreaElement>) => {
    // While an IME composition is active, Enter confirms the candidate text
    // and must not send the message.
    if (e.nativeEvent.isComposing) return;

    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      submit();
    }
  };

  // Grow with the content, capped at 200px.
  const adjustTextareaHeight = () => {
    const textarea = textareaRef.current;
    if (textarea) {
      textarea.style.height = 'auto';
      textarea.style.height = `${Math.min(textarea.scrollHeight, 200)}px`;
    }
  };

  return (
    <form onSubmit={handleSubmit} className="message-input">
      <div className="input-container">
        <textarea
          ref={textareaRef}
          value={content}
          onChange={(e) => {
            setContent(e.target.value);
            adjustTextareaHeight();
          }}
          onKeyDown={handleKeyDown}
          placeholder="Ask anything..."
          disabled={disabled}
          rows={1}
          className="input-field"
        />

        <button
          type="submit"
          disabled={!content.trim() || disabled}
          className="send-button"
        >
          {disabled ? '⏳' : '➤'}
        </button>
      </div>

      <div className="input-footer">
        <span className="hint">
          Press Enter to send, Shift+Enter for new line
        </span>
      </div>
    </form>
  );
};

Custom Hook for AI Streaming

hooks/useAIStream.ts
import { useEffect, useState, useCallback } from 'react';
import { KSync } from 'ksync';

/**
 * Tracks the streaming lifecycle of a single message by listening to the
 * store's `message_*` events.
 *
 * @param ksync     Store to subscribe to.
 * @param messageId Message to follow; switching to another id clears the
 *                  state accumulated for the previous one.
 * @returns the accumulated `content`, the `isStreaming` / `isComplete`
 *          flags, the last `error` (or `null`) and a `reset` function that
 *          clears all of them.
 */
export function useAIStream(ksync: KSync, messageId: string) {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const [isComplete, setIsComplete] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const reset = useCallback(() => {
    setContent('');
    setIsStreaming(false);
    setIsComplete(false);
    setError(null);
  }, []);

  // Do not carry text or flags of the previous message over to a new one.
  useEffect(() => {
    reset();
  }, [messageId, reset]);

  useEffect(() => {
    const unsubscribers = [
      ksync.on('message_started', (event) => {
        if (event.data.id === messageId) {
          setIsStreaming(true);
          setIsComplete(false);
          setContent('');
          setError(null);
        }
      }),
      ksync.on('message_chunk', (event) => {
        if (event.data.messageId === messageId) {
          setContent(prev => prev + event.data.chunk);
        }
      }),
      ksync.on('message_completed', (event) => {
        if (event.data.messageId === messageId) {
          setIsStreaming(false);
          setIsComplete(true);
        }
      }),
      ksync.on('message_error', (event) => {
        if (event.data.messageId === messageId) {
          setIsStreaming(false);
          setIsComplete(false);
          setError(event.data.error);
        }
      })
    ];

    return () => {
      unsubscribers.forEach(unsubscribe => unsubscribe());
    };
  }, [ksync, messageId]);

  return {
    content,
    isStreaming,
    isComplete,
    error,
    reset
  };
}

Features Demonstrated

This AI streaming application showcases:
  1. Real-time Streaming: Live AI responses that update chunk by chunk as they arrive
  2. Collaborative Chat: Multiple users can participate in AI conversations
  3. Multiple Models: Support for different AI models and settings
  4. Conversation Management: Create, manage, and switch between conversations
  5. Optimistic Updates: Instant message display before streaming starts
  6. Error Handling: Graceful handling of streaming failures
  7. Token Tracking: Record approximate prompt and completion token counts per message to monitor usage and costs
  8. Responsive Design: Adapts to different screen sizes

Running the Example

  1. Set up the AI API server (using Express + OpenAI):
server/ai-server.js
// Minimal Express server exposing the two AI endpoints the client calls:
//   POST /api/ai/stream - streamed reply as `data: …` lines, ended by `data: [DONE]`
//   POST /api/ai/chat   - complete reply as one JSON document
const express = require('express');
const { OpenAI } = require('openai');
// NOTE(review): `ws` is required but not used in this snippet — the
// WebSocket sync server on port 8081 is not shown here.
const WebSocket = require('ws');

const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const DEFAULT_MODEL = 'gpt-3.5-turbo';

app.use(express.json());

// A usable request needs a non-empty array of { role, content } string pairs.
function isValidMessageList(messages) {
  return (
    Array.isArray(messages) &&
    messages.length > 0 &&
    messages.every(
      (m) => m && typeof m.role === 'string' && typeof m.content === 'string'
    )
  );
}

// Streaming chat endpoint
app.post('/api/ai/stream', async (req, res) => {
  const { messages, model, temperature, max_tokens } = req.body ?? {};

  if (!isValidMessageList(messages)) {
    res.status(400).json({ error: 'messages must be a non-empty array of { role, content }' });
    return;
  }

  res.writeHead(200, {
    // Server-sent-events framing is used, so advertise it as such.
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*'
  });

  // Stop pulling from the model once the client has gone away.
  let clientGone = false;
  res.on('close', () => {
    clientGone = true;
  });

  try {
    const stream = await openai.chat.completions.create({
      model: model || DEFAULT_MODEL,
      messages,
      temperature,
      max_tokens,
      stream: true
    });

    for await (const chunk of stream) {
      if (clientGone) break;
      const content = chunk.choices[0]?.delta?.content || '';
      if (content) {
        res.write(`data: ${JSON.stringify({ choices: [{ delta: { content } }] })}\n\n`);
      }
    }

    if (!clientGone) {
      res.write('data: [DONE]\n\n');
    }
  } catch (error) {
    // Headers are already sent, so the failure is reported in-band.
    if (!clientGone) {
      res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    }
  } finally {
    res.end();
  }
});

// Non-streaming chat endpoint (used when streaming is disabled on the client)
app.post('/api/ai/chat', async (req, res) => {
  const { messages, model, temperature, max_tokens } = req.body ?? {};

  if (!isValidMessageList(messages)) {
    res.status(400).json({ error: 'messages must be a non-empty array of { role, content }' });
    return;
  }

  try {
    const completion = await openai.chat.completions.create({
      model: model || DEFAULT_MODEL,
      messages,
      temperature,
      max_tokens
    });
    res.json(completion);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.listen(3001, () => {
  console.log('AI server running on port 3001');
});
  2. Install dependencies:
npm install ksync openai ws express
  3. Set environment variables:
export OPENAI_API_KEY="your-openai-key"
  4. Run the application:
# Start AI server
node server/ai-server.js

# Start React app
npm run dev
This example demonstrates how kSync enables sophisticated real-time AI applications with seamless streaming and collaboration features.