CSP 并不是新技术,Communicating Sequential Processes 是 Tony Hoare 在 1978 年就提出来的概念,甚至在更早的 1975 年,Edsger Dijkstra 的 Guarded Command Language 里面,也能看到 CSP 的影子。
还有其他的一些语言,也有类似的并发模型
Occam (May, 1983)
Erlang (Armstrong, 1986)
Newsqueak (Pike, 1988)
Concurrent ML (Reppy, 1993)
Alef (Winterbottom, 1995)
Limbo (Dorward, Pike, Winterbottom, 1996).
Venice / Golang 和 Erlang 的差异
Venice / Golang 通过 channels 来实现 CSP。
Erlang 是最接近于原始的 CSP 定义的,通过 name 进行通信,而非 channel。
它们的模型其实是一致的,只不过具体的表现形式有差异。
粗略来看相当于:writing to a file by name (process, Erlang) vs. writing to a file descriptor (channel, Venice / Golang).
CSP 的基本使用
这篇文章最主要的目的是讨论并发模式,为了避免陷入编程语言本身的各种细节,我们只会使用到 Swift 很少的语法特性。
从下面这个简单的 boring 函数开始
1 2 3 4 5 6 7 8 9 10 11 12
import Foundation
privatefuncboring(msg: String) { for i in0...10 { print("\(msg)\(i)") usleep(100) } }
publicfuncrun01() { boring("this is a boring func") }
很容易想象到,这段代码的执行结果会是下买这个样子
1 2 3 4 5 6 7 8 9 10 11
this is a boring func 0 this is a boring func 1 this is a boring func 2 this is a boring func 3 this is a boring func 4 this is a boring func 5 this is a boring func 6 this is a boring func 7 this is a boring func 8 this is a boring func 9 this is a boring func 10
privatefuncboring(msg: String) { for i in0..<Int.max { print("\(msg)\(i)") nap(for: Int(arc4random_uniform(1000) + 1).milliseconds)//sleep } }
publicfuncrun03() { co { boring("co a less boring func") } /** //if do not want run03() finish, run the loop below for i in 0.. yield } */ print("run03() will return") }
上面这段代码的运行结果如下
1 2
co a less boring func 0 run03() will return
可以看到,boring函数里面的循环只执行了一次,这是因为 co 函数是立刻返回的,紧接着,run03() 执行完 print 后也立刻返回,然后 run03() 的调用者 main 函数也就执行结束了 (进程结束),之前 co 启动的协程自然也就无法继续执行了。
如果想让 co 里面的协程一直运行下去,可以在 co 调用返回后,执行代码中的那段 for loop。
要注意的一点是,for loop 里面调用的 yield,是 Venice 引入的一种操作,意思是让出 CPU 给其他的协程。Golang 是不需要手动进行这种调用的,runtime 会自动的进行调度。
privatefuncboring(msg: String) { for i in0..<Int.max { print("\(msg)\(i)") nap(for: Int(arc4random_uniform(1000) + 1).milliseconds) } }
publicfuncrun04() { co(boring("co a less boring func")) print("I'm listening") nap(for: 2.second) print("You're boring; I'm leaving.") }
这段代码的执行结果是下面这个样子的
1 2 3 4 5 6 7
co a less boring func 0 I'm listening co a less boring func 1 co a less boring func 2 co a less boring func 3 co a less boring func 4 You're boring; I'm leaving.
privatefuncboring(msg msg: String, channel: SendingChannel ) { for i in0..<Int.max { channel.send("\(msg)\(i)") nap(for: Int(arc4random_uniform(1000) + 1).milliseconds) } }
publicfuncrun05() { let channel = Channel<String>() co { boring(msg: "co a less boring func", channel: channel.sendingChannel) } for_in0..<5 { print("You say: \(channel.receivingChannel.receive()!)") } print("You're boring; I'm leaving.") channel.close() }
运行结果如下
1 2 3 4 5 6
You say: co a less boring func 0 You say: co a less boring func 1 You say: co a less boring func 2 You say: co a less boring func 3 You say: co a less boring func 4 You're boring; I'm leaving.
privatefuncboring(msg: String) -> ReceivingChannel<String> { let channel = Channel<String>() co { for i in0..<Int.max { channel.send("\(msg)\(i)") nap(for: Int(arc4random_uniform(1000) + 1).milliseconds) } } return channel.receivingChannel }
publicfuncrun06() { let receivingChannel = boring("co a less boring func") for_in0..<5 { print("You say: \(receivingChannel.receive()!)") } print("You're boring; I'm leaving.") }
这段代码和前面的代码的运行结果,没有什么差别
1 2 3 4 5 6
You say: co a less boring func 0 You say: co a less boring func 1 You say: co a less boring func 2 You say: co a less boring func 3 You say: co a less boring func 4 You're boring; I'm leaving.
但是代码本身确有明显的变化,boring 函数返回一个 channel 给调用者,同时,在 boring 函数内部,通过 co 启动一个新的协程做具体的业务,并且通过刚才创建的 channel 把结果发送出去。
利用 channel 作为 service 的接口
boring 函数对外提供了一个 service,这个 service 运行在独立的协程里面,并且通过channel 把数据传递给 service 的使用者。
privatefuncboring(msg: String) -> ReceivingChannel<String> { let channel = Channel<String>() co { for i in0..<Int.max { let sleepTime = Int(arc4random_uniform(1000) + 1).milliseconds channel.send("\(msg)\(i) (will sleep \(Int(sleepTime * 1000)) ms)") nap(for: sleepTime) } } return channel.receivingChannel }
publicfuncrun07() { let joe = boring("Joe") let ann = boring("Ann") for_in0..<5 { print("\(joe.receive()!)") print("\(ann.receive()!)") } print("You're both boring; I'm leaving.") }
运行结果如下
1 2 3 4 5 6 7 8 9 10 11
Joe 0 (will sleep 996 ms) Ann 0 (will sleep 681 ms) Joe 1 (will sleep 173 ms) Ann 1 (will sleep 147 ms) Joe 2 (will sleep 750 ms) Ann 2 (will sleep 374 ms) Joe 3 (will sleep 318 ms) Ann 3 (will sleep 705 ms) Joe 4 (will sleep 126 ms) Ann 4 (will sleep 828 ms) You're both boring; I'm leaving.
多路复用 (Multiplexing)
前面 run07() 里面的代码,始终都是先从 joe 里面读取数据,然后再从 ann 里面读取。如果 ann 里面的数据早于 joe 里面的数据就发送了,由于 channel 的同步特性,ann channel 其实会阻塞在它的 send 操作上,直到 run07 从 joe 里面读取完数据后,ann 所在的协程才能继续运行。
为了改善这种情况,可以使用 fan-in 模式。不管是 joe 还是 ann,只要有数据准备好并且执行了 send 操作,都可以立刻读取到。
privatefuncboring(msg: String) -> ReceivingChannel<String> { let channel = Channel<String>() co { for i in0..<Int.max { let sleepTime = Int(arc4random_uniform(1000) + 1).milliseconds nap(for: sleepTime) channel.send("\(msg)\(i) (will sleep \(Int(sleepTime * 1000)) ms)") } } return channel.receivingChannel }
publicfuncrun11() { let joe = boring("Joe") var done = false while !done { select { when in when.receive(from: joe) { value in print("\(value)") } when.timeout(800.millisecond.fromNow()) { print("You are too slow.") done = true } } } print("You're boring; I'm leaving.") }
运行结果是下面这个样子
1 2 3 4 5 6
Joe 0 (will sleep 48 ms) Joe 1 (will sleep 706 ms) Joe 2 (will sleep 747 ms) Joe 3 (will sleep 304 ms) You are too slow. You're boring; I'm leaving.
privatefuncboring(msg: String) -> ReceivingChannel<String> { let channel = Channel<String>() co { for i in0..<Int.max { let sleepTime = Int(arc4random_uniform(1000) + 1).milliseconds nap(for: sleepTime) channel.send("\(msg)\(i) (will sleep \(Int(sleepTime * 1000)) ms)") } } return channel.receivingChannel }
publicfuncrun12() { let joe = boring("Joe") let timeout = Timer(timingOut: 5.second.fromNow()).channel var done = false while !done { select { when in when.receive(from: joe) { value in print("\(value)") } when.receive(from: timeout) { _in print("You are too slow.") done = true } } } print("You're boring; I'm leaving.") }
运行结果如下
1 2 3 4 5 6 7 8 9 10 11 12
Joe 0 (will sleep 586 ms) Joe 1 (will sleep 226 ms) Joe 2 (will sleep 297 ms) Joe 3 (will sleep 850 ms) Joe 4 (will sleep 442 ms) Joe 5 (will sleep 525 ms) Joe 6 (will sleep 730 ms) Joe 7 (will sleep 227 ms) Joe 8 (will sleep 630 ms) Joe 9 (will sleep 411 ms) You are too slow. You're boring; I'm leaving.
privatefuncboring(msg msg: String, quit: ReceivingChannel) -> ReceivingChannel<String> { let channel = Channel<String>() co { forSelect { when, done in let sleepTime = Int(arc4random_uniform(1000) + 1).milliseconds nap(for: sleepTime) when.send("\(msg), and will sleep \(Int(sleepTime * 1000)) ms", to: channel) { //print("sent value") } when.receive(from: quit) { _in done() } } channel.close() } return channel.receivingChannel }
publicfuncrun13() { let quit = Channel<Bool>() let joe = boring(msg: "Joe", quit: quit.receivingChannel) for_in0..<Int64(arc4random_uniform(10) + 1) { print("\(joe.receive()!)") } quit.send(true) print("You're boring; I'm leaving.") }
运行结果仍然是类似的
1 2 3 4 5 6 7 8 9
Joe, and will sleep 154 ms Joe, and will sleep 390 ms Joe, and will sleep 133 ms Joe, and will sleep 520 ms Joe, and will sleep 752 ms Joe, and will sleep 482 ms Joe, and will sleep 47 ms Joe, and will sleep 359 ms You're boring; I'm leaving.
privatefunccleanup() { print("Here, do clean up") }
privatefuncboring(msg msg: String, quit: Channel) -> ReceivingChannel<String> { let channel = Channel<String>() co { forSelect { when, done in let sleepTime = Int(arc4random_uniform(1000) + 1).milliseconds nap(for: sleepTime) when.send("\(msg), and will sleep \(Int(sleepTime * 1000)) ms", to: channel) { //print("sent value") } when.receive(from: quit) { _in cleanup() quit.send("See you!") done() } } channel.close() } return channel.receivingChannel }
publicfuncrun14() { let quit = Channel<String>() let joe = boring(msg: "Joe", quit: quit) for_in0..<Int64(arc4random_uniform(10) + 1) { print("\(joe.receive()!)") } quit.send("Bye") print("Joe says: \(quit.receive()!)") print("You're boring; I'm leaving.") }
现在运行结果会变成下面这个样子
1 2 3 4 5 6 7 8 9 10
Joe, and will sleep 220 ms Joe, and will sleep 736 ms Joe, and will sleep 308 ms Joe, and will sleep 858 ms Joe, and will sleep 527 ms Joe, and will sleep 163 ms Joe, and will sleep 844 ms Here, do clean up Joe says: See you! You're boring; I'm leaving.