Files
bifrost/core/providers/utils/stream.go
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

76 lines
2.7 KiB
Go

package utils
import (
"context"
schemas "github.com/maximhq/bifrost/core/schemas"
)
// CheckFirstStreamChunkForError reads the first chunk from a streaming channel to detect
// errors returned inside HTTP 200 SSE streams (e.g., providers that send rate limit
// errors as SSE events instead of HTTP 429).
//
// If the first chunk is an error, it drains the source channel in the background
// (so the provider goroutine can exit cleanly) and returns the error for synchronous
// handling, enabling retries and fallbacks. The returned drainDone channel is closed
// once the drain completes — callers must wait on it before releasing any resources
// (e.g., plugin pipelines) that the provider goroutine's postHookRunner may still reference.
//
// If the first chunk is valid data, it returns a wrapped channel that re-emits
// the first chunk followed by all remaining chunks from the source. drainDone is
// closed when the wrapper goroutine finishes forwarding the source stream.
//
// If the source channel is closed immediately (empty stream), it returns a
// nil channel with nil error. drainDone is already closed.
//
// The ctx argument cancels the background forwarding goroutine if the consumer
// abandons the returned wrapped channel. On ctx.Done the goroutine drains the
// source stream so the upstream provider's blocked send can exit cleanly.
func CheckFirstStreamChunkForError(
ctx context.Context,
stream chan *schemas.BifrostStreamChunk,
) (chan *schemas.BifrostStreamChunk, <-chan struct{}, *schemas.BifrostError) {
firstChunk, ok := <-stream
if !ok {
// Channel closed immediately (empty stream) — return nil so callers
// can distinguish this from a live stream channel.
done := make(chan struct{})
close(done)
return nil, done, nil
}
// Check if first chunk is an error
if firstChunk.BifrostError != nil && firstChunk.BifrostError.Error != nil &&
(firstChunk.BifrostError.Error.Message != "" || firstChunk.BifrostError.Error.Code != nil || firstChunk.BifrostError.Error.Type != nil) {
// Drain source channel to let the provider goroutine exit cleanly
done := make(chan struct{})
go func() {
defer close(done)
for range stream {
}
}()
return nil, done, firstChunk.BifrostError
}
// First chunk is valid data — wrap channel to re-inject it
done := make(chan struct{})
wrapped := make(chan *schemas.BifrostStreamChunk, max(cap(stream), 1))
wrapped <- firstChunk
go func() {
defer close(done)
defer close(wrapped)
for chunk := range stream {
select {
case wrapped <- chunk:
case <-ctx.Done():
// Consumer abandoned the wrapped channel. Drain the source so the
// provider's blocked send unblocks and its goroutine can exit.
for range stream {
}
return
}
}
}()
return wrapped, done, nil
}