# Go Concurrency Patterns

## Introduction
Go's concurrency model is one of its most powerful features, built around goroutines and channels that make concurrent programming more intuitive and safer than traditional thread-based approaches. This comprehensive guide explores advanced concurrency patterns that enable you to build scalable, efficient, and maintainable concurrent applications.
## Fundamental Concepts

### Goroutines vs Threads
Goroutines are lightweight threads of execution managed by the Go runtime. Each one starts with only a few kilobytes of stack, so a program can comfortably run tens of thousands of them where OS threads would be prohibitively expensive:
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	fmt.Printf("Number of CPUs: %d\n", runtime.NumCPU())
	fmt.Printf("Number of Goroutines: %d\n", runtime.NumGoroutine())

	var wg sync.WaitGroup

	// Launch 10,000 goroutines; each starts with only a few KB of stack.
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			time.Sleep(time.Second)
			if id%1000 == 0 {
				fmt.Printf("Goroutine %d completed\n", id)
			}
		}(i)
	}

	wg.Wait()
	fmt.Printf("Final Goroutines: %d\n", runtime.NumGoroutine())
}
```
### Channel Fundamentals
Channels are the pipes that connect concurrent goroutines:
```go
package main

import "fmt"

func main() {
	// Unbuffered channel: a send blocks until a receiver is ready.
	messages := make(chan string)

	// Buffered channel: sends block only once the buffer is full.
	buffered := make(chan int, 3)

	// Send and receive across goroutines.
	go func() {
		messages <- "Hello"
		messages <- "World"
		close(messages)
	}()

	// Ranging over a channel receives until it is closed.
	for msg := range messages {
		fmt.Println(msg)
	}

	// Buffered channel usage: three sends succeed without a receiver.
	buffered <- 1
	buffered <- 2
	buffered <- 3
	fmt.Println(<-buffered)
	fmt.Println(<-buffered)
	fmt.Println(<-buffered)
}
```
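The patterns that follow also lean heavily on `select`, which waits on several channel operations at once and proceeds with whichever is ready first. A minimal sketch, using only the standard library:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)

	go func() {
		time.Sleep(50 * time.Millisecond)
		ch <- "work done"
	}()

	// select blocks until one of its cases can proceed; time.After
	// provides a timeout so we never wait forever.
	select {
	case msg := <-ch:
		fmt.Println(msg)
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}
```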
## Core Concurrency Patterns

### Worker Pool Pattern
The worker pool pattern distributes work across a fixed number of goroutines, bounding concurrency no matter how many jobs arrive:
```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Job struct {
	ID     int
	Data   string
	Result chan string
}

type WorkerPool struct {
	workers  int
	jobQueue chan Job
	wg       sync.WaitGroup
	quit     chan struct{}
}

func NewWorkerPool(workers int, queueSize int) *WorkerPool {
	return &WorkerPool{
		workers:  workers,
		jobQueue: make(chan Job, queueSize),
		quit:     make(chan struct{}),
	}
}

func (wp *WorkerPool) Start() {
	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}

func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for {
		select {
		case job := <-wp.jobQueue:
			// Simulate work.
			processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
			time.Sleep(processingTime)
			result := fmt.Sprintf("Worker %d processed job %d: %s", id, job.ID, job.Data)
			job.Result <- result
		case <-wp.quit:
			// Note: any jobs still sitting in the queue are dropped here.
			fmt.Printf("Worker %d stopping\n", id)
			return
		}
	}
}

func (wp *WorkerPool) Submit(job Job) {
	wp.jobQueue <- job
}

func (wp *WorkerPool) Stop() {
	close(wp.quit)
	wp.wg.Wait()
}

func main() {
	pool := NewWorkerPool(5, 100)
	pool.Start()

	// Submit jobs and collect each result in its own goroutine.
	for i := 0; i < 20; i++ {
		result := make(chan string, 1)
		job := Job{
			ID:     i,
			Data:   fmt.Sprintf("task-%d", i),
			Result: result,
		}
		pool.Submit(job)

		go func(jobID int) {
			select {
			case res := <-result:
				fmt.Printf("Job %d result: %s\n", jobID, res)
			case <-time.After(time.Second):
				fmt.Printf("Job %d timed out\n", jobID)
			}
		}(i)
	}

	// Give the workers time to drain the queue before stopping them.
	time.Sleep(2 * time.Second)
	pool.Stop()
}
```
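For many workloads you don't need a hand-rolled pool. A sketch of the same bounded-concurrency idea using `golang.org/x/sync/errgroup` (an external module you would need to `go get`; `SetLimit` assumes a reasonably recent version of the package):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	g.SetLimit(5) // at most 5 goroutines run at once

	for i := 0; i < 20; i++ {
		id := i
		g.Go(func() error {
			fmt.Printf("processing task-%d\n", id)
			return nil
		})
	}

	// Wait blocks until all submitted functions return and
	// reports the first non-nil error, if any.
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```

The errgroup version also propagates errors for free, which the channel-based pool above has to thread through the `Job.Result` channel by hand.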
### Pipeline Pattern
Pipelines break processing into stages connected by channels. This example also demonstrates fan-out (distributing one channel's work across several goroutines) and fan-in (merging several channels back into one):
```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Stage 1: generate data.
func generator(data []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, item := range data {
			out <- item
		}
	}()
	return out
}

// Stage 2: transform data.
func transformer(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for item := range in {
			// Transform: convert to uppercase and add a prefix.
			transformed := fmt.Sprintf("PROCESSED: %s", strings.ToUpper(item))
			out <- transformed
		}
	}()
	return out
}

// Stage 3: filter data.
func filter(in <-chan string, pattern string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for item := range in {
			if strings.Contains(item, pattern) {
				out <- item
			}
		}
	}()
	return out
}

// Fan-out: distribute work from one channel to multiple goroutines.
func fanOut(in <-chan string, workers int) []<-chan string {
	outputs := make([]<-chan string, workers)
	for i := 0; i < workers; i++ {
		output := make(chan string)
		outputs[i] = output
		go func(out chan<- string) {
			defer close(out)
			for item := range in {
				// Simulate processing.
				processed := fmt.Sprintf("Worker processed: %s", item)
				out <- processed
			}
		}(output)
	}
	return outputs
}

// Fan-in: merge multiple channels into one.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, input := range inputs {
		wg.Add(1)
		go func(ch <-chan string) {
			defer wg.Done()
			for item := range ch {
				out <- item
			}
		}(input)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	data := []string{
		"hello world",
		"golang programming",
		"concurrency patterns",
		"channel communication",
		"goroutine management",
	}

	// Build the pipeline: generate -> transform -> filter.
	stage1 := generator(data)
	stage2 := transformer(stage1)
	stage3 := filter(stage2, "GOLANG")

	// Fan out to multiple workers, then fan the results back in.
	workers := fanOut(stage3, 3)
	results := fanIn(workers...)

	for result := range results {
		fmt.Println(result)
	}
}
```
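As written, these stages leak their goroutines if a downstream consumer stops reading early: a blocked send has no way to give up. A minimal sketch of the done-channel convention from the Go blog that fixes this (the name `cancellableGenerator` is illustrative, not part of the example above):

```go
package main

import "fmt"

// A generator that abandons its output when done is closed, so a
// pipeline whose consumer stops early does not leak this goroutine.
func cancellableGenerator(done <-chan struct{}, data []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, item := range data {
			select {
			case out <- item:
			case <-done:
				return
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done) // unblocks the generator even if we stop early

	items := cancellableGenerator(done, []string{"a", "b", "c", "d"})
	// Read only one value, then stop; closing done lets the
	// generator exit instead of blocking on its next send.
	fmt.Println(<-items)
}
```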
### Producer-Consumer Pattern

Producers and consumers run at independent rates, decoupled by a shared buffered channel:
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Message struct {
	ID        int
	Content   string
	Timestamp time.Time
}

type Producer struct {
	id       int
	output   chan<- Message
	interval time.Duration
}

func NewProducer(id int, output chan<- Message, interval time.Duration) *Producer {
	return &Producer{
		id:       id,
		output:   output,
		interval: interval,
	}
}

func (p *Producer) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(p.interval)
	defer ticker.Stop()

	messageID := 0
	for {
		select {
		case <-ticker.C:
			msg := Message{
				ID:        messageID,
				Content:   fmt.Sprintf("Message from producer %d", p.id),
				Timestamp: time.Now(),
			}
			// The nested select keeps the send from blocking forever
			// if the context is cancelled while the queue is full.
			select {
			case p.output <- msg:
				fmt.Printf("Producer %d sent message %d\n", p.id, messageID)
				messageID++
			case <-ctx.Done():
				fmt.Printf("Producer %d stopping due to context cancellation\n", p.id)
				return
			}
		case <-ctx.Done():
			fmt.Printf("Producer %d stopping\n", p.id)
			return
		}
	}
}

type Consumer struct {
	id    int
	input <-chan Message
}

func NewConsumer(id int, input <-chan Message) *Consumer {
	return &Consumer{
		id:    id,
		input: input,
	}
}

func (c *Consumer) Start(wg *sync.WaitGroup) {
	defer wg.Done()
	// Ranging over the channel drains any queued messages and
	// exits cleanly once the channel is closed.
	for msg := range c.input {
		// Simulate processing time.
		processingTime := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(processingTime)
		fmt.Printf("Consumer %d processed message %d: %s (age: %v)\n",
			c.id, msg.ID, msg.Content, time.Since(msg.Timestamp))
	}
	fmt.Printf("Consumer %d: input channel closed\n", c.id)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The buffer decouples producer and consumer speeds.
	messageQueue := make(chan Message, 10)

	var producerWg, consumerWg sync.WaitGroup

	// Start producers.
	for i := 0; i < 2; i++ {
		producer := NewProducer(i, messageQueue, 500*time.Millisecond)
		producerWg.Add(1)
		go producer.Start(ctx, &producerWg)
	}

	// Start consumers.
	for i := 0; i < 3; i++ {
		consumer := NewConsumer(i, messageQueue)
		consumerWg.Add(1)
		go consumer.Start(&consumerWg)
	}

	// Wait for every producer to stop before closing the queue;
	// closing while a producer might still be sending would panic.
	producerWg.Wait()
	close(messageQueue)
	consumerWg.Wait()
	fmt.Println("All producers and consumers stopped")
}
```
## Advanced Patterns

### Rate Limiting

A token-bucket limiter refills tokens at a fixed rate; each operation must take a token before it may proceed:
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type RateLimiter struct {
	tokens chan struct{}
	ticker *time.Ticker
	quit   chan struct{}
}

func NewRateLimiter(rate int, burst int) *RateLimiter {
	rl := &RateLimiter{
		tokens: make(chan struct{}, burst),
		ticker: time.NewTicker(time.Second / time.Duration(rate)),
		quit:   make(chan struct{}),
	}
	// Fill the bucket so the initial burst is available immediately.
	for i := 0; i < burst; i++ {
		rl.tokens <- struct{}{}
	}
	// Refill tokens in the background.
	go rl.refill()
	return rl
}

func (rl *RateLimiter) refill() {
	defer rl.ticker.Stop()
	for {
		select {
		case <-rl.ticker.C:
			select {
			case rl.tokens <- struct{}{}:
				// Token added.
			default:
				// Bucket full; drop the token.
			}
		case <-rl.quit:
			return
		}
	}
}

func (rl *RateLimiter) Allow() bool {
	select {
	case <-rl.tokens:
		return true
	default:
		return false
	}
}

func (rl *RateLimiter) Wait(ctx context.Context) error {
	select {
	case <-rl.tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (rl *RateLimiter) Stop() {
	close(rl.quit)
}

// makeAPIRequest simulates a rate-limited API call.
func makeAPIRequest(id int, rl *RateLimiter) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := rl.Wait(ctx); err != nil {
		return fmt.Errorf("rate limit wait failed: %w", err)
	}

	fmt.Printf("Making API request %d at %s\n", id, time.Now().Format("15:04:05.000"))
	time.Sleep(100 * time.Millisecond)
	return nil
}

func main() {
	// Allow 5 requests per second with a burst of 10.
	rateLimiter := NewRateLimiter(5, 10)
	defer rateLimiter.Stop()

	var wg sync.WaitGroup
	// Simulate 50 API requests; the later ones exceed the 5-second
	// wait timeout and fail, exercising the error path.
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := makeAPIRequest(id, rateLimiter); err != nil {
				fmt.Printf("Request %d failed: %v\n", id, err)
			}
		}(i)
	}

	wg.Wait()
	fmt.Println("All requests completed")
}
```
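In production code, consider `golang.org/x/time/rate` (an external module) rather than a hand-rolled bucket; it implements the same token-bucket semantics with better token accounting. A sketch of the equivalent limiter, 5 per second with a burst of 10:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// rate.NewLimiter takes events-per-second and a burst size.
	limiter := rate.NewLimiter(rate.Limit(5), 10)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	for i := 0; i < 20; i++ {
		// Wait blocks until a token is available or the context ends.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Printf("request %d not allowed: %v\n", i, err)
			return
		}
		fmt.Printf("request %d at %s\n", i, time.Now().Format("15:04:05.000"))
	}
}
```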
### Circuit Breaker Pattern

A circuit breaker stops calling a failing dependency once errors cross a threshold, then probes it again after a timeout (the half-open state) before fully recovering:
```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type State int

const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

type CircuitBreaker struct {
	mu               sync.RWMutex
	state            State
	failureCount     int
	successCount     int
	failureThreshold int
	successThreshold int
	timeout          time.Duration
	nextAttempt      time.Time
}

func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		state:            StateClosed,
		failureThreshold: failureThreshold,
		successThreshold: successThreshold,
		timeout:          timeout,
	}
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	cb.mu.Lock()
	if cb.state == StateOpen {
		if time.Now().Before(cb.nextAttempt) {
			cb.mu.Unlock()
			return errors.New("circuit breaker is open")
		}
		cb.state = StateHalfOpen
		cb.successCount = 0
	}
	cb.mu.Unlock()

	// Run fn outside the lock so a slow call does not serialize
	// every other caller behind it.
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.onFailure()
		return err
	}
	cb.onSuccess()
	return nil
}

// onFailure must be called with cb.mu held.
func (cb *CircuitBreaker) onFailure() {
	cb.failureCount++
	if cb.state == StateHalfOpen || cb.failureCount >= cb.failureThreshold {
		cb.state = StateOpen
		cb.nextAttempt = time.Now().Add(cb.timeout)
		fmt.Printf("Circuit breaker opened at %s\n", time.Now().Format("15:04:05"))
	}
}

// onSuccess must be called with cb.mu held.
func (cb *CircuitBreaker) onSuccess() {
	cb.failureCount = 0
	if cb.state == StateHalfOpen {
		cb.successCount++
		if cb.successCount >= cb.successThreshold {
			cb.state = StateClosed
			fmt.Printf("Circuit breaker closed at %s\n", time.Now().Format("15:04:05"))
		}
	}
}

func (cb *CircuitBreaker) GetState() State {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// unreliableService simulates a flaky dependency.
func unreliableService() error {
	// 30% chance of failure.
	if rand.Float32() < 0.3 {
		return errors.New("service failure")
	}
	// Simulate network delay.
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return nil
}

var stateNames = map[State]string{
	StateClosed:   "CLOSED",
	StateHalfOpen: "HALF-OPEN",
	StateOpen:     "OPEN",
}

func main() {
	cb := NewCircuitBreaker(3, 2, 5*time.Second)
	var wg sync.WaitGroup

	// Simulate 100 service calls, one every 100ms.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()

			err := cb.Call(ctx, unreliableService)
			state := stateNames[cb.GetState()]
			if err != nil {
				fmt.Printf("Call %d failed: %v (state: %s)\n", id, err, state)
			} else {
				fmt.Printf("Call %d succeeded (state: %s)\n", id, state)
			}
		}(i)
		time.Sleep(100 * time.Millisecond)
	}

	wg.Wait()
}
```
## Error Handling and Graceful Shutdown

### Context-based Cancellation
```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type Server struct {
	name string
	wg   sync.WaitGroup
}

func (s *Server) Start(ctx context.Context) {
	s.wg.Add(1)
	go s.run(ctx)
}

func (s *Server) run(ctx context.Context) {
	defer s.wg.Done()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	fmt.Printf("%s started\n", s.name)
	for {
		select {
		case <-ticker.C:
			fmt.Printf("%s: heartbeat at %s\n", s.name, time.Now().Format("15:04:05"))
		case <-ctx.Done():
			fmt.Printf("%s: received shutdown signal\n", s.name)
			// Simulate cleanup time.
			time.Sleep(2 * time.Second)
			fmt.Printf("%s: cleanup completed\n", s.name)
			return
		}
	}
}

func (s *Server) Stop() {
	s.wg.Wait()
	fmt.Printf("%s: stopped\n", s.name)
}

func main() {
	// Create a context that we cancel on interrupt.
	ctx, cancel := context.WithCancel(context.Background())

	// Handle OS signals.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Start multiple servers.
	servers := []*Server{
		{name: "HTTP Server"},
		{name: "gRPC Server"},
		{name: "Background Worker"},
	}
	for _, server := range servers {
		server.Start(ctx)
	}

	// Wait for a signal.
	sig := <-sigChan
	fmt.Printf("\nReceived signal: %v\n", sig)

	// Cancel the context to trigger graceful shutdown.
	cancel()

	// Wait for all servers to stop.
	for _, server := range servers {
		server.Stop()
	}
	fmt.Println("Application shutdown complete")
}
```
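Since Go 1.16, `signal.NotifyContext` collapses the signal-channel boilerplate into a context. A minimal sketch:

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// ctx is cancelled automatically on SIGINT or SIGTERM;
	// stop releases the signal registration.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	fmt.Println("running; press Ctrl+C to stop")
	select {
	case <-ctx.Done():
		fmt.Println("shutting down:", ctx.Err())
	case <-time.After(30 * time.Second):
		fmt.Println("timed out waiting for a signal")
	}
}
```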
## Performance Monitoring and Debugging

### Goroutine Monitoring
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

type Monitor struct {
	ticker *time.Ticker
	quit   chan struct{}
}

func NewMonitor(interval time.Duration) *Monitor {
	return &Monitor{
		ticker: time.NewTicker(interval),
		quit:   make(chan struct{}),
	}
}

func (m *Monitor) Start() {
	go func() {
		for {
			select {
			case <-m.ticker.C:
				m.printStats()
			case <-m.quit:
				m.ticker.Stop()
				return
			}
		}
	}()
}

func (m *Monitor) Stop() {
	close(m.quit)
}

func (m *Monitor) printStats() {
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	fmt.Printf("=== Runtime Stats ===\n")
	fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
	fmt.Printf("CPUs: %d\n", runtime.NumCPU())
	fmt.Printf("Memory Alloc: %d KB\n", memStats.Alloc/1024)
	fmt.Printf("Total Alloc: %d KB\n", memStats.TotalAlloc/1024)
	fmt.Printf("GC Cycles: %d\n", memStats.NumGC)
	fmt.Println("====================")
}

// worker simulates a workload that allocates memory.
func worker(id int, wg *sync.WaitGroup, duration time.Duration) {
	defer wg.Done()
	fmt.Printf("Worker %d started\n", id)

	// Simulate memory allocation (1 MB).
	data := make([]byte, 1024*1024)
	for i := 0; i < len(data); i++ {
		data[i] = byte(i % 256)
	}

	time.Sleep(duration)
	fmt.Printf("Worker %d finished\n", id)
}

func main() {
	monitor := NewMonitor(2 * time.Second)
	monitor.Start()
	defer monitor.Stop()

	var wg sync.WaitGroup

	// Launch workers in waves so the stats visibly rise and fall.
	for wave := 0; wave < 3; wave++ {
		fmt.Printf("\n=== Starting wave %d ===\n", wave+1)
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go worker(wave*10+i, &wg, time.Duration(2+wave)*time.Second)
		}
		time.Sleep(time.Second)
	}

	wg.Wait()

	// Force a garbage collection so the final stats reflect freed memory.
	runtime.GC()
	time.Sleep(time.Second)
	fmt.Println("\nAll workers completed")
}
```
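Beyond periodic prints, the standard `net/http/pprof` package exposes live goroutine and heap profiles over HTTP with almost no code. A minimal sketch:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Browse http://localhost:6060/debug/pprof/goroutine?debug=2
	// for full stacks of every live goroutine, or run
	// `go tool pprof http://localhost:6060/debug/pprof/heap`.
	log.Println(http.ListenAndServe("localhost:6060", nil))
}
```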
## Best Practices and Anti-patterns

### Preventing Goroutine Leaks
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// BAD: goroutine leak. Nothing ever signals these goroutines to stop,
// so they run (and hold memory) for the life of the process.
func badPattern() {
	for i := 0; i < 10; i++ {
		go func(id int) {
			for {
				fmt.Printf("Goroutine %d running\n", id)
				time.Sleep(time.Second)
			}
		}(i)
	}
}

// GOOD: proper cleanup with context. Every goroutine has a clear
// termination path, and the caller can wait for them all to exit.
func goodPattern(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					fmt.Printf("Goroutine %d running\n", id)
				case <-ctx.Done():
					fmt.Printf("Goroutine %d stopping\n", id)
					return
				}
			}
		}(i)
	}
	wg.Wait()
}

// Channel cleanup: the producer closes the channel when done, which
// lets the consumer's range loop terminate.
func channelCleanupExample() {
	ch := make(chan int, 10)

	// Producer.
	go func() {
		defer close(ch) // always close channels from the sending side
		for i := 0; i < 5; i++ {
			ch <- i
		}
	}()

	// Consumer.
	for value := range ch {
		fmt.Printf("Received: %d\n", value)
	}
}

func main() {
	fmt.Println("=== Good Pattern Example ===")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	goodPattern(ctx)

	fmt.Println("\n=== Channel Cleanup Example ===")
	channelCleanupExample()
}
```
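Leaks like `badPattern` are easiest to catch in tests. One option is Uber's `go.uber.org/goleak` package (an external module); a hedged sketch of how such a test might look, where the package name and test body are placeholders:

```go
package leaks_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestNoLeakedGoroutines fails if any goroutines started by the code
// under test are still running when the test returns.
func TestNoLeakedGoroutines(t *testing.T) {
	defer goleak.VerifyNone(t)

	// Call code here that is expected to clean up all its goroutines.
}
```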
## Conclusion
Go's concurrency model provides powerful primitives for building scalable applications. The key patterns covered in this guide—worker pools, pipelines, producer-consumer, rate limiting, and circuit breakers—form the foundation for most concurrent systems.
Remember these essential principles:
- Always provide cleanup mechanisms using context cancellation
- Avoid goroutine leaks by ensuring all goroutines can terminate
- Use buffered channels appropriately to prevent blocking
- Monitor your concurrent systems to detect issues early
- Test concurrent code thoroughly, including race conditions and deadlocks (see the race-detector sketch below)
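Go's built-in race detector is the standard tool for that last point: run `go test -race` (or `go run -race`) and it reports unsynchronized accesses at runtime. A minimal test it would flag:

```go
package race_test

import (
	"sync"
	"testing"
)

// Run with: go test -race
// The detector reports the unsynchronized writes to counter.
func TestRacyCounter(t *testing.T) {
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter++ // data race: no mutex or atomic protecting counter
		}()
	}
	wg.Wait()
	t.Log("counter:", counter)
}
```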
Master these patterns and you'll be able to build robust, efficient concurrent applications that handle real-world workloads while keeping the code clear and maintainable.