병행 처리
머신의 CPU를 최대한 활용하도록 매퍼와 리듀서를 고루틴 여러 개로 동시에 동작시켜 보자.
runConcurrentMap() 함수에서는 매퍼를 고루틴 여러 개로 동작시킨다.
func runConcurrentMap(paths <-chan string) <-chan partial { out := make(chan partial, BUF_SIZE) // mapper 작업을 CPU 코어 수만큼 동시에 처리하게 함 var wg sync.WaitGroup for i := 0; i < runtime.NumCPU(); i++ { wg.Add(1) go func() { defer wg.Done() for path := range paths { mapper(path, out) } }() } go func() { // 모든 mapper에서 작업을 완료할 때까지 대기한 후 out 채널을 닫음 wg.Wait() close(out) }() return out }
리듀서도 고루틴 여러 개로 동작하도록 runConcurrentReduce() 함수를 작성해 보자.
func runConcurrentReduce(in intermediate) summary { result := make(summary) var wg sync.WaitGroup for token, value := range in { wg.Add(1) go func(token string, positions []scanner.Position) { defer wg.Done() result[token] = reducer(token, positions) }(token, value) } wg.Wait() return result }
sync.WaitGroup을 사용하면 모든 고루틴이 완료되는 시점을 알 수 있다. wg.Add() 메서드로 대기해야 할 고루틴 수를 설정하고 wg.Wait() 메서드를 호출하면 모든 고루틴이 종료될 때까지 대기한다. wg.Done() 메서드는 sync.WaitGroup에 고루틴의 종료를 알린다. 즉, 모든 고루틴에서 wg.Done() 메서드를 호출하면 wg.Wait()의 대기가 해제된다. 주의할 점은 고루틴을 종료할 때 wg.Done() 메서드를 호출하지 않으면 교착상태가 발생할 수 있다는 것이다. wg.Done()을 반드시 수행하기 위해 wg.Done() 호출 부분을 defer로 동작시킨다.
다음은 main 함수이다.
func main() { paths := find(parseArgs()) fmt.Println(runConcurrentReduce(collect(runConcurrentMap(paths)))) }