더북(TheBook)

병행 처리

머신의 CPU를 최대한 활용하도록 매퍼와 리듀서를 고루틴 여러 개로 동시에 동작시켜 보자.

runConcurrentMap() 함수에서는 매퍼를 고루틴 여러 개로 동작시킨다.

func runConcurrentMap(paths <-chan string) <-chan partial {
    out := make(chan partial, BUF_SIZE)
     
    // mapper 작업을 CPU 코어 수만큼 동시에 처리하게 함
    var wg sync.WaitGroup
    for i := 0; i < runtime.NumCPU(); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for path := range paths {
                mapper(path, out)
            }
        }()
    }
     
    go func() {
        // 모든 mapper에서 작업을 완료할 때까지 대기한 후 out 채널을 닫음
        wg.Wait()
        close(out)
    }()
   
    return out
}

리듀서도 고루틴 여러 개로 동작하도록 runConcurrentReduce() 함수를 작성해 보자.

func runConcurrentReduce(in intermediate) summary {
    result := make(summary)
    var wg sync.WaitGroup
    for token, value := range in {
        wg.Add(1)
        go func(token string, positions []scanner.Position) {
            defer wg.Done()
            result[token] = reducer(token, positions)
        }(token, value)
    }
    wg.Wait()
    return result
}

sync.WaitGroup을 사용하면 모든 고루틴이 완료되는 시점을 알 수 있다. wg.Add() 메서드로 대기해야 할 고루틴 수를 설정하고 wg.Wait() 메서드를 호출하면 모든 고루틴이 종료될 때까지 대기한다. wg.Done() 메서드는 sync.WaitGroup에 고루틴의 종료를 알린다. 즉, 모든 고루틴에서 wg.Done() 메서드를 호출하면 wg.Wait()의 대기가 해제된다. 주의할 점은 고루틴을 종료할 때 wg.Done() 메서드를 호출하지 않으면 교착상태가 발생할 수 있다는 것이다. wg.Done()을 반드시 수행하기 위해 wg.Done() 호출 부분을 defer로 동작시킨다.

다음은 main 함수이다.

func main() {
    paths := find(parseArgs())
    fmt.Println(runConcurrentReduce(collect(runConcurrentMap(paths))))
}

신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.