Leaking Goroutines

Mar 16, 2015

Admittedly this should be classified in a duhhh category of knowledge, but...

I recently wrote code that used the useful atomic.Value type to support hot reloading a configuration. In doing so, I introduced a goroutine (and memory) leak.

The bug was 100% my fault, and didn't have anything to do with atomic.Value other than the fact that it's the type of issue you'll most likely run into when swapping out values like that. Consider this code:

type Writer struct {
  queue chan []byte
}

func NewWriter() *Writer {
  w := &Writer{
    queue: make(chan []byte, 10),
  }
  go w.process()
  return w
}

func (w *Writer) Write(message []byte) {
  w.queue <- message
}

func (w *Writer) process() {
  for {
    message := <-w.queue
    _ = message // do something with message (the blank assignment keeps the compiler happy)
  }
}

If you create a Writer which later falls out of scope, its goroutine keeps running, and because that goroutine still references the Writer, neither can be garbage collected. Worse, you've lost every reference you could have used to stop it:

func main() {
  fmt.Println(runtime.NumGoroutine()) // 4 (the baseline depends on the Go version)
  test()
  fmt.Println(runtime.NumGoroutine()) // 5: one more than before
}

func test() {
  NewWriter()
}

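The solution is to add a stop channel and a Stop method, and to change the process method to select on both channels (NewWriter also has to initialize stop with make(chan struct{}), since a send on a nil channel blocks forever):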

type Writer struct {
  queue chan []byte
  stop chan struct{}
}

func (w *Writer) Stop() {
  w.stop <- struct{}{}
}

func (w *Writer) process() {
  for {
    select {
    case message := <-w.queue:
      _ = message // do something with message
    case <-w.stop:
      return
    }
  }
}

Of course, the burden is still on the caller to Stop the worker.
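For completeness, here's a minimal runnable sketch of the pieces above put together. The settled and stopsCleanly helpers are hypothetical names I've added purely for illustration. The goroutine exits shortly after Stop returns rather than at that exact instant, so the sketch polls runtime.NumGoroutine instead of checking it once:

```go
package main

import (
  "fmt"
  "runtime"
  "time"
)

type Writer struct {
  queue chan []byte
  stop  chan struct{}
}

func NewWriter() *Writer {
  w := &Writer{
    queue: make(chan []byte, 10),
    // stop must be initialized: a send on a nil channel blocks forever.
    stop: make(chan struct{}),
  }
  go w.process()
  return w
}

func (w *Writer) Write(message []byte) {
  w.queue <- message
}

// Stop blocks until process receives the signal. Call it exactly once:
// a second call would block forever because nothing is receiving anymore.
func (w *Writer) Stop() {
  w.stop <- struct{}{}
}

func (w *Writer) process() {
  for {
    select {
    case message := <-w.queue:
      _ = message // do something with message
    case <-w.stop:
      return
    }
  }
}

// settled (hypothetical helper) polls until the goroutine count is back
// down to want, or gives up after timeout.
func settled(want int, timeout time.Duration) bool {
  deadline := time.Now().Add(timeout)
  for time.Now().Before(deadline) {
    if runtime.NumGoroutine() <= want {
      return true
    }
    time.Sleep(time.Millisecond)
  }
  return false
}

// stopsCleanly (hypothetical helper) creates a Writer, stops it, and
// reports whether its goroutine went away.
func stopsCleanly() bool {
  before := runtime.NumGoroutine()
  w := NewWriter()
  w.Stop()
  return settled(before, time.Second)
}

func main() {
  fmt.Println(stopsCleanly())
}
```

Note that Stop doesn't drain the queue: any messages still buffered when the stop signal is picked up are simply dropped.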

update

@dqminh pointed out that instead of introducing the new stop channel, we could simply have called close on the existing queue (with process ranging over the channel, so that the loop ends once the queue is closed and drained). For the above, this is definitely a more elegant solution. However, it's possible that your worker code won't be channel-bound, say:

func (s *Storage) compact() {
  for {
    time.Sleep(time.Minute)
    // do compaction
  }
}

In such cases, you're back to a stop channel in conjunction with the lovely time.After:

func (s *Storage) compact() {
  for {
    select {
    case <-time.After(time.Minute):
      // do compaction
    case <-s.stop:
      return
    }
  }
}
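Going back to the channel-bound Writer for a moment, here's a rough sketch of what the close-based approach could look like. The done channel, the handled counter, the Close method and the drainCount helper are all my own hypothetical additions for illustration. The important detail is that process has to range over the queue: a bare receive on a closed channel returns a zero value immediately, so the original for loop would spin instead of exiting.

```go
package main

import "fmt"

type Writer struct {
  queue   chan []byte
  done    chan struct{} // hypothetical: closed once process has returned
  handled int           // hypothetical: only read after done is closed
}

func NewWriter() *Writer {
  w := &Writer{
    queue: make(chan []byte, 10),
    done:  make(chan struct{}),
  }
  go w.process()
  return w
}

// Write must not be called after Close: sending on a closed channel panics.
func (w *Writer) Write(message []byte) {
  w.queue <- message
}

// Close closes the queue and waits until the buffered messages are handled.
func (w *Writer) Close() {
  close(w.queue)
  <-w.done
}

func (w *Writer) process() {
  defer close(w.done)
  // range ends once the channel is closed and every buffered message
  // has been received.
  for message := range w.queue {
    _ = message // do something with message
    w.handled++
  }
}

// drainCount (hypothetical helper) writes n messages, closes the Writer,
// and reports how many messages were handled.
func drainCount(n int) int {
  w := NewWriter()
  for i := 0; i < n; i++ {
    w.Write([]byte("message"))
  }
  w.Close()
  return w.handled
}

func main() {
  fmt.Println(drainCount(3)) // prints 3
}
```

Unlike the stop channel, this version handles everything still sitting in the queue before exiting. The trade-off is that the caller must guarantee nobody calls Write after Close.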