Skip to content

Some Go idioms...

Asynchronous messages

Go provides powerful builtin mechanims for asynchronous messages. By using channels we are affectively avoiding locks by messages passing and, at the same time, we are improving the chances the compiler can parallelize our code by serializing the logic in the program.

Let's look at the following example. The Counter class is a simple counter that is initialized to 0 and where we can Add() an arbitrary value. Instead of protecting the count attribute with a lock, we centralize all the operations in a single actor loop (lines 15-20). After reading a value that must be added from the countChan channel, all the counter modifications will be done in this loop, so no locks are needed.

go
package main

import "fmt"

type Counter struct {
     count     int
     countChan chan int
}

func NewCounter() *Counter {
     c := Counter{
         countChan: make(chan int),
     }

     go func() {
         for v := range c.countChan {
             c.count += v
             fmt.Printf("New value: %d\n", c.count)
         }
     }()

     return &c
}

func (c *Counter) Close() {
     close(c.countChan)
}

func (c *Counter) Add(v int) {
     c.countChan
}

Sending closures

The problem in this solution is that we centralize all the Counter logic in the actor loop. Including more operations in the Counter (eg, Sub()) would mean a bigger loop, or more private methods for the implementations.

So, instead of running some specific actions in the actor loop, it would be better if we could define how to add inside the Add() method. We can achieve this by taking advantage of Golang functions being first-class objects and sending closures to the loop. The loop will be then responsible for just running these closures once they have been serialized by the channel.

go
package main

import "fmt"

type Counter struct {
     count       int
     actionsChan chan func()
}

func NewCounter() *Counter {
     c := Counter{
         actionsChan: make(chan func()),
     }

     go func() {
         for f := range c.actionsChan {
             f()
         }
     }()
     return &c
}

func (c *Counter) Close() {
     close(c.actionsChan)
}

func (c *Counter) Add(v int) {
     c.actionsChan <- func() {
         c.count += v
         fmt.Printf("New value: %d\n", c.count)
     }
}

func main() {
     c := NewCounter()
     defer c.Close()
     c.Add(1)
     c.Add(1)
     c.Add(1)
}

Synchronous messages

However, there is a problem with this code. We are filling the channel with messages but you don't really know whether they will be all processed before the main function exits. This is not so obvious when we send three messages to the channel, but it would become obvious when inserting one million messages.

So, in order to be sure that all messages are processed, we must wait until all the messages have been received and the channel is empty. A simple way to achieve this is by sending a last synchronous message and to wait for it.

How do we do that? Very simple: the closure will include a channel, and we can wait for that channel to be closed. We could have a Wait() method like this:

go
func (c *Counter) Wait() {
         ch := make(chan struct{})
         c.actionsChan <- func() { close(ch) }
         <-ch
}

A more general solution would include channels in all the messages we send to the actor’s loop. This makes use of channels being first-class values as well, an important feature for implementing synchronization in Go.

go
package main

import "fmt"

type counterItem struct {
     f         func()
     processed chan struct{}
}

func newCounterItem(f func()) counterItem {
     return counterItem{
         f:         f,
         processed: make(chan struct{}),
     }
}

//////////////////////////////////////////

type Counter struct {
     count       int
     actionsChan chan counterItem
}

func NewCounter() *Counter {
     c := Counter{
         actionsChan: make(chan counterItem),
     }

     go func() {
         for item := range c.actionsChan {
             item.f()
             close(item.processed)
         }
     }()
     return &c
}

func (c *Counter) Close() {
     close(c.actionsChan)
}

func (c *Counter) Add(v int) {
     c.actionsChan <- newCounterItem(func() {
         c.count += v
         fmt.Printf("New value: %d\n", c.count)
     })
}

func (c *Counter) Wait() {
     ci := newCounterItem(func() {})
     c.actionsChan <- ci
     <-ci.processed
}

func main() {
     c := NewCounter()
     defer c.Close()
     c.Add(1)
     c.Add(1)
     c.Add(1)
     c.Wait()
}

In this code, the actor accepts a counterItems, a struct that contains a closure and a channel (the processed attribute) that will be used for signaling that the closure has been executed. The Add() method will work as it did before, sending a closure to the channel and then continuing, but the new Ẁait() method with send an empty closure and wait on the processed channel, until is closed by the actor's loop.

The inclusion of a channel in the channel items is not only useful for waiting for an event, but also for returning values from the actor to the caller. The actor could send some value before closing the channel (line 32), so the caller could receive the result of the asynchronous processing and continue...