[Reprint] Basics of Concurrency in Go Language

title: 【Reprint】Basics of Go Language Concurrency
date: 2021-08-09 16:39:33
comment: false
toc: true
category:
  • Golang
tags:
  • Reprint
  • Go
  • Basics
  • Concurrency

This article is reprinted from: Basics of Go Language Concurrency | Li Wenzhou's Blog


Concurrency is a very important concept in programming, and Go language natively supports concurrency at the language level, which is also a significant reason for its popularity.

Concurrency Programming in Go#

Concurrency vs. Parallelism#

Concurrency: Executing multiple tasks during the same time period (like chatting on WeChat with two girlfriends).

Parallelism: Executing multiple tasks at the same moment (like you and your friend both chatting on WeChat with your girlfriends).

Go achieves concurrency through goroutines. A goroutine is similar to a thread but lives in user space: goroutines are scheduled by the Go runtime, whereas OS threads are scheduled by the operating system. We can create thousands of goroutines to work concurrently as needed.

Go also provides channels for communication between goroutines. Goroutines and channels are the building blocks of Go's implementation of the CSP (Communicating Sequential Processes) concurrency model.

goroutine#

In Java/C++, when we want to implement concurrent programming, we usually need to maintain a thread pool ourselves, wrap one task after another, and schedule threads to execute tasks while managing context switching. This often consumes a lot of mental effort from programmers. So, is there a mechanism where programmers only need to define many tasks and let the system help us allocate these tasks to the CPU for concurrent execution?

The goroutine is such a mechanism. A goroutine is conceptually similar to a thread, but it is scheduled and managed by the Go runtime, which distributes the work in goroutines across the available CPUs. Go is often called a modern programming language because scheduling and context switching are built in at the language level.

In Go you do not need to manage processes, threads, or coroutines yourself; the only tool you need is the goroutine. When a task needs to run concurrently, wrap it in a function and start a goroutine to execute that function. It's that simple and straightforward.

Using goroutine#

Using goroutine in Go is very simple; you just need to add the go keyword before the function call to create a goroutine for a function.

A goroutine must correspond to a function, and you can create multiple goroutines to execute the same function.

Starting a single goroutine#

Starting a goroutine is very simple; you just need to add the go keyword before the function (ordinary function or anonymous function) you are calling.

For example:

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	hello()
	fmt.Println("main goroutine done!")
}

In this example, the hello function and the following statement are executed serially, resulting in printing Hello Goroutine! followed by main goroutine done!.

Next, we add the go keyword before the hello function call, which starts a goroutine to execute the hello function.

func main() {
	go hello() // Start another goroutine to execute the hello function
	fmt.Println("main goroutine done!")
}

This time, the execution result only prints main goroutine done!, without printing Hello Goroutine!. Why is that?

When the program starts, Go creates a default goroutine for the main() function.

When the main() function returns, the program exits, and every goroutine started from main() is terminated along with it. The goroutine where the main function resides is like the Night King in Game of Thrones; when the Night King dies, all the wights he turned will also perish.

So we need to find a way to make the main function wait for the hello function. The simplest and most straightforward way is time.Sleep.

func main() {
	go hello() // Start another goroutine to execute the hello function
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second)
}

When you run the above code, you will typically see main goroutine done! printed first, followed by Hello Goroutine!.

main goroutine done! usually comes first because creating and scheduling the new goroutine takes some time, during which the goroutine running the main function continues to execute. This ordering is typical but not guaranteed.

Starting multiple goroutines#

Implementing concurrency in Go is that simple, and we can just as easily start multiple goroutines. Here is another example, which uses sync.WaitGroup to synchronize the goroutines:

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // Register -1 when the goroutine ends
	fmt.Println("Hello Goroutine!", i)
}
func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1) // Register +1 when starting a goroutine
		go hello(i)
	}
	wg.Wait() // Wait for all registered goroutines to finish
}

If you run the above code several times, you will find that the numbers are printed in a different order each time. This is because the 10 goroutines execute concurrently, and the order in which the scheduler runs them is not deterministic.
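The text above mentions that the go keyword also works with anonymous functions, but every example so far uses a named function. The following is a minimal sketch of the anonymous-function form; the helper name squares is hypothetical and chosen only for illustration. Each goroutine writes to its own slice element, so no extra locking is needed here.

```go
package main

import (
	"fmt"
	"sync"
)

// squares is a hypothetical helper: it starts n goroutines from an
// anonymous function and waits for all of them with a sync.WaitGroup.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		// The loop variable is passed as an argument so that each
		// goroutine works on its own copy.
		go func(i int) {
			defer wg.Done()
			out[i] = i * i // each goroutine writes a distinct element
		}(i)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```

Because the results are stored by index, the output is the same on every run even though the goroutines finish in an unpredictable order.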

goroutine vs. thread#

Growable Stack#

An OS thread (operating system thread) generally has a fixed-size stack (usually 2MB), while a goroutine starts with a very small stack (typically 2KB). A goroutine's stack is not fixed; it grows and shrinks as needed, up to a limit of 1GB, although such a large stack is rarely used. This is why creating around 100,000 goroutines at once is feasible in Go.
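To make the claim about cheap goroutines concrete, here is a rough sketch that starts 100,000 goroutines and reports how many are alive at once. The helper name spawn is hypothetical. It parks the goroutines on a channel (channels are covered later in this article) so they all exist at the same time. Actual memory use depends on the machine and the Go version.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spawn is a hypothetical helper: it starts n goroutines that all block
// until release is closed, and returns the number of live goroutines
// observed while they were all parked.
func spawn(n int) int {
	var started, finished sync.WaitGroup
	release := make(chan struct{})
	started.Add(n)
	finished.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer finished.Done()
			started.Done() // signal that this goroutine is running
			<-release      // park until the channel is closed
		}()
	}
	started.Wait() // every goroutine has started and is still alive
	peak := runtime.NumGoroutine()
	close(release)
	finished.Wait()
	return peak
}

func main() {
	fmt.Println("live goroutines:", spawn(100000))
}
```

The reported number includes the main goroutine, so it is at least n + 1.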

goroutine Scheduling#

GPM is an implementation at the runtime level of Go language, a scheduling system implemented by Go itself, distinct from the operating system scheduling OS threads.

  • G is easy to understand; it represents a goroutine, which stores information about the goroutine and other binding information with the P.
  • P manages a queue of goroutines and stores the context (function pointers, stack addresses, and address boundaries) of the currently running goroutine. P schedules the goroutines in its queue (for example, pausing a goroutine that has occupied the CPU for too long and running the next one). When its own queue is empty, it fetches work from the global queue; if the global queue is also empty, it steals work from the queues of other Ps.
  • M (machine) is a virtual representation of the operating system kernel thread in Go runtime. M generally has a one-to-one mapping with kernel threads, and a goroutine ultimately needs to be executed on an M.

P and M are generally also one-to-one. Their relationship is: P manages a group of G mounted on M for execution. When a G is long-blocked on an M, the runtime will create a new M, and the blocking G's P will mount other Gs on the newly created M. When the old G unblocks or is considered dead, the old M is reclaimed.

The number of Ps is set by runtime.GOMAXPROCS (capped at 256 in older Go releases; the cap was removed in Go 1.10). Since Go 1.5, it defaults to the number of logical CPUs. When concurrency is high, additional Ms may be created, but not too many; if switching is too frequent, it becomes counterproductive.

From the perspective of thread scheduling, Go has an advantage over many other languages: OS threads are scheduled by the OS kernel, while goroutines are scheduled by Go's own runtime scheduler, which uses a technique called m:n scheduling (multiplexing m goroutines onto n OS threads). A major feature is that goroutine scheduling happens in user space, so it does not involve frequent switches between kernel and user space. Memory allocation and deallocation are likewise served from a large memory pool maintained in user space, without calling the system's malloc function directly (unless the pool itself needs to change), which makes scheduling goroutines much cheaper than scheduling OS threads. At the same time, the scheduler makes full use of multi-core hardware by distributing goroutines roughly evenly across physical threads, and the lightweight nature of goroutines keeps scheduling fast.

GOMAXPROCS#

The Go runtime scheduler uses the GOMAXPROCS parameter to determine how many OS threads may execute Go code simultaneously. The default value is the number of CPU cores on the machine. For example, on an 8-core machine, the scheduler will schedule Go code to run on 8 OS threads simultaneously (GOMAXPROCS is the n in m:n scheduling).

In Go, you can set the number of CPU logical cores occupied during concurrent execution of the current program using the runtime.GOMAXPROCS() function.

Before Go version 1.5, the default was single-core execution. After Go version 1.5, it defaults to using all CPU logical cores.
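As a small illustration of the function described above, the sketch below reads the current setting and the number of logical CPUs. It relies on the documented behavior that runtime.GOMAXPROCS returns the previous setting and leaves it unchanged when called with a value below 1. The helper name currentProcs is hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs is a hypothetical helper: it reports the current GOMAXPROCS
// value without changing it, because an argument below 1 only queries.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentProcs())

	prev := runtime.GOMAXPROCS(1) // set to 1; the previous value is returned
	fmt.Println("previous:", prev, "now:", currentProcs())
	runtime.GOMAXPROCS(prev) // restore the original setting
}
```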

We can achieve parallelism by distributing tasks across different CPU logical cores. Here’s an example:

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(1)
	go a()
	go b()
	time.Sleep(time.Second)
}

With only one logical core, the two tasks will complete one after the other. Setting the logical core count to 2 allows both tasks to execute in parallel, as shown below.

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}

The relationship between operating system threads and goroutines in Go language:

  1. One operating system thread corresponds to multiple user-space goroutines.
  2. Go programs can use multiple operating system threads simultaneously.
  3. The relationship between goroutines and OS threads is many-to-many, i.e., m:n.

channel#

Executing functions concurrently is of little use on its own; the functions need to exchange data for concurrent execution to be meaningful.

Shared memory can be used for data exchange, but it is prone to race conditions when accessed from different goroutines. To keep the exchange correct, the memory must be protected with a mutex, which adds overhead and can hurt performance.

The concurrency model of Go language is CSP (Communicating Sequential Processes), which advocates sharing memory through communication rather than communicating through shared memory.

If goroutines are the execution bodies of concurrent Go programs, channels are the connections between them. A channel is a communication mechanism that allows one goroutine to send specific values to another goroutine.

Channels in Go are a special type. A channel acts like a conveyor belt or queue, always following the First In First Out (FIFO) rule, ensuring the order of data transmission. Each channel is a conduit of a specific type, meaning that when declaring a channel, you need to specify the element type.

Channel Types#

channel is a type, a reference type. The format for declaring a channel type is as follows:

var variable chan element_type

Here are a few examples:

var ch1 chan int   // Declare a channel that transmits integers
var ch2 chan bool  // Declare a channel that transmits booleans
var ch3 chan []int // Declare a channel that transmits slices of integers

Creating a Channel#

A channel is a reference type, and the zero value of a channel type is nil.

var ch chan int
fmt.Println(ch) // <nil>

A declared channel needs to be initialized using the make function before it can be used.

The format for creating a channel is as follows:

make(chan element_type, [buffer_size])

The buffer size of a channel is optional.

Here are a few examples:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

Channel Operations#

Channels have three operations: send, receive, and close.

Sending and receiving use the <- symbol.

Now let's define a channel with the following statement:

ch := make(chan int)

Sending#

Send a value into the channel.

ch <- 10 // Send 10 into ch

Receiving#

Receive a value from a channel.

x := <- ch // Receive a value from ch and assign it to variable x
<-ch       // Receive a value from ch, ignoring the result

Closing#

We close a channel by calling the built-in close function.

close(ch)

A channel only needs to be closed when the receiving goroutine must be told that all data has been sent. Channels are reclaimed by the garbage collector, so unlike closing a file after use, closing a channel is not mandatory.

A closed channel has the following characteristics:

  1. Sending a value to a closed channel will cause a panic.
  2. Receiving from a closed channel will continue to retrieve values until the channel is empty.
  3. Receiving from a closed channel that has no values will yield the corresponding type's zero value.
  4. Closing an already closed channel will cause a panic.
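The characteristics listed above can be checked with a short program. This is a minimal sketch; the helper names drainClosed and sendPanics are hypothetical. The first helper shows that buffered values are still delivered after close and that further receives yield the zero value with ok set to false. The second uses recover to observe the panic caused by sending on a closed channel.

```go
package main

import "fmt"

// drainClosed is a hypothetical helper: it fills a buffered channel, closes
// it, receives every buffered value, and then receives once more.
func drainClosed(vals ...int) (got []int, zero int, ok bool) {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	for range vals {
		got = append(got, <-ch) // buffered values survive the close
	}
	zero, ok = <-ch // channel is closed and empty: zero value, ok == false
	return got, zero, ok
}

// sendPanics is a hypothetical helper: it reports whether sending on a
// closed channel panics.
func sendPanics() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return false
}

func main() {
	fmt.Println(drainClosed(1, 2)) // [1 2] 0 false
	fmt.Println(sendPanics())      // true
}
```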

Unbuffered Channels#

Unbuffered channels are also known as blocking channels. Let's look at the following code:

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("Send successful")
}

The above code can compile, but it will produce the following error during execution:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

Why does the deadlock error occur?

Because we created an unbuffered channel with ch := make(chan int), an unbuffered channel can only send values when there is someone to receive them. It's like living in a community without a delivery locker or collection point; the delivery person must deliver the item directly to you. In simple terms, an unbuffered channel must have a receiver to send.

The above code will block at ch <- 10, causing a deadlock. How can we solve this problem?

One way is to enable a goroutine to receive the value, for example:

func recv(c chan int) {
	ret := <-c
	fmt.Println("Receive successful", ret)
}
func main() {
	ch := make(chan int)
	go recv(ch) // Start a goroutine to receive values from the channel
	ch <- 10
	fmt.Println("Send successful")
}

Sending operations on an unbuffered channel will block until another goroutine performs a receive operation on that channel, at which point the value can be successfully sent, and both goroutines will continue executing. Conversely, if the receive operation executes first, the receiving goroutine will block until another goroutine sends a value on that channel.

Using unbuffered channels for communication synchronizes the sending and receiving goroutines. Therefore, unbuffered channels are also called synchronous channels.
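Because an unbuffered channel synchronizes sender and receiver, it can replace the time.Sleep call used earlier to wait for a goroutine. The following is one possible sketch; the helper name runAndWait is hypothetical.

```go
package main

import "fmt"

// runAndWait is a hypothetical helper: it runs task in a new goroutine and
// blocks until that goroutine signals completion on an unbuffered channel.
func runAndWait(task func()) {
	done := make(chan struct{})
	go func() {
		task()
		done <- struct{}{} // blocks until the caller receives
	}()
	<-done // blocks until the goroutine sends
}

func main() {
	runAndWait(func() { fmt.Println("Hello Goroutine!") })
	fmt.Println("main goroutine done!")
}
```

Unlike the time.Sleep version, the order of the two printed lines is guaranteed here: the receive in runAndWait cannot complete before the task has finished.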

Buffered Channels#

Another way to solve the above problem is to use buffered channels. We can specify the capacity of the channel when initializing it with the make function, for example:

func main() {
	ch := make(chan int, 1) // Create a buffered channel with a capacity of 1
	ch <- 10
	fmt.Println("Send successful")
}

As long as the channel's capacity is greater than zero, it is a buffered channel. The capacity of the channel indicates how many elements it can hold. It's like a delivery locker in your community; if the locker is full, it cannot accept more deliveries until someone picks up an item.

We can use the built-in len function to get the number of elements in the channel and the cap function to get the channel's capacity, although we rarely do this.
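For completeness, here is a small sketch of len and cap applied to a buffered channel. The helper name bufferState is hypothetical; it must be called with sends no larger than capacity, otherwise the send would block forever.

```go
package main

import "fmt"

// bufferState is a hypothetical helper: it creates a buffered channel, sends
// the given number of values (which must not exceed capacity), and reports
// the channel's length and capacity.
func bufferState(capacity, sends int) (length, total int) {
	ch := make(chan int, capacity)
	for i := 0; i < sends; i++ {
		ch <- i // does not block while the buffer has free slots
	}
	return len(ch), cap(ch)
}

func main() {
	fmt.Println(bufferState(3, 2)) // 2 3
}
```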

Looping through a Channel with for range#

When we finish sending data to a channel, we can close the channel using the close function.

When a channel is closed, sending values to that channel will cause a panic, and operations to receive values from that channel will first retrieve all values in the channel, followed by receiving the corresponding type's zero value. How can we determine if a channel has been closed?

Let's look at the following example:

// Channel exercise
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// Start a goroutine to send the numbers 0 to 99 into ch1
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// Start a goroutine to receive values from ch1 and send their squares to ch2
	go func() {
		for {
			i, ok := <-ch1 // ok=false when the channel is closed
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// In the main goroutine, receive values from ch2 and print them
	for i := range ch2 { // The for range loop will exit when the channel is closed
		fmt.Println(i)
	}
}

The example above shows two ways to detect that a channel has been closed while receiving from it: the two-value receive (i, ok := <-ch1) and for range. We usually use for range, which exits the loop once the channel has been closed and all remaining values have been received.

Unidirectional Channels#

Sometimes we pass channels as parameters between multiple task functions, and often we want to restrict the channel's usage in different task functions, such as allowing the channel to only send or only receive.

Go language provides unidirectional channels to handle this situation. For example, we can modify the previous example as follows:

func counter(out chan<- int) {
	for i := 0; i < 100; i++ {
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for i := range in {
		out <- i * i
	}
	close(out)
}
func printer(in <-chan int) {
	for i := range in {
		fmt.Println(i)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2, ch1)
	printer(ch2)
}

In this case,

  • chan<- int is a write-only unidirectional channel (can only send int type values to it), allowing send operations but not receive operations;
  • <-chan int is a read-only unidirectional channel (can only read int type values from it), allowing receive operations but not send operations.

In function parameters and any assignment operations, a bidirectional channel can be converted to a unidirectional channel, but the reverse is not allowed.
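The conversion rule can be seen in both a function call and a plain assignment. The sketch below uses hypothetical names (produce, sum, sumTo); the commented-out line marks the direction that the compiler rejects.

```go
package main

import "fmt"

// produce only needs to send, so it takes a send-only channel.
func produce(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum only needs to receive, so it takes a receive-only channel.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// sumTo is a hypothetical helper that wires the two functions together.
func sumTo(n int) int {
	ch := make(chan int) // bidirectional channel
	go produce(ch, n)    // implicitly converted to chan<- int

	var recvOnly <-chan int = ch // conversion through assignment
	// var back chan int = recvOnly // does not compile: cannot convert back
	return sum(recvOnly)
}

func main() {
	fmt.Println(sumTo(5)) // 10
}
```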

Channel Summary#

Common exceptions with channels are summarized in the following figure:

[Figure: channel exception summary]

Closing an already closed channel will also trigger a panic.

Worker Pool (Goroutine Pool)#

In practice, we often use a worker pool pattern that allows us to specify the number of goroutines to start, controlling the number of goroutines to prevent leaks and surges.

Here is a simple example of a worker pool:

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// Start 3 goroutines
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 5 tasks
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// Output results
	for a := 1; a <= 5; a++ {
		<-results
	}
}
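The example above knows in advance that exactly 5 results will arrive. When the number of results is not known, one common variation (a sketch, not the original author's code) is to track the workers with a sync.WaitGroup and close the results channel once they have all returned, so the consumer can use for range. The helper name runPool is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// worker doubles every job it receives and reports completion through wg.
func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		results <- j * 2
	}
}

// runPool is a hypothetical helper: it processes jobs 1..n with the given
// number of workers and returns the sum of all results.
func runPool(workers, n int) int {
	jobs := make(chan int, n)
	results := make(chan int, n)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}
	for j := 1; j <= n; j++ {
		jobs <- j
	}
	close(jobs) // no more jobs: the workers' range loops will end

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPool(3, 5)) // 30
}
```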

Select Multiplexing#

In some scenarios, we need to receive data from multiple channels at the same time. A receive on a channel blocks when there is no data available. You might write code like this using a loop:

for {
    // Try to receive a value from ch1
    data, ok := <-ch1
    // Try to receive a value from ch2
    data, ok = <-ch2

}

While this approach can receive values from multiple channels, it performs poorly: each receive blocks until its channel has a value, so a value waiting on ch2 is not seen while the loop is stuck on ch1. To address this scenario, Go has the built-in select keyword, which can respond to multiple channel operations simultaneously.

The usage of select is similar to a switch statement; it has a series of case branches and a default branch. Each case corresponds to a communication operation (receive or send) on a channel. select will wait until a communication operation of a certain case completes, at which point it will execute the statement corresponding to that case. The specific format is as follows:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        default operation
}

Here’s a small example to demonstrate the use of select:

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

Using the select statement improves code readability.

  • It can handle one or more channel send/receive operations.
  • If multiple case statements are satisfied, select will randomly choose one.
  • A select{} with no case will wait indefinitely, which can be used to block the main function.
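The default branch mentioned above is what makes non-blocking channel operations possible: when no case is ready, select runs default immediately instead of waiting. A minimal sketch, with hypothetical helper names tryRecv and trySend:

```go
package main

import "fmt"

// tryRecv is a hypothetical helper: it receives a value if one is ready and
// reports false instead of blocking when the channel is empty.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

// trySend is a hypothetical helper: it sends v if the channel can accept it
// right now and reports false instead of blocking otherwise.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
	fmt.Println(tryRecv(ch))    // 1 true
	fmt.Println(tryRecv(ch))    // 0 false
}
```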

Concurrency Safety and Locks#

Sometimes in Go code, multiple goroutines may operate on a single resource (critical section) simultaneously, which can lead to race conditions (data races). This is analogous to cars competing at an intersection or people competing for a restroom on a train.

For example:

var x int64
var wg sync.WaitGroup

func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

In the above code, we start two goroutines to increment the value of variable x. Because both goroutines read and modify x without synchronization, there is a data race, so the final result may not match the expected value of 10000.

Mutex#

A mutex is a common method for controlling access to shared resources, ensuring that only one goroutine can access the shared resource at a time. In Go language, the sync package's Mutex type is used to implement mutexes. Here’s how to fix the above code using a mutex:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock() // Lock
		x = x + 1
		lock.Unlock() // Unlock
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

Using a mutex ensures that only one goroutine can enter the critical section at a time; other goroutines will wait for the lock. When the mutex is released, a waiting goroutine can acquire the lock and enter the critical section. When multiple goroutines are waiting for a lock, the order in which they are woken is not guaranteed.

Read/Write Mutex#

Mutexes are completely exclusive, but many practical scenarios involve more reads than writes. When we concurrently read a resource without modifying it, there is no need to lock it. In such cases, using a read/write lock is a better choice. In Go language, the sync package provides the RWMutex type for read/write locks.

Read/write locks have two types: read locks and write locks. When a goroutine acquires a read lock, other goroutines can also acquire read locks, but if a goroutine tries to acquire a write lock, it will wait. When a goroutine acquires a write lock, other goroutines will wait regardless of whether they are trying to acquire read or write locks.

Here’s an example of a read/write lock:

var (
	x      int64
	wg     sync.WaitGroup
	lock   sync.Mutex
	rwlock sync.RWMutex
)

func write() {
	// lock.Lock()   // Use mutex lock
	rwlock.Lock() // Use write lock
	x = x + 1
	time.Sleep(10 * time.Millisecond) // Assume write operation takes 10 milliseconds
	rwlock.Unlock()                   // Unlock write lock
	// lock.Unlock()                     // Unlock mutex lock
	wg.Done()
}

func read() {
	// lock.Lock()                  // Use mutex lock
	rwlock.RLock()               // Use read lock
	time.Sleep(time.Millisecond) // Assume read operation takes 1 millisecond
	rwlock.RUnlock()             // Unlock read lock
	// lock.Unlock()                // Unlock mutex lock
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}

	wg.Wait()
	end := time.Now()
	fmt.Println(end.Sub(start))
}

It is important to note that read/write locks are very suitable for scenarios with many reads and few writes. If the read and write operations are roughly equal, the advantages of read/write locks will not be fully realized.
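A typical read-heavy use is a small in-memory settings store that is read constantly and updated rarely. The sketch below is illustrative only; the type Config and its methods are hypothetical names, not part of any library.

```go
package main

import (
	"fmt"
	"sync"
)

// Config is a hypothetical read-mostly store guarded by a sync.RWMutex.
type Config struct {
	mu   sync.RWMutex
	data map[string]string
}

func NewConfig() *Config {
	return &Config{data: make(map[string]string)}
}

// Get takes the read lock, so many readers can run at the same time.
func (c *Config) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

// Set takes the write lock, which excludes both readers and other writers.
func (c *Config) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
}

func main() {
	c := NewConfig()
	c.Set("mode", "fast")

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Get("mode") // concurrent readers do not block each other
		}()
	}
	wg.Wait()

	fmt.Println(c.Get("mode")) // fast true
}
```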

sync.WaitGroup#

Using time.Sleep to wait for goroutines is unreliable and certainly inappropriate in real code. In Go, we can use sync.WaitGroup to synchronize concurrent tasks. sync.WaitGroup has the following methods:

Method Name                        Function
(wg *WaitGroup) Add(delta int)     Counter + delta
(wg *WaitGroup) Done()             Counter - 1
(wg *WaitGroup) Wait()             Block until counter becomes 0

sync.WaitGroup maintains a counter internally, which can be increased and decreased. For example, when we start N concurrent tasks, we increase the counter by N. Each task, upon completion, calls the Done() method to decrease the counter by 1. By calling Wait(), we wait for the concurrent tasks to finish; when the counter value is 0, it indicates that all concurrent tasks have completed.

We can optimize the above code using sync.WaitGroup:

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // Start another goroutine to execute the hello function
	fmt.Println("main goroutine done!")
	wg.Wait()
}

Note that sync.WaitGroup is a struct and must not be copied after first use; when handing it to a function, pass a pointer to it.
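Here is a short sketch of passing a WaitGroup to a function by pointer; the names work and runWorkers are hypothetical. Passing it by value would give each goroutine its own copy of the counter, so Done would never reach the WaitGroup that main is waiting on (go vet reports such copies).

```go
package main

import (
	"fmt"
	"sync"
)

// work receives the WaitGroup by pointer so that Done decrements the same
// counter the caller is waiting on.
func work(id int, wg *sync.WaitGroup, out []int) {
	defer wg.Done()
	out[id] = id * 10 // each goroutine writes a distinct element
}

// runWorkers is a hypothetical helper: it starts n workers and waits for
// all of them before returning their results.
func runWorkers(n int) []int {
	var wg sync.WaitGroup
	out := make([]int, n)
	wg.Add(n) // register all n tasks up front
	for i := 0; i < n; i++ {
		go work(i, &wg, out)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(runWorkers(3)) // [0 10 20]
}
```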

sync.Once#

Preliminary remarks: This is an advanced knowledge point.

In many programming scenarios, we need to ensure that certain operations are executed only once in high-concurrency situations, such as loading a configuration file only once or closing a channel only once.

The Go language's sync package provides a solution for the "execute only once" scenario – sync.Once.

sync.Once has only one method, Do, with the following signature:

func (o *Once) Do(f func()) {}

Note: If the function f to be executed requires parameters, it needs to be used in conjunction with a closure.
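The closure approach mentioned in the note can be sketched as follows. The type lazyConfig and the function loadConfig are hypothetical stand-ins; loadConfig merely pretends to read a file. The closure captures path, which is how an argument reaches a function that Do itself calls without parameters.

```go
package main

import (
	"fmt"
	"sync"
)

// loadConfig is a hypothetical stand-in for an expensive load operation.
func loadConfig(path string) string {
	return "loaded from " + path
}

// lazyConfig is a hypothetical type that loads its value at most once.
type lazyConfig struct {
	once  sync.Once
	value string
	loads int // number of times loadConfig actually ran
}

// get loads the configuration on the first call only. Later calls return
// the cached value, even when they pass a different path.
func (l *lazyConfig) get(path string) string {
	l.once.Do(func() {
		l.value = loadConfig(path) // path is captured by the closure
		l.loads++
	})
	return l.value
}

func main() {
	var cfg lazyConfig
	fmt.Println(cfg.get("a.yaml")) // loaded from a.yaml
	fmt.Println(cfg.get("b.yaml")) // loaded from a.yaml
	fmt.Println(cfg.loads)         // 1
}
```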

Example of Loading a Configuration File#

Delaying a costly initialization operation until it is actually needed is a good practice. Pre-initializing a variable (for example, in the init function) increases the program's startup time, and if the variable is not used during execution, that initialization operation is unnecessary. Let's look at an example:

var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon is not concurrency safe when called by multiple goroutines
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}

When multiple goroutines call the Icon function concurrently, it is not concurrency safe. Modern compilers and CPUs may freely reorder memory accesses as long as each goroutine still observes sequentially consistent behavior within itself. The loadIcons function may be reordered as follows:

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

In this case, even if we check that icons is not nil, it does not mean that the variable initialization is complete. Considering this situation, we might think of adding a mutex to ensure that the initialization of icons is not interrupted by other goroutines, but this would introduce performance issues.

Here’s the modified example code using sync.Once:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon is concurrency safe
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

Concurrency Safe Singleton Pattern#

Here’s a concurrency-safe singleton pattern implemented using sync.Once:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Once internally contains a mutex and a boolean value; the mutex ensures the safety of the boolean value and data, while the boolean value records whether the initialization is complete. This design guarantees that the initialization operation is concurrency safe and will not be executed multiple times.

sync.Map#

The built-in map in Go language is not concurrency safe. Consider the following example:

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

With only a few goroutines the above code may appear to work, but as concurrency increases it fails with fatal error: concurrent map writes.

In such scenarios, we need to lock the map to ensure concurrency safety. The Go language's sync package provides a ready-to-use concurrency-safe version of a map – sync.Map. "Ready-to-use" means that it can be used directly without needing to initialize it with the make function like the built-in map. Additionally, sync.Map has built-in methods such as Store, Load, LoadOrStore, Delete, and Range.

var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)
			value, _ := m.Load(key)
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}
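The example above only uses Store and Load. The following sketch exercises the other methods named earlier (LoadOrStore, Delete, and Range); the helper name mapDemo is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// mapDemo is a hypothetical helper: it returns the value found by
// LoadOrStore for an existing key, whether that value was already present,
// and the number of entries left after a Delete.
func mapDemo() (int, bool, int) {
	var m sync.Map // the zero value is ready to use

	m.Store("a", 1)
	// The key exists, so the stored value 1 is returned and loaded is true.
	actual, loaded := m.LoadOrStore("a", 100)
	// The key is missing, so 2 is stored.
	m.LoadOrStore("b", 2)

	m.Store("c", 3)
	m.Delete("c")

	count := 0
	m.Range(func(key, value interface{}) bool {
		count++
		return true // returning false would stop the iteration
	})
	return actual.(int), loaded, count
}

func main() {
	fmt.Println(mapDemo()) // 1 true 2
}
```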

Atomic Operations#

In the above code, we used lock operations to achieve synchronization. The locking mechanism is fundamentally based on atomic operations, which are generally implemented directly through CPU instructions. The Go language provides atomic operations through the built-in standard library sync/atomic.

atomic Package#

Read operations:

  func LoadInt32(addr *int32) (val int32)
  func LoadInt64(addr *int64) (val int64)
  func LoadUint32(addr *uint32) (val uint32)
  func LoadUint64(addr *uint64) (val uint64)
  func LoadUintptr(addr *uintptr) (val uintptr)
  func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

Write operations:

  func StoreInt32(addr *int32, val int32)
  func StoreInt64(addr *int64, val int64)
  func StoreUint32(addr *uint32, val uint32)
  func StoreUint64(addr *uint64, val uint64)
  func StoreUintptr(addr *uintptr, val uintptr)
  func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

Modify operations:

  func AddInt32(addr *int32, delta int32) (new int32)
  func AddInt64(addr *int64, delta int64) (new int64)
  func AddUint32(addr *uint32, delta uint32) (new uint32)
  func AddUint64(addr *uint64, delta uint64) (new uint64)
  func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

Swap operations:

  func SwapInt32(addr *int32, new int32) (old int32)
  func SwapInt64(addr *int64, new int64) (old int64)
  func SwapUint32(addr *uint32, new uint32) (old uint32)
  func SwapUint64(addr *uint64, new uint64) (old uint64)
  func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
  func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

Compare and swap operations:

  func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
  func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
  func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
  func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
  func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
  func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
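Of the operations listed, compare-and-swap is the least obvious, so here is a small sketch of the usual retry loop built on it. The helper names casAdd and runCAS are hypothetical. In real code atomic.AddInt64 does this job directly; the loop is shown only to illustrate how CompareAndSwapInt64 behaves.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casAdd is a hypothetical helper: it adds delta to *addr with a
// compare-and-swap retry loop and returns the new value.
func casAdd(addr *int64, delta int64) int64 {
	for {
		old := atomic.LoadInt64(addr)
		// The swap succeeds only if *addr still equals old; otherwise
		// another goroutine changed it first and we retry.
		if atomic.CompareAndSwapInt64(addr, old, old+delta) {
			return old + delta
		}
	}
}

// runCAS is a hypothetical helper: it increments a shared counter from n
// goroutines and returns the final value.
func runCAS(n int) int64 {
	var x int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			casAdd(&x, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&x)
}

func main() {
	fmt.Println(runCAS(1000)) // 1000
}
```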

Example#

Let's provide an example to compare the performance of mutexes and atomic operations.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// Common version (no synchronization)
type CommonCounter struct {
	counter int64
}

// Pointer receivers are required here: with a value receiver, Inc would
// increment a copy and the caller would always read 0.
func (c *CommonCounter) Inc() {
	c.counter++
}

func (c *CommonCounter) Load() int64 {
	return c.counter
}

// Mutex version
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// Atomic operation version
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // Not concurrency safe: the printed count may be less than 1000
	test(&c1)
	c2 := MutexCounter{} // Concurrency safe using mutex
	test(&c2)
	c3 := AtomicCounter{} // Concurrency safe and more efficient than mutex
	test(&c3)
}

The atomic package provides low-level atomic memory operations, which are useful for implementing synchronization algorithms. These functions must be used carefully to ensure correctness. Except for certain special low-level applications, it is generally better to synchronize with channels or with the functions and types in the sync package.

Exercises#

  1. Use goroutine and channel to implement a program that calculates the sum of the digits of a random int64 number.
    1. Start a goroutine to loop generating random int64 numbers and send them to jobChan.
    2. Start 24 goroutines to take random numbers from jobChan, calculate the sum of the digits, and send the results to resultChan.
    3. The main goroutine takes results from resultChan and prints them to the terminal.
  2. To ensure the performance of business code, rewrite the previously written logging library to use asynchronous logging.