Go Concurrency Patterns

Go Concurrency Patterns

Introduction

Go's concurrency model is one of its most powerful features, built around goroutines and channels that make concurrent programming more intuitive and safer than traditional thread-based approaches. This comprehensive guide explores advanced concurrency patterns that enable you to build scalable, efficient, and maintainable concurrent applications.

Fundamental Concepts

Goroutines vs Threads

Goroutines are lightweight, managed by the Go runtime, and much more efficient than OS threads:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// main prints the CPU and goroutine counts, launches 10000 goroutines that
// each sleep for one second, waits for all of them, and then prints the
// goroutine count again.
func main() {
    fmt.Printf("Number of CPUs: %d\n", runtime.NumCPU())
    fmt.Printf("Number of Goroutines: %d\n", runtime.NumGoroutine())

    const total = 10000

    var pending sync.WaitGroup
    pending.Add(total)

    // Every goroutine runs the same task; only one in a thousand reports,
    // which keeps the output short.
    task := func(n int) {
        defer pending.Done()
        time.Sleep(time.Second)
        if n%1000 != 0 {
            return
        }
        fmt.Printf("Goroutine %d completed\n", n)
    }

    for n := 0; n < total; n++ {
        go task(n)
    }

    pending.Wait()
    fmt.Printf("Final Goroutines: %d\n", runtime.NumGoroutine())
}

Channel Fundamentals

Channels are the pipes that connect concurrent goroutines:

package main

import (
    "fmt"
    "time"
)

// main demonstrates the two basic channel flavours: an unbuffered channel
// that is ranged over until its sender closes it, and a buffered channel that
// accepts three sends before anything is received.
func main() {
    // Unbuffered channel: every send waits for a matching receive.
    messages := make(chan string)

    // Buffered channel: up to three sends complete without a receiver.
    buffered := make(chan int, 3)

    // The sender owns the channel and closes it when done, which is what
    // ends the range loop below.
    go func() {
        defer close(messages)
        messages <- "Hello"
        messages <- "World"
    }()

    // Range over channel
    for msg := range messages {
        fmt.Println(msg)
    }

    // Buffered channel usage: none of these sends block.
    buffered <- 1
    buffered <- 2
    buffered <- 3

    // Drain the buffer. Each receive is guarded by a timeout so that a
    // missing value is reported instead of blocking main forever.
    for i := 0; i < cap(buffered); i++ {
        select {
        case v := <-buffered:
            fmt.Println(v)
        case <-time.After(time.Second):
            fmt.Println("timed out waiting for a buffered value")
            return
        }
    }
}

Core Concurrency Patterns

Worker Pool Pattern

The worker pool pattern distributes work across a fixed number of goroutines:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job is a unit of work handed to a WorkerPool. The worker that processes it
// sends the result string on Result.
type Job struct {
    ID     int
    Data   string
    Result chan string
}

// WorkerPool runs a fixed number of worker goroutines that pull Jobs from a
// shared buffered queue.
type WorkerPool struct {
    workers  int
    jobQueue chan Job
    wg       sync.WaitGroup
    quit     chan struct{} // closed by Stop; tells workers and Submit to give up
    stopOnce sync.Once     // lets Stop be called more than once
}

// NewWorkerPool returns a pool configured for the given number of workers
// and a job queue with room for queueSize pending jobs. No goroutines run
// until Start is called.
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
    return &WorkerPool{
        workers:  workers,
        jobQueue: make(chan Job, queueSize),
        quit:     make(chan struct{}),
    }
}

// Start launches the worker goroutines.
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// worker processes jobs from the queue until the pool is stopped.
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    for {
        select {
        case job := <-wp.jobQueue:
            // Simulate work
            processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
            time.Sleep(processingTime)

            result := fmt.Sprintf("Worker %d processed job %d: %s", id, job.ID, job.Data)

            // Deliver the result, but never let an unread (or nil) Result
            // channel pin this worker: that would make Stop wait forever.
            select {
            case job.Result <- result:
            case <-wp.quit:
                fmt.Printf("Worker %d stopping\n", id)
                return
            }

        case <-wp.quit:
            fmt.Printf("Worker %d stopping\n", id)
            return
        }
    }
}

// Submit queues job for processing, blocking while the queue is full.
// Once Stop has been called the job is silently dropped instead.
func (wp *WorkerPool) Submit(job Job) {
    // Check quit on its own first so that a stopped pool never accepts work
    // merely because the queue still has room.
    select {
    case <-wp.quit:
        return
    default:
    }

    select {
    case wp.jobQueue <- job:
    case <-wp.quit:
    }
}

// Stop signals all workers to exit and waits for them. Jobs still in the
// queue are not processed. Stop may be called more than once.
func (wp *WorkerPool) Stop() {
    wp.stopOnce.Do(func() { close(wp.quit) })
    wp.wg.Wait()
    // jobQueue is deliberately left open: closing it would turn a late or
    // concurrent Submit into a "send on closed channel" panic.
}

// main submits 20 jobs to a five-worker pool, prints each result (or a
// timeout after one second), and stops the pool once every job has been
// reported on.
func main() {
    pool := NewWorkerPool(5, 100)
    pool.Start()

    // Tracks the result-collecting goroutines so main waits exactly as long
    // as needed instead of sleeping for a fixed period.
    var collectors sync.WaitGroup

    // Submit jobs
    for i := 0; i < 20; i++ {
        // Buffer of one: the worker can deliver even if the collector has
        // already given up on the timeout.
        result := make(chan string, 1)
        job := Job{
            ID:     i,
            Data:   fmt.Sprintf("task-%d", i),
            Result: result,
        }

        pool.Submit(job)

        collectors.Add(1)
        go func(jobID int, result <-chan string) {
            defer collectors.Done()
            select {
            case res := <-result:
                fmt.Printf("Job %d result: %s\n", jobID, res)
            case <-time.After(time.Second):
                fmt.Printf("Job %d timed out\n", jobID)
            }
        }(i, result)
    }

    // Every collector finishes within one second because of its timeout.
    collectors.Wait()
    pool.Stop()
}

Pipeline Pattern

Pipelines break down processing into stages connected by channels:

package main

import (
    "fmt"
    "strings"
    "sync"
)

// Stage 1: Generate data.
//
// generator returns a channel that yields every element of data in order and
// is closed after the last one has been received.
func generator(data []string) <-chan string {
    stream := make(chan string)

    emit := func() {
        for i := range data {
            stream <- data[i]
        }
        // The sender closes, signalling downstream that no more data follows.
        close(stream)
    }
    go emit()

    return stream
}

// Stage 2: Transform data.
//
// transformer upper-cases every string received from in, prefixes it with
// "PROCESSED: ", and sends it on the returned channel, which is closed once
// in is closed and drained.
func transformer(in <-chan string) <-chan string {
    results := make(chan string)

    go func() {
        defer close(results)
        for {
            raw, open := <-in
            if !open {
                return
            }
            upper := strings.ToUpper(raw)
            results <- fmt.Sprintf("PROCESSED: %s", upper)
        }
    }()

    return results
}

// Stage 3: Filter data.
//
// filter forwards only those strings from in that contain pattern. The
// returned channel is closed once in is closed and drained.
func filter(in <-chan string, pattern string) <-chan string {
    kept := make(chan string)

    go func() {
        defer close(kept)
        for candidate := range in {
            if !strings.Contains(candidate, pattern) {
                continue
            }
            kept <- candidate
        }
    }()

    return kept
}

// Fan-out pattern: distribute work to multiple goroutines.
//
// fanOut starts workers goroutines that all receive from in; each item is
// handled by exactly one of them and sent, prefixed with "Worker processed: ",
// on that goroutine's own output channel. Every output channel is closed once
// in is closed and drained.
//
// A workers value below 1 is treated as 1: with no worker nothing would ever
// receive from in, leaving the upstream stages blocked, and a negative value
// would panic when sizing the result slice.
func fanOut(in <-chan string, workers int) []<-chan string {
    if workers < 1 {
        workers = 1
    }

    outputs := make([]<-chan string, workers)

    for i := range outputs {
        output := make(chan string)
        outputs[i] = output

        go func(out chan<- string) {
            defer close(out)
            for item := range in {
                // Simulate processing
                processed := fmt.Sprintf("Worker processed: %s", item)
                out <- processed
            }
        }(output)
    }

    return outputs
}

// Fan-in pattern: merge multiple channels into one.
//
// fanIn forwards everything received on any of inputs to the returned
// channel, which is closed after all inputs have been closed and drained.
func fanIn(inputs ...<-chan string) <-chan string {
    merged := make(chan string)

    var pending sync.WaitGroup
    pending.Add(len(inputs))

    forward := func(src <-chan string) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }
    for i := range inputs {
        go forward(inputs[i])
    }

    // Close merged only after every forwarder has finished sending.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    data := []string{
        "hello world",
        "golang programming",
        "concurrency patterns",
        "channel communication",
        "goroutine management",
    }

    // Build pipeline
    stage1 := generator(data)
    stage2 := transformer(stage1)
    stage3 := filter(stage2, "GOLANG")

    // Fan-out to multiple workers
    workers := fanOut(stage3, 3)

    // Fan-in results
    results := fanIn(workers...)

    // Collect results
    for result := range results {
        fmt.Println(result)
    }
}

Producer-Consumer Pattern

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Message is the payload passed from producers to consumers.
type Message struct {
    ID        int
    Content   string
    Timestamp time.Time
}

// Producer emits a Message on output once per interval.
type Producer struct {
    id       int
    output   chan<- Message
    quit     chan struct{}
    interval time.Duration
}

// NewProducer returns a Producer with the given id that will send on output
// every interval once Start is running.
func NewProducer(id int, output chan<- Message, interval time.Duration) *Producer {
    p := new(Producer)
    p.id = id
    p.output = output
    p.quit = make(chan struct{})
    p.interval = interval
    return p
}

// Start runs the producer loop until ctx is done, calling wg.Done on return.
// On each tick it builds a Message and tries to send it; the message ID only
// advances after a successful send.
func (p *Producer) Start(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    tick := time.NewTicker(p.interval)
    defer tick.Stop()

    for next := 0; ; {
        // Phase 1: wait for the next tick, or stop.
        select {
        case <-ctx.Done():
            fmt.Printf("Producer %d stopping\n", p.id)
            return
        case <-tick.C:
        }

        outgoing := Message{
            ID:        next,
            Content:   fmt.Sprintf("Message from producer %d", p.id),
            Timestamp: time.Now(),
        }

        // Phase 2: hand the message over, or stop if cancelled while the
        // queue is full.
        select {
        case <-ctx.Done():
            fmt.Printf("Producer %d stopping due to context cancellation\n", p.id)
            return
        case p.output <- outgoing:
            fmt.Printf("Producer %d sent message %d\n", p.id, next)
            next++
        }
    }
}

// Consumer receives Messages from input and prints them after a simulated
// processing delay.
type Consumer struct {
    id    int
    input <-chan Message
}

// NewConsumer returns a Consumer with the given id reading from input.
func NewConsumer(id int, input <-chan Message) *Consumer {
    c := new(Consumer)
    c.id = id
    c.input = input
    return c
}

// Start runs the consumer loop, calling wg.Done on return. It returns when
// input is closed or ctx is done, whichever is observed first.
func (c *Consumer) Start(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        var (
            msg  Message
            open bool
        )

        select {
        case <-ctx.Done():
            fmt.Printf("Consumer %d stopping\n", c.id)
            return
        case msg, open = <-c.input:
        }

        if !open {
            fmt.Printf("Consumer %d: input channel closed\n", c.id)
            return
        }

        // Simulate processing time
        delay := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(delay)

        age := time.Since(msg.Timestamp)
        fmt.Printf("Consumer %d processed message %d: %s (age: %v)\n",
            c.id, msg.ID, msg.Content, age)
    }
}

// main runs two producers and three consumers over a shared buffered queue
// for ten seconds, then shuts everything down.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Create channel with buffer
    messageQueue := make(chan Message, 10)

    // Producers and consumers are tracked separately so the queue can be
    // closed at the one safe moment: after every sender has returned.
    var producers, consumers sync.WaitGroup

    // Start producers
    numProducers := 2
    for i := 0; i < numProducers; i++ {
        producer := NewProducer(i, messageQueue, 500*time.Millisecond)
        producers.Add(1)
        go producer.Start(ctx, &producers)
    }

    // Start consumers
    numConsumers := 3
    for i := 0; i < numConsumers; i++ {
        consumer := NewConsumer(i, messageQueue)
        consumers.Add(1)
        go consumer.Start(ctx, &consumers)
    }

    // Wait for context to complete
    <-ctx.Done()

    // A producer may still be inside its send select at this point. Closing
    // the queue now could make that select pick the send case and panic with
    // "send on closed channel", so wait for the producers first.
    producers.Wait()
    close(messageQueue)

    consumers.Wait()
    fmt.Println("All producers and consumers stopped")
}

Advanced Patterns

Rate Limiting

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// RateLimiter is a token bucket: tokens holds the currently available
// permits and a background goroutine adds one per ticker tick.
type RateLimiter struct {
    tokens   chan struct{}
    ticker   *time.Ticker
    quit     chan struct{}
    stopOnce sync.Once // lets Stop be called more than once
}

// NewRateLimiter returns a limiter that refills rate tokens per second and
// holds at most burst tokens; the bucket starts full. The refill goroutine
// runs until Stop is called.
//
// Out-of-range arguments are clamped rather than allowed to panic: a rate
// below 1 is treated as 1, a negative burst as 0, and a rate so high that the
// refill interval would round down to zero uses a one-nanosecond interval.
func NewRateLimiter(rate int, burst int) *RateLimiter {
    if rate < 1 {
        rate = 1
    }
    if burst < 0 {
        burst = 0
    }

    // time.NewTicker panics on a non-positive interval.
    interval := time.Second / time.Duration(rate)
    if interval <= 0 {
        interval = time.Nanosecond
    }

    rl := &RateLimiter{
        tokens: make(chan struct{}, burst),
        ticker: time.NewTicker(interval),
        quit:   make(chan struct{}),
    }

    // Fill initial burst
    for i := 0; i < burst; i++ {
        rl.tokens <- struct{}{}
    }

    // Start token refill
    go rl.refill()

    return rl
}

// refill adds one token per tick until the limiter is stopped.
func (rl *RateLimiter) refill() {
    defer rl.ticker.Stop()

    for {
        select {
        case <-rl.ticker.C:
            select {
            case rl.tokens <- struct{}{}:
                // Token added
            default:
                // Bucket full, drop token
            }
        case <-rl.quit:
            return
        }
    }
}

// Allow reports whether a token was available, consuming it if so. It never
// blocks.
func (rl *RateLimiter) Allow() bool {
    select {
    case <-rl.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token is available and returns nil, or returns
// ctx.Err() if ctx is done first.
func (rl *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Stop ends the refill goroutine. Tokens already in the bucket remain usable
// but no new ones are added. Stop may be called more than once.
func (rl *RateLimiter) Stop() {
    rl.stopOnce.Do(func() { close(rl.quit) })
}

// API request simulator.
//
// makeAPIRequest waits up to five seconds for a token from rl, then prints a
// timestamped line and sleeps 100ms in place of a real call. It returns a
// wrapped error if the wait fails and nil otherwise.
func makeAPIRequest(id int, rl *RateLimiter) error {
    waitCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
    defer release()

    err := rl.Wait(waitCtx)
    if err != nil {
        return fmt.Errorf("rate limit wait failed: %w", err)
    }

    // Simulate API call
    stamp := time.Now().Format("15:04:05.000")
    fmt.Printf("Making API request %d at %s\n", id, stamp)
    time.Sleep(100 * time.Millisecond)

    return nil
}

// main fires 50 concurrent simulated API requests through a limiter that
// allows 5 requests per second with a burst of 10, and waits for all of them.
func main() {
    limiter := NewRateLimiter(5, 10)
    defer limiter.Stop()

    const requests = 50

    var pending sync.WaitGroup
    pending.Add(requests)

    for id := 0; id < requests; id++ {
        go func(id int) {
            defer pending.Done()
            err := makeAPIRequest(id, limiter)
            if err == nil {
                return
            }
            fmt.Printf("Request %d failed: %v\n", id, err)
        }(id)
    }

    pending.Wait()
    fmt.Println("All requests completed")
}

Circuit Breaker Pattern

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// State is the operating mode of a CircuitBreaker.
type State int

const (
    // StateClosed lets calls through.
    StateClosed State = iota
    // StateHalfOpen lets trial calls through after the open timeout.
    StateHalfOpen
    // StateOpen rejects calls until the timeout has elapsed.
    StateOpen
)

// ErrCircuitOpen is returned by Call while the breaker is open and its
// timeout has not yet elapsed.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreaker tracks call failures and rejects calls for a cool-down
// period once too many have failed. All fields are guarded by mu.
type CircuitBreaker struct {
    mu               sync.RWMutex
    state            State
    failureCount     int
    successCount     int
    failureThreshold int
    successThreshold int
    timeout          time.Duration
    lastFailureTime  time.Time
    nextAttempt      time.Time
}

// NewCircuitBreaker returns a closed breaker that opens after
// failureThreshold failures without an intervening success, stays open for
// timeout, and closes again after successThreshold successes while half-open.
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
    }
}

// Call runs fn through the breaker and returns fn's error unchanged.
//
// fn is not invoked, and the breaker's counters are not touched, when ctx is
// already done (ctx.Err() is returned) or when the breaker is open
// (ErrCircuitOpen is returned). A nil ctx is tolerated.
//
// The mutex is held only while state is inspected and updated, never while fn
// runs, so a slow fn neither serializes other callers nor blocks GetState.
func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
    if ctx != nil {
        if err := ctx.Err(); err != nil {
            return err
        }
    }

    if err := cb.admit(); err != nil {
        return err
    }

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.onFailure()
        return err
    }

    cb.onSuccess()
    return nil
}

// admit decides whether a call may proceed, moving an open breaker to
// half-open once its timeout has elapsed.
func (cb *CircuitBreaker) admit() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state != StateOpen {
        return nil
    }
    if time.Now().Before(cb.nextAttempt) {
        return ErrCircuitOpen
    }

    cb.state = StateHalfOpen
    cb.successCount = 0
    return nil
}

// onFailure records a failed call. The caller must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
    cb.failureCount++
    cb.lastFailureTime = time.Now()

    // A concurrent call may already have tripped the breaker; do not extend
    // the cool-down or announce the opening a second time.
    if cb.state == StateOpen {
        return
    }

    if cb.state == StateHalfOpen || cb.failureCount >= cb.failureThreshold {
        cb.state = StateOpen
        cb.nextAttempt = time.Now().Add(cb.timeout)
        fmt.Printf("Circuit breaker opened at %s\n", time.Now().Format("15:04:05"))
    }
}

// onSuccess records a successful call. The caller must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
    // A success from a call that started before the breaker opened is stale
    // and must not disturb the open state.
    if cb.state == StateOpen {
        return
    }

    cb.failureCount = 0

    if cb.state == StateHalfOpen {
        cb.successCount++
        if cb.successCount >= cb.successThreshold {
            cb.state = StateClosed
            fmt.Printf("Circuit breaker closed at %s\n", time.Now().Format("15:04:05"))
        }
    }
}

// GetState returns the breaker's current state.
func (cb *CircuitBreaker) GetState() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}

// Simulate an unreliable service.
//
// unreliableService fails immediately with "service failure" when a random
// draw falls below 0.3; otherwise it sleeps for a random delay below 100ms
// and returns nil.
func unreliableService() error {
    if roll := rand.Float32(); roll < 0.3 {
        return errors.New("service failure")
    }

    // Simulate network delay
    delay := time.Duration(rand.Intn(100)) * time.Millisecond
    time.Sleep(delay)

    return nil
}

// main issues 100 calls to the unreliable service through a circuit breaker,
// starting one every 100ms in its own goroutine, and prints each outcome with
// the breaker state read just after the call.
func main() {
    breaker := NewCircuitBreaker(3, 2, 5*time.Second)

    // Read-only after construction, so sharing it between goroutines is safe.
    labels := map[State]string{
        StateClosed:   "CLOSED",
        StateHalfOpen: "HALF-OPEN",
        StateOpen:     "OPEN",
    }

    var inflight sync.WaitGroup

    attempt := func(id int) {
        defer inflight.Done()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        err := breaker.Call(ctx, unreliableService)
        label := labels[breaker.GetState()]

        if err == nil {
            fmt.Printf("Call %d succeeded (state: %s)\n", id, label)
            return
        }
        fmt.Printf("Call %d failed: %v (state: %s)\n", id, err, label)
    }

    // Simulate 100 service calls
    for id := 0; id < 100; id++ {
        inflight.Add(1)
        go attempt(id)

        time.Sleep(100 * time.Millisecond)
    }

    inflight.Wait()
}

Error Handling and Graceful Shutdown

Context-based Cancellation

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// Server is a named background task that prints a heartbeat every second
// until its context is cancelled.
type Server struct {
    name string
    wg   sync.WaitGroup
}

// Start launches the server's run loop in a new goroutine.
func (s *Server) Start(ctx context.Context) {
    s.wg.Add(1)
    go s.run(ctx)
}

// run prints heartbeats until ctx is done, then performs a simulated
// two-second cleanup before returning.
func (s *Server) run(ctx context.Context) {
    defer s.wg.Done()

    beat := time.NewTicker(time.Second)
    defer beat.Stop()

    fmt.Printf("%s started\n", s.name)

    for {
        select {
        case <-beat.C:
            stamp := time.Now().Format("15:04:05")
            fmt.Printf("%s: heartbeat at %s\n", s.name, stamp)
            continue
        case <-ctx.Done():
        }

        // Only reached once the context has been cancelled.
        fmt.Printf("%s: received shutdown signal\n", s.name)

        // Simulate cleanup time
        time.Sleep(2 * time.Second)
        fmt.Printf("%s: cleanup completed\n", s.name)
        return
    }
}

// Stop waits for the run loop to finish. It does not itself request
// shutdown; that happens when the context passed to Start is cancelled.
func (s *Server) Stop() {
    s.wg.Wait()
    fmt.Printf("%s: stopped\n", s.name)
}

// main starts three servers sharing one cancellable context, blocks until
// SIGINT or SIGTERM arrives, cancels the context, and waits for each server
// to finish.
func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // Buffered so the signal is not lost if it arrives before the receive.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    names := []string{"HTTP Server", "gRPC Server", "Background Worker"}
    servers := make([]*Server, 0, len(names))
    for _, name := range names {
        srv := &Server{name: name}
        srv.Start(ctx)
        servers = append(servers, srv)
    }

    // Block here until a signal arrives.
    fmt.Printf("\nReceived signal: %v\n", <-sigChan)

    // Cancel context to trigger graceful shutdown
    cancel()

    for i := range servers {
        servers[i].Stop()
    }

    fmt.Println("Application shutdown complete")
}

Performance Monitoring and Debugging

Goroutine Monitoring

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Monitor prints runtime statistics on every tick of its ticker until
// stopped.
type Monitor struct {
    ticker *time.Ticker
    quit   chan struct{}
}

// NewMonitor returns a Monitor whose ticker fires every interval. Nothing is
// printed until Start is called.
func NewMonitor(interval time.Duration) *Monitor {
    m := new(Monitor)
    m.quit = make(chan struct{})
    m.ticker = time.NewTicker(interval)
    return m
}

// Start launches the goroutine that prints stats on each tick. The goroutine
// stops the ticker and exits once Stop has been called.
func (m *Monitor) Start() {
    go func() {
        defer m.ticker.Stop()
        for {
            select {
            case <-m.quit:
                return
            case <-m.ticker.C:
                m.printStats()
            }
        }
    }()
}

// Stop signals the monitoring goroutine to exit.
func (m *Monitor) Stop() {
    close(m.quit)
}

// printStats writes the goroutine count, CPU count, and selected memory and
// GC figures to standard output.
func (m *Monitor) printStats() {
    const kb = 1024

    stats := new(runtime.MemStats)
    runtime.ReadMemStats(stats)

    fmt.Printf("=== Runtime Stats ===\n")
    fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
    fmt.Printf("CPUs: %d\n", runtime.NumCPU())
    fmt.Printf("Memory Alloc: %d KB\n", stats.Alloc/kb)
    fmt.Printf("Total Alloc: %d KB\n", stats.TotalAlloc/kb)
    fmt.Printf("GC Cycles: %d\n", stats.NumGC)
    fmt.Println("====================")
}

// Simulate workload.
//
// worker announces itself, allocates and fills a 1MB buffer, sleeps for
// duration, announces completion, and calls wg.Done on return.
func worker(id int, wg *sync.WaitGroup, duration time.Duration) {
    defer wg.Done()

    fmt.Printf("Worker %d started\n", id)

    // Simulate memory allocation
    const size = 1024 * 1024 // 1MB
    buf := make([]byte, size)
    for pos := range buf {
        buf[pos] = byte(pos % 256)
    }

    time.Sleep(duration)
    fmt.Printf("Worker %d finished\n", id)
}

// main runs a monitor that prints stats every two seconds while three waves
// of ten workers are launched one second apart, then waits for the workers,
// forces a garbage collection, and exits.
func main() {
    mon := NewMonitor(2 * time.Second)
    mon.Start()
    defer mon.Stop()

    const (
        waves   = 3
        perWave = 10
    )

    var running sync.WaitGroup

    // Launch workers in waves; later waves run longer.
    for w := 0; w < waves; w++ {
        fmt.Printf("\n=== Starting wave %d ===\n", w+1)

        lifetime := time.Duration(2+w) * time.Second
        running.Add(perWave)
        for k := 0; k < perWave; k++ {
            go worker(w*perWave+k, &running, lifetime)
        }

        time.Sleep(time.Second)
    }

    running.Wait()

    // Force garbage collection
    runtime.GC()
    time.Sleep(time.Second)

    fmt.Println("\nAll workers completed")
}

Best Practices and Anti-patterns

Goroutine and Memory Leak Prevention

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// BAD: Goroutine leak.
//
// badPattern starts ten goroutines that print once per second in an endless
// loop. Nothing can stop or wait for them; it exists only as a
// counter-example to goodPattern.
func badPattern() {
    spin := func(id int) {
        // No exit condition: this loop runs for the life of the process.
        for ; ; time.Sleep(time.Second) {
            fmt.Printf("Goroutine %d running\n", id)
        }
    }

    for id := 0; id < 10; id++ {
        go spin(id)
    }
}

// GOOD: Proper cleanup with context.
//
// goodPattern starts ten goroutines that print once per second and blocks
// until all of them have returned, which they do once ctx is done.
func goodPattern(ctx context.Context) {
    const count = 10

    var active sync.WaitGroup
    active.Add(count)

    loop := func(id int) {
        defer active.Done()

        tick := time.NewTicker(time.Second)
        defer tick.Stop()

        for {
            select {
            case <-ctx.Done():
                fmt.Printf("Goroutine %d stopping\n", id)
                return
            case <-tick.C:
                fmt.Printf("Goroutine %d running\n", id)
            }
        }
    }

    for id := 0; id < count; id++ {
        go loop(id)
    }

    active.Wait()
}

// Channel cleanup patterns.
//
// channelCleanupExample has a producer goroutine send the integers 0 through
// 4 and then close the channel, while the caller's goroutine prints each
// value until it observes the close.
func channelCleanupExample() {
    values := make(chan int, 10)

    // Producer: the sender is the one that closes the channel.
    go func() {
        for n := 0; n < 5; n++ {
            values <- n
        }
        close(values)
    }()

    // Consumer: stop as soon as the channel reports closed.
    for {
        v, open := <-values
        if !open {
            return
        }
        fmt.Printf("Received: %d\n", v)
    }
}

// main runs the well-behaved example under a five-second timeout, then the
// channel cleanup example. badPattern is intentionally never called.
func main() {
    fmt.Println("=== Good Pattern Example ===")

    // goodPattern returns only after this deadline has passed and every
    // goroutine has seen the cancellation.
    deadline, release := context.WithTimeout(context.Background(), 5*time.Second)
    defer release()
    goodPattern(deadline)

    fmt.Println("\n=== Channel Cleanup Example ===")
    channelCleanupExample()
}

Conclusion

Go's concurrency model provides powerful primitives for building scalable applications. The key patterns covered in this guide—worker pools, pipelines, producer-consumer, rate limiting, and circuit breakers—form the foundation for most concurrent systems.

Remember these essential principles:

  1. Always provide cleanup mechanisms using context cancellation
  2. Avoid goroutine leaks by ensuring all goroutines can terminate
  3. Size channel buffers deliberately: a buffer decouples sender and receiver, but it only postpones blocking until it fills up
  4. Monitor your concurrent systems to detect issues early
  5. Test concurrent code thoroughly, including for race conditions (run tests with the race detector, `go test -race`) and deadlocks

Master these patterns and you'll be able to build robust, efficient concurrent applications that can handle real-world workloads while maintaining code clarity and maintainability.