Introduction

 

While developing concurrent data processing pipelines in Go, I ran into an edge case that prevented our processes from shutting down. My team realized we had leaking goroutines that were blocked on channel sends. We adopted a pattern I call "cancelable sends" to fix the bugs caused by this edge case.

Use Case


Our team was regularly tasked with implementing a concurrent process that reads IDs off an incoming channel, calls an API or other external resource for each one, and then publishes the hydrated data on an output channel. The original implementation of our code is below.
 

We typically do not return a second channel from the function, but I added a `doneC` channel to the examples to make the edge case easier to demonstrate in tests. It is closed when the worker goroutine exits.

Original Implementation

 

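package pipeline

import "context"

// User is the hydrated record published on the output channel.
type User struct {
	Id       string
	UserName string
	JobTitle string
}

// UserFetcher looks up a single user by ID, e.g. by calling an API.
type UserFetcher func(context.Context, string) (User, error)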


func PopulateUserData_Old(ctx context.Context, userFetcher UserFetcher, userIdC <-chan string) (<-chan User, <-chan struct{}) {
	userC, doneC := make(chan User), make(chan struct{})
	go func() {
		defer func() {
			close(userC)
			close(doneC)
		}()
		for userId := range userIdC {
			if ctx.Err() != nil {
				break
			}
			user, err := userFetcher(ctx, userId)
			if err != nil {
				break
			}
			// This send blocks indefinitely if the downstream process is not reading from userC; context cancellation cannot interrupt it.
			userC <- user
		}
	}()
	return userC, doneC
}

 

In the above implementation, the function creates an output `User` channel and starts a goroutine that reads IDs off the `userIdC` channel and hydrates the users. For each ID it receives, the goroutine first checks whether the context has been cancelled. If it has not, the goroutine fetches the user data and writes it to the output channel. The problem is the send: `userC` is unbuffered, so if the downstream process that was consuming `userC` has stopped reading, the goroutine blocks on `userC <- user` forever. Cancelling the context does not help, because the cancellation check only runs before the fetch, never while the goroutine is waiting on the send.

New Implementation
 
func PopulateUserData_New(ctx context.Context, userFetcher UserFetcher, userIdC <-chan string) (<-chan User, <-chan struct{}) {
	userC, doneC := make(chan User), make(chan struct{})
	go func() {
		defer func() {
			close(userC)
			close(doneC)
		}()
		for userId := range userIdC {
			user, err := userFetcher(ctx, userId)
			if err != nil {
				break
			}
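			// cancelable send: wait for a reader on userC or for ctx to be
			// cancelled, whichever happens first
			select {
			case userC <- user:
			case <-ctx.Done():
				return
			}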
		}
	}()
	return userC, doneC
}

 

The new implementation fixes that edge case. Because the channel send now happens inside a `select` statement, we can stop the goroutine by cancelling the context: the `select` blocks until either a `User` can be sent on `userC` or `ctx.Done()` is closed, whichever happens first. The explicit `ctx.Err()` check from the original loop is gone; instead we rely on `userFetcher` honoring the context it is passed and on the `select` around the send. This follows the common Go guideline of never starting a goroutine without knowing how it will stop. Making "cancelable sends" a habit has reduced the number of shutdown-related edge-case bugs in our concurrent data processing pipelines.


Tests 

 

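package pipeline

import (
	"context"
	"testing"
	"time"
)

// This test is expected to FAIL: it demonstrates that PopulateUserData_Old
// stays blocked on its send after the context is cancelled.
func TestPopulateUserData_Old(t *testing.T) {
	t.Run("blocks on send to output channel when context has been cancelled", func(t *testing.T) {
		inputC := make(chan string, 2)
		inputC <- "id1"

		ctx, cancel := context.WithCancel(context.Background())
		// the fake fetcher cancels the context, simulating a shutdown that
		// happens while nobody is reading from the output channel
		fakeFetcher := func(context.Context, string) (User, error) {
			cancel()
			return User{}, nil
		}
		_, doneC := PopulateUserData_Old(ctx, fakeFetcher, inputC)

		if operationTimeout(func() {
			<-doneC
		}, time.Second) {
			t.Error("the operation timed out")
		}
	})
}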

func TestPopulateUserData_New(t *testing.T) {
	t.Run("does not block on send to output channel when context has been cancelled", func(t *testing.T) {
		inputC := make(chan string, 2)
		inputC <- "id1"

		ctx, cancel := context.WithCancel(context.Background())
		fakeFetcher := func(context.Context, string) (User, error) {
			cancel()
			return User{}, nil
		}
		_, doneC := PopulateUserData_New(ctx, fakeFetcher, inputC)

		if operationTimeout(func() {
			<-doneC
			t.Log("operation did not time out")
		}, time.Second) {
			t.Error("the operation timed out")
		}
	})
}

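// operationTimeout runs f in a goroutine and reports whether it failed to
// finish within duration.
func operationTimeout(f func(), duration time.Duration) bool {
	// buffered so the helper goroutine can always signal completion, even
	// after we have stopped waiting for it
	doneC := make(chan struct{}, 1)
	go func() {
		f()
		doneC <- struct{}{}
	}()
	select {
	case <-doneC:
		return false
	case <-time.After(duration):
		return true
	}
}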