Build real-time AI streaming applications with kSync — this example demonstrates streaming responses, collaborative AI chat, and conversation management.
import {
KSync,
InMemoryStorage,
WebSocketSyncClient
} from 'ksync';
/** A single chat message (user, assistant, or system) stored per conversation. */
export interface Message {
  id: string;
  conversationId: string; // id of the owning Conversation
  role: 'user' | 'assistant' | 'system';
  content: string; // full text; appended to incrementally while streaming
  timestamp: number; // epoch millis at creation
  userId?: string; // set for user-authored messages
  model?: string; // model that produced an assistant message
  tokens?: number; // NOTE(review): never written in this file — confirm it is still used
  streaming?: boolean; // true while chunks are still arriving
  completed?: boolean; // set by the `message_completed` event
  metadata?: {
    temperature?: number;
    maxTokens?: number;
    model?: string;
    promptTokens?: number; // prompt token count (estimated client-side)
    completionTokens?: number; // completion token count
  };
}
/** A chat thread together with its participants and model settings. */
export interface Conversation {
  id: string;
  title: string;
  participants: string[]; // user ids
  model: string; // convenience copy of the model chosen at creation
  systemPrompt?: string;
  createdAt: number; // epoch millis
  updatedAt: number; // epoch millis; bumped on `conversation_updated`
  settings: {
    temperature: number;
    maxTokens: number;
    model: string;
    streamingEnabled: boolean; // when false, replies are fetched non-streaming
  };
}
/** Snapshot of all AI chat state, rebuilt by the materializer from the event log. */
export interface AIState {
  conversations: Map<string, Conversation>; // conversationId -> conversation
  messages: Map<string, Message[]>; // conversationId -> messages
  activeStreams: Map<string, boolean>; // messageId -> isStreaming
  currentUser: { id: string; name: string } | null; // set by `user_connected`
}
/**
 * Creates a kSync store for AI chat state, backed by in-memory storage and a
 * websocket sync client. All state is materialized from the event log.
 * @param userId Local user id. NOTE(review): currently unused inside this
 *   factory — presumably intended for the auth credential; confirm.
 */
export const createAIStore = (userId: string) => {
  const ksync = new KSync<AIState>({
    storage: new InMemoryStorage(),
    syncClient: new WebSocketSyncClient('ws://localhost:8081', {
      // NOTE(review): placeholder credential — replace with a real per-user JWT.
      auth: { type: 'jwt', token: 'user-token' }
    })
  });

  // Materializer: folds the full event history into a fresh AIState snapshot.
  ksync.materialize((events) => {
    const state: AIState = {
      conversations: new Map(),
      messages: new Map(),
      activeStreams: new Map(),
      currentUser: null
    };

    for (const event of events) {
      // Each case body is wrapped in a block so its `const` declarations are
      // scoped to that case (fixes the no-case-declarations hazard of bare
      // lexical declarations inside a switch).
      switch (event.type) {
        case 'user_connected': {
          state.currentUser = event.data;
          break;
        }
        case 'conversation_created': {
          state.conversations.set(event.data.id, event.data);
          state.messages.set(event.data.id, []);
          break;
        }
        case 'conversation_updated': {
          // Shallow merge: nested objects such as `settings` are replaced
          // wholesale, so emitters must send complete nested objects.
          const existing = state.conversations.get(event.data.id);
          if (existing) {
            state.conversations.set(event.data.id, { ...existing, ...event.data });
          }
          break;
        }
        case 'message_started': {
          const messages = state.messages.get(event.data.conversationId) || [];
          messages.push({
            ...event.data,
            streaming: true,
            completed: false
          });
          state.messages.set(event.data.conversationId, messages);
          state.activeStreams.set(event.data.id, true);
          break;
        }
        case 'message_chunk': {
          // Append the streamed delta to the in-flight message's content.
          const convMessages = state.messages.get(event.data.conversationId) || [];
          const messageIndex = convMessages.findIndex(m => m.id === event.data.messageId);
          if (messageIndex >= 0) {
            convMessages[messageIndex].content += event.data.chunk;
          }
          break;
        }
        case 'message_completed': {
          const completeMessages = state.messages.get(event.data.conversationId) || [];
          const completeIndex = completeMessages.findIndex(m => m.id === event.data.messageId);
          if (completeIndex >= 0) {
            completeMessages[completeIndex].streaming = false;
            completeMessages[completeIndex].completed = true;
            completeMessages[completeIndex].metadata = event.data.metadata;
          }
          state.activeStreams.delete(event.data.messageId);
          break;
        }
        case 'message_error': {
          // End the stream and surface the error inline in the message body.
          const errorMessages = state.messages.get(event.data.conversationId) || [];
          const errorIndex = errorMessages.findIndex(m => m.id === event.data.messageId);
          if (errorIndex >= 0) {
            errorMessages[errorIndex].streaming = false;
            errorMessages[errorIndex].content += `\n\n❌ Error: ${event.data.error}`;
          }
          state.activeStreams.delete(event.data.messageId);
          break;
        }
      }
    }
    return state;
  });

  return ksync;
};
import { KSync } from 'ksync';
import { AIState, Message, Conversation } from '../stores/aiStore';
/**
 * Orchestrates AI conversations on top of a kSync event store: creates
 * conversations, records user messages, and relays assistant replies
 * (streaming or non-streaming) as kSync events for the materializer.
 */
export class AIService {
  constructor(private ksync: KSync<AIState>) {}

  /** Extracts a human-readable message from an unknown thrown value. */
  private static toErrorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Creates a new conversation and emits `conversation_created`.
   * @param title Human-readable conversation title.
   * @param systemPrompt Optional system prompt stored on the conversation.
   * @param settings Partial overrides for the default model settings.
   * @returns The newly created conversation.
   */
  async createConversation(
    title: string,
    systemPrompt?: string,
    settings?: Partial<Conversation['settings']>
  ): Promise<Conversation> {
    const conversation: Conversation = {
      id: generateId(),
      title,
      participants: [this.getCurrentUserId()],
      model: settings?.model || 'gpt-3.5-turbo',
      systemPrompt,
      createdAt: Date.now(),
      updatedAt: Date.now(),
      settings: {
        temperature: 0.7,
        maxTokens: 2048,
        model: 'gpt-3.5-turbo',
        streamingEnabled: true,
        ...settings
      }
    };
    await this.ksync.emit('conversation_created', conversation);
    return conversation;
  }

  /**
   * Records the user's message, then requests an assistant reply. Streams the
   * reply unless `options.stream === false` or streaming is disabled on the
   * conversation.
   * @throws Error if the conversation does not exist.
   */
  async sendMessage(
    conversationId: string,
    content: string,
    options?: { stream?: boolean }
  ): Promise<void> {
    const conversation = this.getConversation(conversationId);
    if (!conversation) {
      throw new Error('Conversation not found');
    }

    // Record the user's message as an immediately-completed message.
    const userMessage: Message = {
      id: generateId(),
      conversationId,
      role: 'user',
      content,
      timestamp: Date.now(),
      userId: this.getCurrentUserId()
    };
    await this.ksync.emit('message_started', userMessage);
    await this.ksync.emit('message_completed', {
      conversationId,
      messageId: userMessage.id,
      metadata: { promptTokens: this.countTokens(content) }
    });

    // Open an empty assistant message; content arrives via `message_chunk` events.
    const aiMessage: Message = {
      id: generateId(),
      conversationId,
      role: 'assistant',
      content: '',
      timestamp: Date.now(),
      model: conversation.settings.model,
      streaming: true
    };
    await this.ksync.emit('message_started', aiMessage);

    if (options?.stream !== false && conversation.settings.streamingEnabled) {
      await this.streamAIResponse(conversationId, aiMessage.id, conversation);
    } else {
      await this.getAIResponse(conversationId, aiMessage.id, conversation);
    }
  }

  // Streams an assistant reply from the SSE endpoint, emitting one
  // `message_chunk` per content delta and `message_completed` on [DONE].
  // Any failure (network, HTTP error, server-reported error) becomes a
  // `message_error` event.
  private async streamAIResponse(
    conversationId: string,
    messageId: string,
    conversation: Conversation
  ): Promise<void> {
    try {
      const messages = this.getConversationMessages(conversationId);
      const response = await fetch('/api/ai/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: messages.map(m => ({ role: m.role, content: m.content })),
          model: conversation.settings.model,
          temperature: conversation.settings.temperature,
          max_tokens: conversation.settings.maxTokens,
          stream: true
        })
      });
      if (!response.ok) {
        throw new Error(`AI stream request failed: ${response.status}`);
      }
      if (!response.body) {
        throw new Error('No response body');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      // Buffer holds any partial line left over after each chunk.
      let buffer = '';

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);

          if (data === '[DONE]') {
            await this.ksync.emit('message_completed', {
              conversationId,
              messageId,
              metadata: {
                model: conversation.settings.model,
                completionTokens: this.countTokens(
                  this.getMessageById(messageId)?.content || ''
                )
              }
            });
            return;
          }

          try {
            const parsed = JSON.parse(data);
            // The server reports failures in-band as `{ error }` payloads;
            // surface them instead of silently dropping the event.
            if (parsed.error) {
              throw new Error(String(parsed.error));
            }
            const chunk = parsed.choices?.[0]?.delta?.content || '';
            if (chunk) {
              await this.ksync.emit('message_chunk', {
                conversationId,
                messageId,
                chunk
              });
            }
          } catch (e) {
            if (e instanceof SyntaxError) {
              // Skip malformed/partial JSON lines.
              continue;
            }
            throw e;
          }
        }
      }
    } catch (error) {
      await this.ksync.emit('message_error', {
        conversationId,
        messageId,
        error: AIService.toErrorMessage(error)
      });
    }
  }

  // Fetches a complete (non-streaming) assistant reply and emits it as a
  // single chunk followed by `message_completed`.
  private async getAIResponse(
    conversationId: string,
    messageId: string,
    conversation: Conversation
  ): Promise<void> {
    try {
      const messages = this.getConversationMessages(conversationId);
      const response = await fetch('/api/ai/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: messages.map(m => ({ role: m.role, content: m.content })),
          model: conversation.settings.model,
          temperature: conversation.settings.temperature,
          max_tokens: conversation.settings.maxTokens
        })
      });
      if (!response.ok) {
        throw new Error(`AI chat request failed: ${response.status}`);
      }
      const data = await response.json();
      const content = data.choices?.[0]?.message?.content || '';
      await this.ksync.emit('message_chunk', {
        conversationId,
        messageId,
        chunk: content
      });
      await this.ksync.emit('message_completed', {
        conversationId,
        messageId,
        metadata: {
          model: conversation.settings.model,
          promptTokens: data.usage?.prompt_tokens,
          completionTokens: data.usage?.completion_tokens
        }
      });
    } catch (error) {
      await this.ksync.emit('message_error', {
        conversationId,
        messageId,
        error: AIService.toErrorMessage(error)
      });
    }
  }

  /**
   * Updates a conversation's settings via `conversation_updated`.
   * The emitted settings are merged with the current ones first: the
   * materializer does a shallow spread, so emitting only the partial object
   * would replace the whole settings block and drop unspecified fields.
   */
  async updateConversationSettings(
    conversationId: string,
    settings: Partial<Conversation['settings']>
  ): Promise<void> {
    const existing = this.getConversation(conversationId);
    const mergedSettings = existing
      ? { ...existing.settings, ...settings }
      : settings;
    await this.ksync.emit('conversation_updated', {
      id: conversationId,
      settings: mergedSettings,
      updatedAt: Date.now()
    });
  }

  /** Returns the conversation with the given id, if materialized. */
  getConversation(id: string): Conversation | undefined {
    return this.ksync.state.conversations.get(id);
  }

  /** Returns the messages of a conversation, oldest first (empty if unknown). */
  getConversationMessages(conversationId: string): Message[] {
    return this.ksync.state.messages.get(conversationId) || [];
  }

  /** Linear scan over all conversations for a message by id. */
  getMessageById(messageId: string): Message | undefined {
    for (const messages of this.ksync.state.messages.values()) {
      const message = messages.find(m => m.id === messageId);
      if (message) return message;
    }
    return undefined;
  }

  /** All conversations, most recently updated first. */
  getAllConversations(): Conversation[] {
    return Array.from(this.ksync.state.conversations.values())
      .sort((a, b) => b.updatedAt - a.updatedAt);
  }

  /** True while the given message has an active stream. */
  isStreaming(messageId: string): boolean {
    return this.ksync.state.activeStreams.get(messageId) || false;
  }

  private getCurrentUserId(): string {
    return this.ksync.state.currentUser?.id || 'anonymous';
  }

  // Rough token estimate (~4 chars/token); a real implementation would use tiktoken.
  private countTokens(text: string): number {
    return Math.ceil(text.length / 4);
  }
}
/**
 * Generates a reasonably unique id: epoch millis plus a short base-36 random
 * suffix. Not cryptographically secure; collisions are possible within the
 * same millisecond.
 */
function generateId(): string {
  const timestamp = Date.now();
  const suffix = Math.random().toString(36).slice(2, 9);
  return `${timestamp}_${suffix}`;
}
import React, { useState, useEffect, useRef } from 'react';
import { useKSync, useKSyncStream } from 'ksync/react';
import { createAIStore } from '../stores/aiStore';
import { AIService } from '../services/aiService';
import { MessageList } from './MessageList';
import { MessageInput } from './MessageInput';
import { ConversationList } from './ConversationList';
import { SettingsPanel } from './SettingsPanel';
/** Props for the top-level AI chat screen. */
interface AIChatProps {
  userId: string; // local user id; also used to label the user on connect
}
/**
 * Top-level chat screen: wires the kSync store and AIService together,
 * tracks the active conversation, and renders the sidebar plus chat pane.
 */
export const AIChat: React.FC<AIChatProps> = ({ userId }) => {
  // Lazy-initialized once per mount so the store/service survive re-renders.
  const [aiStore] = useState(() => createAIStore(userId));
  const [aiService] = useState(() => new AIService(aiStore));
  const { ksync } = useKSync(aiStore);
  const [activeConversationId, setActiveConversationId] = useState<string | null>(null);
  const [showSettings, setShowSettings] = useState(false);

  // Announce the local user so the materializer can populate currentUser.
  useEffect(() => {
    ksync.emit('user_connected', { id: userId, name: `User ${userId}` });
  }, [ksync, userId]);

  // Select the most recent conversation, or create the first one.
  // NOTE(review): under React StrictMode this effect runs twice in dev and
  // may create two "New Chat" conversations — confirm intended behavior.
  useEffect(() => {
    const conversations = aiService.getAllConversations();
    if (conversations.length === 0) {
      aiService.createConversation('New Chat').then(conv => {
        setActiveConversationId(conv.id);
      });
    } else {
      setActiveConversationId(conversations[0].id);
    }
  }, [aiService]);

  // Derived per-render from the store (no memoization — re-reads each render).
  const activeConversation = activeConversationId ?
    aiService.getConversation(activeConversationId) : null;
  const activeMessages = activeConversationId ?
    aiService.getConversationMessages(activeConversationId) : [];

  return (
    <div className="ai-chat">
      <div className="sidebar">
        <ConversationList
          conversations={aiService.getAllConversations()}
          activeId={activeConversationId}
          onSelect={setActiveConversationId}
          onNew={() => {
            aiService.createConversation('New Chat').then(conv => {
              setActiveConversationId(conv.id);
            });
          }}
        />
      </div>
      <div className="main-chat">
        {activeConversation && (
          <>
            <div className="chat-header">
              <h2>{activeConversation.title}</h2>
              <div className="header-actions">
                <button
                  onClick={() => setShowSettings(!showSettings)}
                  className="settings-btn"
                >
                  ⚙️ Settings
                </button>
              </div>
            </div>
            {showSettings && (
              <SettingsPanel
                conversation={activeConversation}
                onUpdate={(settings) => {
                  aiService.updateConversationSettings(activeConversation.id, settings);
                }}
                onClose={() => setShowSettings(false)}
              />
            )}
            <MessageList
              messages={activeMessages}
              aiService={aiService}
            />
            {/* Input is locked while any message in this conversation streams. */}
            <MessageInput
              onSend={(content) => {
                aiService.sendMessage(activeConversation.id, content);
              }}
              disabled={activeMessages.some(m => m.streaming)}
            />
          </>
        )}
      </div>
    </div>
  );
};
import React, { useEffect, useRef } from 'react';
import { Message } from '../stores/aiStore';
import { AIService } from '../services/aiService';
import { StreamingMessage } from './StreamingMessage';
/** Props for the scrolling message list. */
interface MessageListProps {
  messages: Message[]; // messages of the active conversation, oldest first
  aiService: AIService; // queried for live streaming status per message
}
export const MessageList: React.FC<MessageListProps> = ({ messages, aiService }) => {
const messagesEndRef = useRef<HTMLDivElement>(null);
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(() => {
scrollToBottom();
}, [messages.length]);
return (
<div className="message-list">
{messages.map((message) => (
<div key={message.id} className={`message ${message.role}`}>
<div className="message-header">
<span className="role">
{message.role === 'user' ? '👤' : '🤖'}
{message.role === 'assistant' && message.model && ` (${message.model})`}
</span>
<span className="timestamp">
{new Date(message.timestamp).toLocaleTimeString()}
</span>
{message.metadata?.completionTokens && (
<span className="tokens">
{message.metadata.completionTokens} tokens
</span>
)}
</div>
<div className="message-content">
{message.streaming ? (
<StreamingMessage
message={message}
isStreaming={aiService.isStreaming(message.id)}
/>
) : (
<div className="content">
{formatContent(message.content)}
</div>
)}
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
);
};
// Format message content with basic markdown support
function formatContent(content: string): React.ReactNode {
// Basic markdown parsing for code blocks
const parts = content.split(/```(\w*)\n([\s\S]*?)```/);
return parts.map((part, index) => {
if (index % 3 === 0) {
// Regular text
return <span key={index}>{part}</span>;
} else if (index % 3 === 1) {
// Language identifier
return null;
} else {
// Code block
const language = parts[index - 1];
return (
<pre key={index} className={`code-block language-${language}`}>
<code>{part}</code>
</pre>
);
}
});
}
import React, { useEffect, useState } from 'react';
import { Message } from '../stores/aiStore';
/** Props for a message that is (or recently was) receiving streamed content. */
interface StreamingMessageProps {
  message: Message; // message whose content grows as chunks arrive
  isStreaming: boolean; // live status from AIService.isStreaming
}
export const StreamingMessage: React.FC<StreamingMessageProps> = ({
message,
isStreaming
}) => {
const [displayedContent, setDisplayedContent] = useState('');
const [cursor, setCursor] = useState(true);
// Animate cursor when streaming
useEffect(() => {
if (!isStreaming) {
setCursor(false);
return;
}
const interval = setInterval(() => {
setCursor(prev => !prev);
}, 500);
return () => clearInterval(interval);
}, [isStreaming]);
// Update displayed content
useEffect(() => {
setDisplayedContent(message.content);
}, [message.content]);
return (
<div className="streaming-message">
<div className="content">
{displayedContent}
{isStreaming && (
<span className={`cursor ${cursor ? 'visible' : 'hidden'}`}>|</span>
)}
</div>
{isStreaming && (
<div className="streaming-indicator">
<div className="dots">
<span>•</span>
<span>•</span>
<span>•</span>
</div>
<span>AI is thinking...</span>
</div>
)}
</div>
);
};
import React, { useState, useRef, useCallback } from 'react';
/** Props for the message composer. */
interface MessageInputProps {
  onSend: (content: string) => void; // invoked with the trimmed message text
  disabled?: boolean; // disables input/submit while a reply is streaming
}
/**
 * Auto-growing textarea composer. Enter submits, Shift+Enter inserts a
 * newline; content is trimmed before sending and cleared afterwards.
 */
export const MessageInput: React.FC<MessageInputProps> = ({ onSend, disabled }) => {
  const [content, setContent] = useState('');
  const textareaRef = useRef<HTMLTextAreaElement>(null);

  const handleSubmit = useCallback((e: React.FormEvent) => {
    e.preventDefault();
    if (!content.trim() || disabled) return;
    onSend(content.trim());
    setContent('');
    // Collapse the textarea back to a single row after sending.
    const field = textareaRef.current;
    if (field) {
      field.style.height = 'auto';
    }
  }, [content, disabled, onSend]);

  const handleKeyDown = (e: React.KeyboardEvent) => {
    // Plain Enter submits; Shift+Enter falls through to insert a newline.
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      handleSubmit(e);
    }
  };

  // Grow the textarea with its content, capped at 200px.
  const resizeToContent = () => {
    const field = textareaRef.current;
    if (!field) return;
    field.style.height = 'auto';
    field.style.height = `${Math.min(field.scrollHeight, 200)}px`;
  };

  return (
    <form onSubmit={handleSubmit} className="message-input">
      <div className="input-container">
        <textarea
          ref={textareaRef}
          value={content}
          onChange={(e) => {
            setContent(e.target.value);
            resizeToContent();
          }}
          onKeyDown={handleKeyDown}
          placeholder="Ask anything..."
          disabled={disabled}
          rows={1}
          className="input-field"
        />
        <button
          type="submit"
          disabled={!content.trim() || disabled}
          className="send-button"
        >
          {disabled ? '⏳' : '➤'}
        </button>
      </div>
      <div className="input-footer">
        <span className="hint">
          Press Enter to send, Shift+Enter for new line
        </span>
      </div>
    </form>
  );
};
import { useEffect, useState, useCallback } from 'react';
import { KSync } from 'ksync';
/**
 * React hook tracking one message's streaming lifecycle via kSync events.
 * Returns the accumulated content, streaming/completion flags, the last
 * error, and a manual reset.
 */
export function useAIStream(ksync: KSync, messageId: string) {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const [isComplete, setIsComplete] = useState(false);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    // Subscribe to the four lifecycle events, each filtered to this message.
    const subscriptions = [
      ksync.on('message_chunk', (event) => {
        if (event.data.messageId !== messageId) return;
        setContent(prev => prev + event.data.chunk);
      }),
      ksync.on('message_started', (event) => {
        if (event.data.id !== messageId) return;
        setIsStreaming(true);
        setContent('');
        setError(null);
      }),
      ksync.on('message_completed', (event) => {
        if (event.data.messageId !== messageId) return;
        setIsStreaming(false);
        setIsComplete(true);
      }),
      ksync.on('message_error', (event) => {
        if (event.data.messageId !== messageId) return;
        setIsStreaming(false);
        setError(event.data.error);
      })
    ];
    // Tear down every subscription when the message or store changes.
    return () => {
      for (const unsubscribe of subscriptions) {
        unsubscribe();
      }
    };
  }, [ksync, messageId]);

  // Clears all local state so the hook can be reused for a fresh stream.
  const reset = useCallback(() => {
    setContent('');
    setIsStreaming(false);
    setIsComplete(false);
    setError(null);
  }, []);

  return {
    content,
    isStreaming,
    isComplete,
    error,
    reset
  };
}
// --- AI relay server (Express, CommonJS) ---
const express = require('express');
const { OpenAI } = require('openai');
const WebSocket = require('ws'); // NOTE(review): required but unused in this file — confirm it is needed
const app = express();
// API key is read from the environment; never hard-code it.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
app.use(express.json());
// Streaming chat endpoint
app.post('/api/ai/stream', async (req, res) => {
const { messages, model, temperature, max_tokens } = req.body;
res.writeHead(200, {
'Content-Type': 'text/plain',
'Access-Control-Allow-Origin': '*'
});
try {
const stream = await openai.chat.completions.create({
model: model || 'gpt-3.5-turbo',
messages,
temperature,
max_tokens,
stream: true
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
res.write(`data: ${JSON.stringify({ choices: [{ delta: { content } }] })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
});
app.listen(3001, () => {
console.log('AI server running on port 3001');
});
# Install runtime dependencies
npm install ksync openai ws express
# Provide the OpenAI API key to the server process
export OPENAI_API_KEY="your-openai-key"
# Start AI server
node server/ai-server.js
# Start React app
npm run dev