Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions streaming/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,28 @@ func (r *Reader) start() {
})
}

// xreadFn fetches the next batch of events for a reader. It is a package
// variable (rather than a struct field) so tests can simulate read errors
// without polluting Reader; it defaults to (*Reader).xread.
var xreadFn = (*Reader).xread

// read reads events from the streams and sends them to the reader channel.
func (r *Reader) read() {
ctx := context.Background()
defer r.cleanup()
for {
streamsEvents, err := r.xread(ctx)
streamsEvents, err := xreadFn(r, ctx)
if r.isClosing() {
return
}
if err != nil {
if err := handleReadError(err, r.logger); err != nil {
r.logger.Error(fmt.Errorf("fatal error while reading events: %w, stopping", err))
r.Close()
// Close waits on this goroutine via wait.Wait, so calling it
// synchronously here would deadlock and leak the reader and its
// Redis connection. Trigger the shutdown asynchronously and let
// this goroutine return so cleanup can release the wait group.
pulse.Go(r.logger, r.Close)
return
}
continue
Expand Down
27 changes: 27 additions & 0 deletions streaming/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package streaming

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -124,6 +126,31 @@ func TestCleanupReader(t *testing.T) {
assert.Eventually(t, func() bool { return rdb.Exists(ctx, s.key).Val() == 0 }, max, delay)
}

func TestReaderCloseOnFatalReadError(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, true, testName)
ctx := ptesting.NewTestContext(t)
s, err := NewStream(testName, rdb, options.WithStreamLogger(pulse.ClueLogger(ctx)))
require.NoError(t, err)
// Simulate a fatal read error (e.g. the underlying stream key being
// destroyed) before the read goroutine starts. The read loop reacts to a
// fatal error by closing the reader; because Close waits on the read
// goroutine, it must run asynchronously or it would deadlock and leak the
// reader and its Redis connection.
defer func(orig func(*Reader, context.Context) ([]redis.XStream, error)) { xreadFn = orig }(xreadFn)
xreadFn = func(*Reader, context.Context) ([]redis.XStream, error) {
return nil, fmt.Errorf("stream key no longer exists")
}

reader, err := s.NewReader(ctx, options.WithReaderBlockDuration(testBlockDuration))
require.NoError(t, err)
reader.Subscribe()

require.Eventually(t, func() bool { return reader.IsClosed() }, max, delay,
"reader did not close after a fatal read error (Close likely deadlocked on its own read goroutine)")
}

func TestAddReaderStream(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
rdb := ptesting.NewRedisClient(t)
Expand Down
Loading