Skip to content

Golang pipeline模式

Published: at 03:15 PM | 4 min read

pipeline流水线模式(管道模式),我们用这个模式来计算所有质数的和。

什么是质数

质数又称素数。指在一个大于1的自然数中,除了1和此整数自身外,没法被其他自然数整除的数。

C++代码实现

我们先用一般的方法实现一下,后面跟Golang实现进行对比

#include <iostream>
#include <vector>
#include <chrono>

bool is_prime(int value) {
    int n = value / 2;
    for (int i = 2; i <= n; i++) {
        if (value % i == 0) {
            return false;
        }
    }
    return value > 1;
}

std::vector<int> makeNums(int min, int max)
{
    std::vector<int> nums;
    nums.reserve(max - min + 1);
    for (int i = min; i <= max; i++) {
        nums.push_back(i);
    }
    return nums;
}

int main(int argc, char* argv[])
{    
    auto start = std::chrono::system_clock::now();

    auto nums = makeNums(1, 500000);
    int64_t sum = 0;
    for (auto n : nums) {
        if (is_prime(n)) {
            sum += n;
        }
    }

    auto end = std::chrono::system_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "sum:" << sum << std::endl;
    std::cout << "elapsed:" << elapsed.count() << "ms" << std::endl;

    system("pause");
    return 0;
}

我的windows开发机(4核),开了O2优化,跑了几次大概耗时:700~800ms

Golang pipeline实现

pipeline的好处是可以自由组合

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

func chanInt(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, v := range nums {
			out <- v
		}
		close(out)
	}()
	return out
}

func chanSum(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		var sum = 0
		for v := range in {
			sum += v
		}
		out <- sum
		close(out)
	}()
	return out
}

func is_prime(value int) bool {
	n := int(math.Floor(float64(value) / 2))
	for i := 2; i <= n; i++ {
		if value%i == 0 {
			return false
		}
	}
	return value > 1
}

func prime(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			if is_prime(v) {
				out <- v
			}
		}
		close(out)
	}()
	return out
}

type ChanIntFunc func([]int) <-chan int
type PipeFunc func(<-chan int) <-chan int

func pipeline(nums []int, chanInt ChanIntFunc, pipeFuncs ...PipeFunc) <-chan int {
	f := chanInt(nums)
	for _, p := range pipeFuncs {
		f = p(f)
	}
	return f
}

func makeRange(min, max int) []int {
	out := make([]int, 0, max-min+1)
	for i := min; i <= max; i++ {
		out = append(out, i)
	}
	return out
}

func main() {
	start := time.Now()
	for v := range pipeline(makeRange(1, 100000), chanInt, prime, chanSum) {
		fmt.Println(v)
	}

	fmt.Println(time.Now().Sub(start).Milliseconds())
}

同一个环境golang跑下来大概要花2600ms,是C++的3倍多。 大家可能觉得是用了goroutine和chan的原因,其实用C++类似的方式实现耗时也是一样的:

	start := time.Now()
	sum := 0
	nums := makeRange(1, 100000)
	for i := range nums {
		if is_prime(nums[i]) {
			sum += nums[i]
		}
	}
	fmt.Println(sum)
	fmt.Println(time.Now().Sub(start).Milliseconds())

Golang 并行实现

	start := time.Now()
	nums := chanInt(makeRange(1, 100000))
	const process = 5
	var chans [process]<-chan int
	for i := range chans {
		chans[i] = chanSum(prime(nums))
	}

	for n := range chanSum(merge(chans[:])) {
		fmt.Println(n)
	}
	fmt.Println(time.Now().Sub(start).Milliseconds())

merge实现

func merge(cs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			for n := range c {
				out <- n
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

这样跑下来耗时800多毫秒,跟C++差不多了。

结论

从这个例子来看golang和C++还是有一些性能差距的,当然这只是一个简单的特定例子,golang的优势不在此,对于这种密集型计算还是C/C++比较合适。