253 lines
6.9 KiB
Go
253 lines
6.9 KiB
Go
package utils
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
schemas "github.com/maximhq/bifrost/core/schemas"
|
|
)
|
|
|
|
func TestCheckFirstStreamChunk_ErrorInFirstChunk(t *testing.T) {
|
|
stream := make(chan *schemas.BifrostStreamChunk, 2)
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostError: &schemas.BifrostError{
|
|
Error: &schemas.ErrorField{
|
|
Code: schemas.Ptr("limit_burst_rate"),
|
|
Message: "Request rate increased too quickly",
|
|
},
|
|
},
|
|
}
|
|
close(stream)
|
|
|
|
_, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
<-drainDone
|
|
if err.Error.Message != "Request rate increased too quickly" {
|
|
t.Errorf("unexpected error message: %s", err.Error.Message)
|
|
}
|
|
if err.Error.Code == nil || *err.Error.Code != "limit_burst_rate" {
|
|
t.Errorf("unexpected error code: %v", err.Error.Code)
|
|
}
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_ValidFirstChunk(t *testing.T) {
|
|
stream := make(chan *schemas.BifrostStreamChunk, 3)
|
|
chunk1 := &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{
|
|
ID: "chatcmpl-123",
|
|
},
|
|
}
|
|
chunk2 := &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{
|
|
ID: "chatcmpl-123",
|
|
},
|
|
}
|
|
stream <- chunk1
|
|
stream <- chunk2
|
|
close(stream)
|
|
|
|
wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
// First chunk should be re-injected
|
|
got1 := <-wrapped
|
|
if got1.BifrostChatResponse == nil || got1.BifrostChatResponse.ID != "chatcmpl-123" {
|
|
t.Error("first chunk not re-injected correctly")
|
|
}
|
|
|
|
// Second chunk should follow
|
|
got2 := <-wrapped
|
|
if got2.BifrostChatResponse == nil || got2.BifrostChatResponse.ID != "chatcmpl-123" {
|
|
t.Error("second chunk not forwarded correctly")
|
|
}
|
|
|
|
// Channel should be closed
|
|
_, ok := <-wrapped
|
|
if ok {
|
|
t.Error("expected wrapped channel to be closed")
|
|
}
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_EmptyStream(t *testing.T) {
|
|
stream := make(chan *schemas.BifrostStreamChunk)
|
|
close(stream)
|
|
|
|
wrapped, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
// Empty stream should return nil channel
|
|
if wrapped != nil {
|
|
t.Error("expected nil channel for empty stream")
|
|
}
|
|
|
|
// drainDone should be already closed
|
|
select {
|
|
case <-drainDone:
|
|
default:
|
|
t.Error("expected drainDone to be closed for empty stream")
|
|
}
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_ErrorInSecondChunk(t *testing.T) {
|
|
stream := make(chan *schemas.BifrostStreamChunk, 3)
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{
|
|
ID: "chatcmpl-123",
|
|
},
|
|
}
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostError: &schemas.BifrostError{
|
|
Error: &schemas.ErrorField{
|
|
Message: "some error in second chunk",
|
|
},
|
|
},
|
|
}
|
|
close(stream)
|
|
|
|
// Should NOT return error — only first chunk matters for retry
|
|
wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
// Read all chunks
|
|
got1 := <-wrapped
|
|
if got1.BifrostChatResponse == nil {
|
|
t.Error("first chunk should be valid data")
|
|
}
|
|
got2 := <-wrapped
|
|
if got2.BifrostError == nil {
|
|
t.Error("second chunk should be the error")
|
|
}
|
|
|
|
_, ok := <-wrapped
|
|
if ok {
|
|
t.Error("expected wrapped channel to be closed")
|
|
}
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_ErrorDrainsSource(t *testing.T) {
|
|
stream := make(chan *schemas.BifrostStreamChunk, 5)
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostError: &schemas.BifrostError{
|
|
Error: &schemas.ErrorField{
|
|
Message: "rate limit error",
|
|
},
|
|
},
|
|
}
|
|
// Add more chunks that should be drained
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{ID: "1"},
|
|
}
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{ID: "2"},
|
|
}
|
|
close(stream)
|
|
|
|
_, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
<-drainDone
|
|
if err.Error.Message != "rate limit error" {
|
|
t.Errorf("unexpected error message: %s", err.Error.Message)
|
|
}
|
|
if drainDone == nil {
|
|
t.Fatal("expected drainDone channel, got nil")
|
|
}
|
|
// Wait for drain to complete — verifies the channel signals properly
|
|
<-drainDone
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_ErrorWithEmptyMessage(t *testing.T) {
|
|
// Error with empty message and no code/type should NOT be treated as an error
|
|
stream := make(chan *schemas.BifrostStreamChunk, 2)
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostError: &schemas.BifrostError{
|
|
Error: &schemas.ErrorField{
|
|
Message: "",
|
|
},
|
|
},
|
|
}
|
|
close(stream)
|
|
|
|
wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error for empty message: %v", err)
|
|
}
|
|
// Should be treated as valid chunk
|
|
<-wrapped
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_CtxCancelUnblocksWrapper(t *testing.T) {
|
|
// Source with cap=1 so wrapped also has cap=1. wrapped is left full by
|
|
// the re-injected first chunk, which makes the forwarder goroutine block
|
|
// on its next send — the exact leak condition this test guards against.
|
|
src := make(chan *schemas.BifrostStreamChunk, 1)
|
|
src <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{ID: "1"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
wrapped, drainDone, err := CheckFirstStreamChunkForError(ctx, src)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if wrapped == nil {
|
|
t.Fatal("expected wrapped channel, got nil")
|
|
}
|
|
|
|
// Push a second chunk; forwarder will read it from src and then block
|
|
// trying to send into the full wrapped channel (we intentionally never
|
|
// read from wrapped).
|
|
src <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{ID: "2"},
|
|
}
|
|
|
|
// Cancel — forwarder must stop trying to send to wrapped and drain src.
|
|
cancel()
|
|
|
|
// Simulate the upstream producer still emitting, then closing. The
|
|
// drain loop should consume these and terminate.
|
|
src <- &schemas.BifrostStreamChunk{
|
|
BifrostChatResponse: &schemas.BifrostChatResponse{ID: "3"},
|
|
}
|
|
close(src)
|
|
|
|
select {
|
|
case <-drainDone:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("drainDone did not close after ctx cancel; forwarder goroutine leaked")
|
|
}
|
|
}
|
|
|
|
func TestCheckFirstStreamChunk_CodeOnlyError(t *testing.T) {
|
|
// Error with code but no message should be treated as an error
|
|
stream := make(chan *schemas.BifrostStreamChunk, 2)
|
|
stream <- &schemas.BifrostStreamChunk{
|
|
BifrostError: &schemas.BifrostError{
|
|
Error: &schemas.ErrorField{
|
|
Code: schemas.Ptr("limit_burst_rate"),
|
|
},
|
|
},
|
|
}
|
|
close(stream)
|
|
|
|
_, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream)
|
|
if err == nil {
|
|
t.Fatal("expected error for code-only error, got nil")
|
|
}
|
|
<-drainDone
|
|
if err.Error.Code == nil || *err.Error.Code != "limit_burst_rate" {
|
|
t.Errorf("unexpected error code: %v", err.Error.Code)
|
|
}
|
|
}
|