diff --git a/midnight/src/App.tsx b/midnight/src/App.tsx index 15bf5df..f30fd36 100644 --- a/midnight/src/App.tsx +++ b/midnight/src/App.tsx @@ -1,6 +1,6 @@ import { useState } from "react"; import useWebSocket from "react-use-websocket"; -import SingleStream from "./LogViewer"; +import SingleStream from "./logs/Index"; function App() { const [recents, setRecents] = useState< @@ -85,7 +85,7 @@ function App() { {connected && ( setRecents({ ...recents, diff --git a/midnight/src/LogViewer.tsx b/midnight/src/LogViewer.tsx deleted file mode 100644 index 7e928ec..0000000 --- a/midnight/src/LogViewer.tsx +++ /dev/null @@ -1,155 +0,0 @@ -import { useMemo } from "react"; -import TextRenderer, { MAX_LINE_HEIGHT, keyFor } from "./StreamChunk"; -import { useStream } from "./StreamReciever"; -import LLMChat from "./nodes/LLMChat"; - -function Listener({ - stream, - onDisconnect, -}: { - stream: string; - onDisconnect: () => void; -}) { - const { data, readyState, unsub } = useStream>( - 2800, - stream - ); - - const logs = data; - - const chunkSections = useMemo(() => { - return logs.reduce( - (acc, cur) => { - if (cur.message.type !== "TEXT_CHUNK") { - if (acc[acc.length - 1].length > 0) { - acc.push([]); - } - return acc; - } - const chunks = cur.message.chunk.split("\n") as string[]; - chunks.forEach((chunk, index) => { - if (index > 0) { - acc.push([]); - } - acc[acc.length - 1].push(keyFor(cur, index)); - if (chunk === ".") { - acc.push([]); - } - }); - return acc; - }, - [[]] as string[][] - ); - }, [logs]); - - const keyToIndexInSectionMap = useMemo(() => { - const map = new Map(); - chunkSections.forEach((section) => { - section.forEach((key, indexInSection) => { - map.set(key, indexInSection); - }); - }); - return map; - }, [chunkSections]); - - const keyToSectionMap = useMemo(() => { - const map = new Map(); - chunkSections.forEach((section, index) => { - section.forEach((key) => { - map.set(key, index); - }); - }); - return map; - }, [chunkSections]); - - const cumulativePrice = useMemo(() => { - return logs - .filter( - (log) => - "price" in log.message && !Number.isNaN(parseFloat(log.message.price)) - ) - .map((log) => parseFloat(log.message.price)) - .reduce((acc, cur) => acc + cur, 0.0); - }, [logs]); - return ( -
- - {readyState === 3 && "Connection is Closed. Check console"} -

Cumulative Price: ${cumulativePrice.toFixed(3)}

-
- {logs.map((log, index) => { - switch (log.message.type) { - case "TEXT_CHUNK": - return ( - - ); - case "LLM_CHAT": - return ( -
- -
- ); - case "INIT": - return ( -
-
-
- ); - default: - return ( -
- - {log.message.type} - - {log.message.txId} - - -

{JSON.stringify(log.message, null, 2)}

-
- ); - } - })} -
-
- ); -} - -export default Listener; diff --git a/midnight/src/StreamChunk.tsx b/midnight/src/StreamChunk.tsx deleted file mode 100644 index beccde1..0000000 --- a/midnight/src/StreamChunk.tsx +++ /dev/null @@ -1,148 +0,0 @@ -import React, { useEffect, useRef, useState } from "react"; -import { useAnimation, motion } from "framer-motion"; - -export const keyFor = (log: any, index: number) => { - return log.id + "*" + index; -}; - -const TextRenderer: React.FC<{ - log: any; - keyToSectionMap: Map; - keyToIndexInSectionMap: Map; - indexFromEnd: number; - nSections: number; -}> = ({ - log, - indexFromEnd, - keyToSectionMap, - keyToIndexInSectionMap, - nSections, -}) => { - const text = log.message.chunk as string; - return ( - <> - {text.split(/[\n]/).map((item, index, array) => { - const sectionIndex = keyToSectionMap.get(keyFor(log, index)) as number; - const indexInSection = keyToIndexInSectionMap.get( - keyFor(log, index) - ) as number; - return ( - - - {index < array.length - 1 &&
} - - ); - })} - - ); -}; - -export const MAX_LINE_HEIGHT = "26px"; -const Chunk: React.FC<{ - text: string; - indexFromEnd: number; - indexInSection: number; - sectionIndex: number; - nSections: number; -}> = ({ text, indexFromEnd, indexInSection, sectionIndex, nSections }) => { - const ref = useRef(null); - const controls = useAnimation(); - const [finishedAppearing, setFinishedAppearing] = useState(false); - - useEffect(() => { - if (!finishedAppearing) { - controls - .start({ - opacity: 1, - transition: { - opacity: { - duration: 0.4, - }, - }, - }) - .then(() => { - setFinishedAppearing(true); - }); - } - controls.start({ - x: 0, - y: 0, - maxHeight: MAX_LINE_HEIGHT, - transition: { - x: { duration: 0.2 }, - y: { duration: 0.2 }, - maxHeight: { duration: 0.4 }, - }, - }); - }, [controls, finishedAppearing]); - - useEffect(() => { - if (sectionIndex === nSections - 2) { - const handle = setTimeout(() => { - if (sectionIndex === nSections - 2 && indexInSection === 0) { - if (ref.current) { - ref.current.scrollIntoView({ behavior: "smooth", block: "center" }); - } - } - controls.start({ - opacity: 1, - transition: { - opacity: { - duration: 0.3, - }, - }, - }); - }, 600); - return () => { - clearTimeout(handle); - }; - } - if (sectionIndex < nSections - 2) { - const handle = setTimeout(() => { - controls.start({ - opacity: 0.3, - transition: { - opacity: { - duration: 0.3, - }, - }, - }); - }, 600); - return () => { - clearTimeout(handle); - }; - } - }, [controls, nSections, indexInSection, sectionIndex]); - - return ( - - {text} - - ); -}; - -export default TextRenderer; diff --git a/midnight/src/StreamReciever.tsx b/midnight/src/StreamReciever.tsx deleted file mode 100644 index b9f8cec..0000000 --- a/midnight/src/StreamReciever.tsx +++ /dev/null @@ -1,65 +0,0 @@ -import throttle from "lodash.throttle"; -import { useEffect, useRef, useState } from "react"; -import useWebSocket from "react-use-websocket"; - -export function useStream(wpt: number, streamName: string) { - const [remoteData, setRemoteData] = useState([]); - const [localData, setLocalData] = useState([]); - const remoteDataRef = useRef(remoteData); - remoteDataRef.current = remoteData; - - const transferRemoteToLocal = useRef( - throttle( - (id: string) => { - console.log(id); - setLocalData((prev) => [ - ...prev, - ...remoteDataRef.current.slice(prev.length, prev.length + 1), - ]); - }, - 1000 * 60.0 / wpt, - { trailing: true, leading: false } - ) - ).current; // Using useRef to persist the throttled function across renders without recreation - - useEffect(() => { - if (localData.length < remoteData.length) { - transferRemoteToLocal("effect"); - } - }, [localData, remoteData, transferRemoteToLocal]); - - const { sendJsonMessage, readyState } = useWebSocket("ws://127.0.0.1:8080", { - onOpen: () => { - sendJsonMessage({ - command: "sub", - stream: streamName, - }); - }, - onMessage: (event) => { - const { type, data } = JSON.parse(event.data) as { - type: "init" | "append"; - data: T[]; - }; - if (type === "init") { - setRemoteData(data); - setLocalData([]); - } else { - setRemoteData((oldData) => [...oldData, ...data]); - } - transferRemoteToLocal("og"); - }, - onClose: () => { - console.log("Connection closed"); - }, - }); - - const unsub = () => { - console.trace("UNSUBBED CALLED"); - sendJsonMessage({ - command: "unsub", - stream: streamName, - }); - }; - - return { data: localData, unsub, readyState }; -} diff --git a/midnight/src/logs/Index.tsx b/midnight/src/logs/Index.tsx new file mode 100644 index 0000000..2d76f52 --- /dev/null +++ b/midnight/src/logs/Index.tsx @@ -0,0 +1,146 @@ +import { useEffect, useMemo } from "react"; +import LLMChat from "../nodes/LLMChat"; +import Price from "./Price"; +import { MAX_LINE_HEIGHT } from "./StreamChunk"; +import TextStreamingDisplay, { + TextChunkLog, + isTextChunk, +} from "./TextStreamingDisplay"; +import { useStream } from "./WSStreamer"; + +type Log = { + id: string; + message: { type: string } & Record; +}; + +export default function LogViewer({ + streamName, + onDisconnect, +}: { + streamName: string; + onDisconnect: () => void; +}) { + const { data, isClosed, unsub } = useStream(streamName); + + const splitSequences = useMemo( + () => splitIntoContinuousSequences(data), + [data] + ); + + return ( +
+ + {isClosed && ( + + Connection is Closed. Check console + + )} + +
+ {splitSequences.map((seq, index) => { + if (seq.isTextChunks) { + return ( + + ); + } else { + return renderNonTextChunkData(seq.data); + } + })} +
+
+ ); +} + +function renderNonTextChunkData(data: Log[]) { + return data.map((log, index) => { + switch (log.message.type) { + case "LLM_CHAT": + return ( +
+ +
+ ); + case "INIT": + return ( +
+
+
+ ); + default: + return ( +
+ + {log.message.type} + + {log.message.txId} + + +

{JSON.stringify(log.message, null, 2)}

+
+ ); + } + }); +} + +function splitIntoContinuousSequences( + data: Log[] +): Array< + | { isTextChunks: true; data: TextChunkLog[] } + | { isTextChunks: false; data: Log[] } +> { + const acc = [] as Array< + | { isTextChunks: true; data: TextChunkLog[] } + | { isTextChunks: false; data: Log[] } + >; + + // add to the current sequence or create a new sequence if isTextChunks differs + const add = (d: any) => { + const isCurTextChunk = isTextChunk(d); + if ( + acc.length === 0 || + acc[acc.length - 1].isTextChunks !== isCurTextChunk + ) { + acc.push({ isTextChunks: isCurTextChunk, data: [d] }); + } else { + acc[acc.length - 1].data.push(d); + } + }; + + for (const d of data) { + add(d); + } + + return acc; +} diff --git a/midnight/src/logs/Price.tsx b/midnight/src/logs/Price.tsx new file mode 100644 index 0000000..c61a8ed --- /dev/null +++ b/midnight/src/logs/Price.tsx @@ -0,0 +1,15 @@ +import { useMemo } from "react"; + +export default function Price({ logs }: { logs: Record[] }) { + const cumulativePrice = useMemo(() => { + return logs + .filter( + (log) => + "price" in log.message && !Number.isNaN(parseFloat(log.message.price)) + ) + .map((log) => parseFloat(log.message.price)) + .reduce((acc, cur) => acc + cur, 0.0); + }, [logs]); + + return

Cumulative Price: ${cumulativePrice.toFixed(3)}

; +} diff --git a/midnight/src/logs/StreamChunk.tsx b/midnight/src/logs/StreamChunk.tsx new file mode 100644 index 0000000..bbf52c7 --- /dev/null +++ b/midnight/src/logs/StreamChunk.tsx @@ -0,0 +1,69 @@ +import React, { useEffect, useRef, useState } from "react"; +import { useAnimation, motion } from "framer-motion"; + +export const MAX_LINE_HEIGHT = "26px"; + +const Chunk: React.FC<{ + text: string; + delayReady: boolean; +}> = ({ text, delayReady }) => { + const ref = useRef(null); + const controls = useAnimation(); + + useEffect(() => { + controls.start({ + x: 0, + y: 0, + maxHeight: MAX_LINE_HEIGHT, + transition: { + x: { duration: 0.2 }, + y: { duration: 0.2 }, + maxHeight: { duration: 0.4 }, + }, + }); + }, [controls]); + + useEffect(() => { + controls.start({ + opacity: delayReady ? 1 : 0.1, + transition: { + opacity: { + duration: 0.4, + }, + }, + }); + }, [controls, delayReady]); + + return ( + <> + {text.split("\n").map((line, index, array) => ( + <> + + {line} + + {index < array.length - 1 && } + + ))} + + ); +}; + +export default Chunk; diff --git a/midnight/src/logs/TextStreamingDisplay.tsx b/midnight/src/logs/TextStreamingDisplay.tsx new file mode 100644 index 0000000..395fff2 --- /dev/null +++ b/midnight/src/logs/TextStreamingDisplay.tsx @@ -0,0 +1,74 @@ +import { useEffect, useMemo, useRef, useState } from "react"; +import throttle from "lodash.throttle"; +import Chunk from "./StreamChunk"; + +export default function TextStreamingDisplay({ + logs, +}: { + logs: TextChunkLog[]; +}) { + const logsRef = useRef(logs); + logsRef.current = logs; + + const [delayedLogs, setDelayedLogs] = useState([]); + + const DEFAULT_TPM = 1000; + const tpm = useMemo(() => { + if ( + delayedLogs.length > 0 && + (delayedLogs[delayedLogs.length - 1].message.chunk === "." || + /[\n:]/g.test(delayedLogs[delayedLogs.length - 1].message.chunk)) + ) { + return DEFAULT_TPM / 4; + } + return DEFAULT_TPM; + }, [delayedLogs]); + + const addToDelayedLogs = useMemo( + () => + throttle( + () => { + setDelayedLogs((prev) => [ + ...prev, + ...logsRef.current.slice(prev.length, prev.length + 1), + ]); + }, + (1000 * 60.0) / tpm, + { trailing: true, leading: false } + ), + [tpm] + ); + + useEffect(() => { + if (delayedLogs.length < logs.length) { + addToDelayedLogs(); + } + }, [delayedLogs, logs, addToDelayedLogs]); + + return ( +
+ {logs.map((log, index) => ( + + ))} +
+ ); +} + +export function isTextChunk(d: Record): d is TextChunkLog { + return d.message.type === "TEXT_CHUNK"; +} + +export type TextChunkLog = { + id: string; + message: { chunk: string; type: "TEXT_CHUNK" } & Record; +}; diff --git a/midnight/src/logs/WSStreamer.ts b/midnight/src/logs/WSStreamer.ts new file mode 100644 index 0000000..0dffb86 --- /dev/null +++ b/midnight/src/logs/WSStreamer.ts @@ -0,0 +1,38 @@ +import { useState } from "react"; +import useWebSocket from "react-use-websocket"; + +export function useStream(name: string) { + const [data, setData] = useState([]); + + const { sendJsonMessage, readyState } = useWebSocket("ws://127.0.0.1:8080", { + onOpen: () => { + sendJsonMessage({ + command: "sub", + stream: name, + }); + }, + onMessage: (event) => { + const { type, data } = JSON.parse(event.data) as { + type: "init" | "append"; + data: T[]; + }; + if (type === "init") { + setData(data); + } else { + setData((oldData) => [...oldData, ...data]); + } + }, + onClose: () => { + console.log("Connection closed"); + }, + }); + + const unsub = () => { + sendJsonMessage({ + command: "unsub", + stream: name, + }); + }; + + return { data, unsub, isClosed: readyState === 3 }; +} diff --git a/midnight/src/nodes/LLMChat.tsx b/midnight/src/nodes/LLMChat.tsx index 87d403e..b0ef58b 100644 --- a/midnight/src/nodes/LLMChat.tsx +++ b/midnight/src/nodes/LLMChat.tsx @@ -29,7 +29,7 @@ const LLMChat = ({
Result: {message.result}
- Spec: {typeof message.spec} + Spec: {message.spec}
Price: {message.price} diff --git a/starlight/src/runner/text-streaming-demo.ts b/starlight/src/runner/text-streaming-demo.ts index 512f2da..f06ae61 100644 --- a/starlight/src/runner/text-streaming-demo.ts +++ b/starlight/src/runner/text-streaming-demo.ts @@ -1,7 +1,7 @@ import "@/runner/initializer"; import { defaultTx } from "@/project/context"; -import { emit } from "@/redis"; +import { emit, redis } from "@/redis"; import OpenAI from "openai"; import { Message, estimatePricing } from "@/llm/utils"; @@ -14,6 +14,7 @@ async function main() { role: "user", content: "What differentiated the Apple Macintosh from the Apple II and the Lisa?", + // "Where should i surf in san francisco?" }, ]; @@ -47,4 +48,6 @@ async function main() { process.stdout.write("\n[EOF]\n"); } await main(); + +await redis?.quit(); process.exit(0);