[go: nahoru, domu]

Skip to content

Commit

Permalink
Is stream removed
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinlu1248 committed Jun 22, 2024
1 parent dfce046 commit 84871d0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 60 deletions.
8 changes: 0 additions & 8 deletions sweep_chat/components/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
codeSuggestions: CodeSuggestion[],
commitToPR: boolean
) => {
isStream.current = true
let currentCodeSuggestions: StatefulCodeSuggestion[] = codeSuggestions.map(
(suggestion) => ({
...suggestion,
Expand Down Expand Up @@ -612,7 +611,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
annotations: { pulls: PullRequest[] } = { pulls: [] }
) => {
setIsLoading(true)
isStream.current = true
var currentSnippets = snippets
if (currentSnippets.length == 0) {
try {
Expand Down Expand Up @@ -672,7 +670,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
duration: Infinity,
})
setIsLoading(false)
isStream.current = false
posthog_capture('chat errored', {
error: e.message,
})
Expand Down Expand Up @@ -732,8 +729,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
throw e
}

isStream.current = false

var lastMessage = streamedMessages[streamedMessages.length - 1]
if (
lastMessage.role == 'function' &&
Expand Down Expand Up @@ -799,7 +794,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
pull_request_number: pr.number,
})
try {
isStream.current = true
let scrolledToBottom = false
let currentPrValidationStatuses: PrValidationStatus[] = []
setMessages([
Expand Down Expand Up @@ -836,7 +830,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
scrolledToBottom = true
}
}
isStream.current = false

const prFailed = currentPrValidationStatuses.some(
(status: PrValidationStatus) =>
Expand All @@ -854,7 +847,6 @@ function App({ defaultMessageId = '' }: { defaultMessageId?: string }) {
variant: 'destructive',
})
} finally {
isStream.current = false
setIsValidatingPR(false)
}
}
Expand Down
117 changes: 65 additions & 52 deletions sweep_chat/lib/streamingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,78 @@ async function* streamMessages(
let buffer = ''
let timeoutId: ReturnType<typeof setTimeout> | null = null

while (!done && (isStream ? isStream.current : true)) {
try {
const { value, done: streamDone } = await Promise.race([
reader.read(),
new Promise<ReadableStreamDefaultReadResult<Uint8Array>>(
(_, reject) => {
if (timeoutId) {
clearTimeout(timeoutId)
}
timeoutId = setTimeout(
() =>
reject(
new Error(
'Stream timeout after ' +
timeout / 1000 +
' seconds, this is likely caused by the LLM freezing. You can try again by editing your last message. Further, decreasing the number of snippets to retrieve in the settings will help mitigate this issue.'
)
),
timeout
)
}
),
])
if (isStream) {
isStream.current = true
}

if (streamDone) {
done = true
continue
}
try {
while (!done && (isStream ? isStream.current : true)) {
try {
const { value, done: streamDone } = await Promise.race([
reader.read(),
new Promise<ReadableStreamDefaultReadResult<Uint8Array>>(
(_, reject) => {
if (timeoutId) {
clearTimeout(timeoutId)
}
timeoutId = setTimeout(
() =>
reject(
new Error(
'Stream timeout after ' +
timeout / 1000 +
' seconds, this is likely caused by the LLM freezing. You can try again by editing your last message. Further, decreasing the number of snippets to retrieve in the settings will help mitigate this issue.'
)
),
timeout
)
}
),
])

if (value) {
const decodedValue = new TextDecoder().decode(value)
if (buffer.length + decodedValue.length > maxBufferSize) {
throw new Error('Buffer size exceeded. Possible malformed input.')
if (streamDone) {
done = true
continue
}
buffer += decodedValue

const [parsedObjects, currentIndex] = getJSONPrefix(buffer)
for (let parsedObject of parsedObjects) {
yield parsedObject
if (value) {
const decodedValue = new TextDecoder().decode(value)
if (buffer.length + decodedValue.length > maxBufferSize) {
throw new Error('Buffer size exceeded. Possible malformed input.')
}
buffer += decodedValue

const [parsedObjects, currentIndex] = getJSONPrefix(buffer)
for (let parsedObject of parsedObjects) {
yield parsedObject
}
buffer = buffer.slice(currentIndex)
if (
buffer.length > 0 &&
!buffer.startsWith('{') &&
!buffer.startsWith('[') &&
!buffer.startsWith('(')
) {
// If there's remaining data that doesn't start with '{', it's likely incomplete
// Wait for the next chunk before processing
continue
}
}
buffer = buffer.slice(currentIndex)
if (
buffer.length > 0 &&
!buffer.startsWith('{') &&
!buffer.startsWith('[') &&
!buffer.startsWith('(')
) {
// If there's remaining data that doesn't start with '{', it's likely incomplete
// Wait for the next chunk before processing
continue
} catch (error) {
console.error('Error during streaming:', error)
throw error // Rethrow timeout errors
} finally {
if (timeoutId) {
clearTimeout(timeoutId)
}
}
} catch (error) {
console.error('Error during streaming:', error)
throw error // Rethrow timeout errors
} finally {
if (timeoutId) {
clearTimeout(timeoutId)
}
}
} catch (error) {
console.error('Error during streaming:', error)
throw error // Rethrow timeout errors
} finally {
if (isStream) {
isStream.current = false
}
}

Expand Down

0 comments on commit 84871d0

Please sign in to comment.