Venice
Venice provides CSP for Swift 3.0.
Features
- Coroutines
- Channels
- Fallible Channels
- Receive-only Channels
- Send-only Channels
- Channel Iteration
- Select
- Timers
- Tickers
- File Descriptor Polling
Venice wraps a fork of the C library libmill.
Installation
- Add
Venice
to yourPackage.swift
import PackageDescription let package = Package( dependencies: [ .Package(url: "https://github.com/VeniceX/Venice.git", majorVersion: 0, minor: 12) ] )
Usage
co
func doSomething() { print("did something") } // regular call doSomething() // coroutine call co(doSomething()) // coroutine closure co { print("did something else") }
nap
and wake
nap
stops the execution for the given amount of time, while wake
stops the execution until some moment.
co { // sleep for one second nap(for: 1.second) print("yawn") } // stop for two seconds so the program // doesn't terminate before the print let deadline = 2.seconds.fromNow() wake(at: deadline)
Always use nap
if you're setting up the time yourself. Use wake
only if you got deadline
from somewhere else.
after
after
runs the coroutine after the specified duration.
after(1.second) { print("yoo") } // same as co { nap(for: 1.second) print("yoo") }
every
every
runs the expression in a coroutine periodically. Call done() to leave the loop.
var count = 0 every(1.second) { done in print("yoo") count += 1 if count == 3 { done() } } // same as var count = 0 co { while true { nap(for: 1.second) print("yoo") count += 1 if count == 3 { break } } }
Channel
Channels are typed and return optionals wrapping the value or nil if the channel is closed and doesn't have any values left in the buffer.
let messages = Channel<String>() co(messages.send("ping")) let message = messages.receive()! print(message) // buffered channels let messages = Channel<String>(bufferSize: 2) messages.send("buffered") messages.send("channel") print(messages.receive()!) print(messages.receive()!)
ReceivingChannel
and SendingChannel
You can get a reference to a channel with receive or send only capabilities.
func receiveOnly(channel: ReceivingChannel<String>) { // can only receive from channel let string = channel.receive()! } func sendOnly(channel: SendingChannel<String>) { // can only send to channel channel.send("yo") } let channel = Channel<String>(bufferSize: 1) receiveOnly(channel.receivingChannel) sendOnly(channel.sendingChannel)
FallibleChannel
Fallible channels accept values and errors as well.
struct Error: ErrorProtocol {} let channel = FallibleChannel<String>(bufferSize: 2) channel.send("yo") channel.send(Error()) do { let yo = try channel.receive() try channel.receive() // will throw } catch { print("error") }
select
Sometimes select
can clash with the system libraries function with the same name select
. To solve this you can call Venice's select with Venice.select
or with the terser alias sel
.
let channel = Channel<String>() let fallibleChannel = FallibleChannel<String>() select { when in when.received(valueFrom: channel) { value in print("received \(value)") } when.received(resultFrom: fallibleChannel) { result in result.success { value in print(value) } result.failure { error in print(error) } } when.sent("value", to: channel) { print("sent value") } when.sent("value", to: fallibleChannel) { print("sent value") } when.sent(Error(), to: fallibleChannel) { print("threw error") } when.timedOut(1.second.fromNow()) { print("timeout") } when.otherwise { print("default case") } }
You can disable a channel selection by turning it to nil
var channelA: Channel<String>? = Channel<String>() var channelB: Channel<String>? = Channel<String>() if random(0...1) == 0 { channelA = nil print("disabled channel a") } else { channelB = nil print("disabled channel b") } co { channelA?.send("a") } co { channelB?.send("b") } sel { when in when.received(valueFrom: channelA) { value in print("received \(value) from channel a") } when.received(valueFrom: channelB) { value in print("received \(value) from channel b") } }
Another way to disable a channel selection is to simply put its case inside an if statement.
let channelA = Channel<String>() let channelB = Channel<String>() co(channelA.send("a")) co(channelB.send("b")) select { when in if random(0...1) == 0 { print("disabled channel b") when.received(valueFrom: channelA) { value in print("received \(value) from channel a") } } else { print("disabled channel a") when.received(valueFrom: channelB) { value in print("received \(value) from channel b") } } }
forSelect
A lot of times we need to wrap our select inside a while loop. To make it easier to work with this pattern we can use forSelect
. forSelect
will loop until you call done()
.
func flipCoin(outcome: FallibleChannel<String>) { if random(0...1) == 0 { outcome.send("Success") } else { outcome.send(Error(description: "Something went wrong")) } } let outcome = FallibleChannel<String>() co(flipCoin(outcome)) forSelect { when, done in when.received(resultFrom: outcome) { result in result.success { value in print(value) done() } result.failure { error in print("\(error). Retrying...") co(flipCoin(results)) } } }
Timer
Timer
sends to its channel when it expires.
let timer = Timer(deadline: 2.second.fromNow()) co { timer.channel.receive() print("Timer expired") } if timer.stop() { print("Timer stopped") }
Ticker
Ticker
sends current time to its channel periodically until stopped.
let ticker = Ticker(period: 500.milliseconds) co { for time in ticker.channel { print("Tick at \(time)") } } after(2.seconds) { ticker.stop() }
poll
poll
polls a file descriptor for reading or writing optionally timing out if the file descriptor is not ready before the given deadline.
do { // yields to other coroutines if fd not ready try poll(fileDescriptor, for: .writing, timingOut: 5.seconds.fromNow()) // runs when fd is ready fileDescriptor.write(data) } catch { // throws in case of timeout or polling error }
Examples
The examples 01-15 were taken from gobyexample and translated from Go to Swift using Venice.
01 - Coroutines
A coroutine is a lightweight thread of execution.
func f(from: String) { for i in 0 ..< 4 { print("\(from): \(i)") yield } }
Suppose we have a function call f(s)
. Here's how we'd call
that in the usual way, running it synchronously.
f("direct")
To invoke this function in a coroutine, use co(f(s))
. This new
coroutine will execute concurrently with the calling one.
co(f("coroutine"))
You can also start a coroutine with a closure.
co {
print("going")
}
Our two function calls are running asynchronously in separate coroutines now, so execution falls through to here. We wait 1 second before the program exits
nap(for: 1.second) print("done")
When we run this program, we see the output of the blocking call first, then the interleaved output of the two coroutines. This interleaving reflects the coroutines being run concurrently by the runtime.
Output
direct: 0
direct: 1
direct: 2
direct: 3
coroutine: 0
going
coroutine: 1
coroutine: 2
coroutine: 3
done
02 - Channels
Channels are the pipes that connect concurrent coroutines. You can send values into channels from one coroutine and receive those values into another coroutine.
Create a new channel with Channel
.
Channels are typed by the values they convey.
let messages = Channel<String>()
Send a value into a channel using the channel.send(value)
syntax. Here we send "ping"
to the messages
channel we made above, from a new coroutine.
co(messages.send("ping"))
The channel.receive()
syntax receives a value from the
channel. Here we'll receive the "ping"
message
we sent above and print it out.
let message = messages.receive() print(message!)
When we run the program the "ping" message is successfully passed from one coroutine to another via our channel. By default sends and receives block until both the sender and receiver are ready. This property allowed us to wait at the end of our program for the "ping" message without having to use any other synchronization.
Values received from channels are Optional
s. If you try to get a value from a closed channel with no values left in the buffer, it'll return nil
. If you are sure that there is a value wraped in the Optional
, you can use the !
operator, to force unwrap the optional.
Output
ping
03 - Channel Buffering
By default channels are unbuffered, meaning that they
will only accept values (channel.send(value)
) if there is a
corresponding receive (let value = channel.receive()
) ready to receive the
value sent by the channel. Buffered channels accept a limited
number of values without a corresponding receiver for
those values.
Here we make a channel of strings buffering up to 2 values.
let messages = Channel<String>(bufferSize: 2)
Because this channel is buffered, we can send these values into the channel without a corresponding concurrent receive.
messages.send("buffered") messages.send("channel")
Later we can receive these two values as usual.
print(messages.receive()!) print(messages.receive()!)
Output
buffered
channel
04 - Channel Synchronization
We can use channels to synchronize execution across coroutines. Here's an example of using a blocking receive to wait for a coroutine to finish.
This is the function we'll run in a coroutine. The
done
channel will be used to notify another
coroutine that this function's work is done.
func worker(done: Channel<Void>) { print("working...") nap(for: 1.second) print("done") done.send() // Send to notify that we're done. }
Start a worker coroutine, giving it the channel to notify on.
let done = Channel<Bool>(bufferSize: 1) co(worker(done))
Block until we receive a notification from the worker on the channel.
done.receive()
If you remove the done.receive()
line from this program, the program would
exit before the worker even started.
Output
working...
done
05 - Channel Directions
When using channels as function parameters, you can specify if a channel is meant to only send or receive values. This specificity increases the type-safety of the program.
This ping
function only accepts a channel that receives
values. It would be a compile-time error to try to
receive values from this channel.
func ping(pings: SendingChannel<String>, message: String) { pings.send(message) }
The pong
function accepts one channel that only sends values
(pings
) and a second that only receives values (pongs
).
func pong(pings: ReceivingChannel<String>, _ pongs: SendingChannel<String>) { let message = pings.receive()! pongs.send(message) } let pings = Channel<String>(bufferSize: 1) let pongs = Channel<String>(bufferSize: 1) ping(pings.sendingChannel, message: "passed message") pong(pings.receivingChannel, pongs.sendingChannel) print(pongs.receive()!)
Output
passed message
06 - Select
Select lets you wait on multiple channel operations. Combining coroutines and channels with select is an extremely powerful feature.
For our example we'll select across two channels.
let channel1 = Channel<String>() let channel2 = Channel<String>()
Each channel will receive a value after some amount of time, to simulate e.g. blocking RPC operations executing in concurrent coroutines.
after(1.second) { channel1.send("one") } after(2.seconds) { channel2.send("two") }
We'll use select
to await both of these values
simultaneously, printing each one as it arrives.
for _ in 0 ..< 2 { select { when in when.received(valueFrom: channel1) { message1 in print("received \(message1)") } when.received(valueFrom: channel2) { message2 in print("received \(message2)") } } }
We receive the values "one"
and then "two"
as expected.
Note that the total execution time is only ~2 seconds since
both the 1 and 2 second nap
s execute concurrently.
Output
received one
received two
07 - Timeouts
Timeouts are important for programs that connect to
external resources or that otherwise need to bound
execution time. Implementing timeouts is easy and
elegant thanks to channels and select
.
For our example, suppose we're executing an external
call that returns its result on a channel channel1
after 2s.
let channel1 = Channel<String>(bufferSize: 1) after(2.seconds) { channel1.send("result 1") }
Here's the select
implementing a timeout.
received(resultFrom: channel1)
awaits the result and timeout(1.second.fromNow())
awaits a value to be sent after the timeout of
1s. Since select
proceeds with the first
receive that's ready, we'll take the timeout case
if the operation takes more than the allowed 1s.
select { when in when.received(resultFrom: channel1) { result in print(result) } when.timedOut(1.second.fromNow()) { print("timeout 1") } }
If we allow a longer timeout of 3s, then the receive
from channel2
will succeed and we'll print the result.
let channel2 = Channel<String>(bufferSize: 1) after(2.seconds) channel2.send("result 2") } select { when in when.received(resultFrom: channel2) { result in print(result) } when.timedOut(3.seconds.fromNow()) { print("timeout 2") } }
Running this program shows the first operation timing out and the second succeeding.
Using this select timeout pattern requires communicating results over channels. This is a good idea in general because other important features are based on channels and select. We’ll look at two examples of this next: timers and tickers.
Output
timeout 1
result 2
08 - Non-Blocking Channel Operations
Basic sends and receives on channels are blocking.
However, we can use select
with a otherwise
clause to
implement non-blocking sends, receives, and even
non-blocking multi-way select
s.
let messages = Channel<String>() let signals = Channel<Bool>()
Here's a non-blocking receive. If a value is
available on messages
then select
will take
the received(valueFrom: messages)
case with that value. If not
it will immediately take the otherwise
case.
select { when in when.received(valueFrom: messages) { message in print("received message \(message)") } when.otherwise { print("no message received") } }
A non-blocking send works similarly.
let message = "hi" select { when in when.sent(message, to: messages) { print("sent message \(message)") } when.otherwise { print("no message sent") } }
We can use multiple cases above the otherwise
clause to implement a multi-way non-blocking
select. Here we attempt non-blocking receives
on both messages
and signals
.
select { when in when.received(valueFrom: messages) { message in print("received message \(message)") } when.received(valueFrom: signals) { signal in print("received signal \(signal)") } when.otherwise { print("no activity") } }
Output
no message received
no message sent
no activity
09 - Closing Channels
Closing a channel indicates that no more values can be sent to it. This can be useful to communicate completion to the channel's receivers.
In this example we'll use a jobs
channel to
communicate work to be done to a worker coroutine. When we have no more jobs for
the worker we'll close
the jobs
channel.
let jobs = Channel<Int>(bufferSize: 5) let done = Channel<Void>()
Here's the worker coroutine. It repeatedly receives
from jobs
with j = jobs.receive()
. The return value
will be nil
if jobs
has been close
d and all
values in the channel have already been received.
We use this to notify on done
when we've worked
all our jobs.
co { while true { if let job = jobs.receive() { print("received job \(job)") } else { print("received all jobs") done.send() return } } }
This sends 3 jobs to the worker over the jobs
channel, then closes it.
for job in 1...3 { print("sent job \(job)") jobs.send(job) } jobs.close() print("sent all jobs")
We await the worker using the synchronization approach we saw earlier.
done.receive()
The idea of closed channels leads naturally to our next example: iterating over channels.
Output
sent job 1
received job 1
sent job 2
received job 2
sent job 3
received job 3
sent all jobs
received job 3
received all jobs
10 - Iterating Over Channels
We can use for in
to iterate over
values received from a channel.
We'll iterate over 2 values in the queue
channel.
let queue = Channel<String>(bufferSize: 2) queue.send("one") queue.send("two") queue.close()
This for in
loop iterates over each element as it's
received from queue
. Because we close
d the
channel above, the iteration terminates after
receiving the 2 elements. If we didn't close
it
we'd block on a 3rd receive in the loop.
for element in queue { print(element) }
This example also showed that it’s possible to close a non-empty channel but still have the remaining values be received.
Output
one
two
11 - Timers
We often want to execute code at some point in the future, or repeatedly at some interval. Timer and ticker features make both of these tasks easy. We'll look first at timers and then at tickers.
Timers represent a single event in the future. You tell the timer how long you want to wait, and it provides a channel that will be notified at that time. This timer will wait 2 seconds.
let timer1 = Timer(deadline: 2.seconds.fromNow())
The timer1.channel.receive()
blocks on the timer's channel
until it sends a value indicating that the timer
expired.
timer1.channel.receive() print("Timer 1 expired")
If you just wanted to wait, you could have used
nap
. One reason a timer may be useful is
that you can cancel the timer before it expires.
Here's an example of that.
let timer2 = Timer(deadline: 1.second.fromNow()) co { timer2.channel.receive() print("Timer 2 expired") } if timer2.stop() { print("Timer 2 stopped") }
The first timer will expire ~2s after we start the program, but the second should be stopped before it has a chance to expire.
Output
Timer 1 expired
Timer 2 stopped
12 - Tickers
Timers are for when you want to do something once in the future - tickers are for when you want to do something repeatedly at regular intervals. Here's an example of a ticker that ticks periodically until we stop it.
Tickers use a similar mechanism to timers: a
channel that is sent values. Here we'll use the
iterator
builtin on the channel to iterate over
the values as they arrive every 500ms.
let ticker = Ticker(period: 500.milliseconds) co { for time in ticker.channel { print("Tick at \(time)") } }
Tickers can be stopped like timers. Once a ticker is stopped it won't receive any more values on its channel. We'll stop ours after 1600ms.
nap(for: 1600.milliseconds) ticker.stop() print("Ticker stopped")
When we run this program the ticker should tick 3 times before we stop it.
Output
Tick at 37024098
Tick at 37024599
Tick at 37025105
Ticker stopped
13 - Worker Pools
In this example we'll look at how to implement a worker pool using coroutines and channels.
Here's the worker, of which we'll run several
concurrent instances. These workers will receive
work on the jobs
channel and send the corresponding
results on results
. We'll sleep a second per job to
simulate an expensive task.
func worker(id: Int, jobs: Channel<Int>, results: Channel<Int>) { for job in jobs { print("worker \(id) processing job \(job)") nap(for: 1.second) results.send(job * 2) } }
In order to use our pool of workers we need to send them work and collect their results. We make 2 channels for this.
let jobs = Channel<Int>(bufferSize: 100) let results = Channel<Int>(bufferSize: 100)
This starts up 3 workers, initially blocked because there are no jobs yet.
for workerId in 1...3 { co(worker(workerId, jobs: jobs, results: results)) }
Here we send 9 jobs
and then close
that
channel to indicate that's all the work we have.
for job in 1...9 { jobs.send(job) } jobs.close()
Finally we collect all the results of the work.
for _ in 1...9 { results.receive() }
Our running program shows the 9 jobs being executed by various workers. The program only takes about 3 seconds despite doing about 9 seconds of total work because there are 3 workers operating concurrently.
Output
worker 1 processing job 1
worker 2 processing job 2
worker 3 processing job 3
worker 1 processing job 4
worker 2 processing job 5
worker 3 processing job 6
worker 1 processing job 7
worker 2 processing job 8
worker 3 processing job 9
14 - Rate Limiting
Rate limiting is an important mechanism for controlling resource utilization and maintaining quality of service. Venice elegantly supports rate limiting with coroutines, channels, and tickers.
First we'll look at basic rate limiting. Suppose we want to limit our handling of incoming requests. We'll serve these requests off a channel of the same name.
var requests = Channel<Int>(bufferSize: 5) for request in 1...5 { requests.send(request) } requests.close()
This limiter
channel will receive a value
every 200 milliseconds. This is the regulator in
our rate limiting scheme.
let limiter = Ticker(period: 200.milliseconds)
By blocking on a receive from the limiter
channel
before serving each request, we limit ourselves to
1 request every 200 milliseconds.
for request in requests { limiter.channel.receive() print("request \(request) \(now)") } print("")
We may want to allow short bursts of requests in
our rate limiting scheme while preserving the
overall rate limit. We can accomplish this by
buffering our limiter channel. This burstyLimiter
channel will allow bursts of up to 3 events.
let burstyLimiter = Channel<Int64>(bufferSize: 3)
Fill up the channel to represent allowed bursting.
for _ in 0 ..< 3 { burstyLimiter.send(now) }
Every 200 milliseconds we'll try to add a new
value to burstyLimiter
, up to its limit of 3.
co { for time in Ticker(period: 200.milliseconds).channel { burstyLimiter.send(time) } }
Now simulate 5 more incoming requests. The first
3 of these will benefit from the burst capability
of burstyLimiter
.
let burstyRequests = Channel<Int>(bufferSize: 5) for request in 1... 5 { burstyRequests.send(request) } burstyRequests.close() for request in burstyRequests { burstyLimiter.receive() print("request \(request) \(now)") }
Running our program we see the first batch of requests handled once every ~200 milliseconds as desired.
For the second batch of requests we serve the first 3 immediately because of the burstable rate limiting, t