Concurrency
Fibers, channels, and select
Lira's concurrency is Go-shaped: lightweight fibers that communicate over typed channels, coordinated with select. No async keywords, no callbacks — just sequential code on many fibers.
Concurrency in Lira is built on three things you already saw on the landing page: fibers (cheap green threads scheduled by the VM), channels (typed conduits that move values between fibers), and select (the construct that waits on several channel operations and runs whichever is ready). Put together, they let you write programs that do several things at once without locks-by-default or shared-memory guesswork.
Everything on this page is a real .li file in the repository,
verified to run with lira run before the site is built.
Spawning fibers
A fiber is started with spawn, which takes a
function call — not a block. The named function runs on a fresh
fiber; the spawning fiber continues immediately. The VM schedules fibers
cooperatively, and fiber_yield() hands control to other
runnable fibers.
// spawn takes a function CALL and runs it on a new fiber.
// Fibers are lightweight green threads scheduled by the VM.
fn greet(name: string) {
println("hello from ${name}")
}
fn main() {
spawn greet("fiber A")
spawn greet("fiber B")
// The main fiber yields so the spawned fibers get a turn to run.
fiber_yield()
fiber_yield()
println("main is done")
}Channels
Channels carry typed values from one fiber to another. Create one with
chan(n) for a buffered channel that holds up to
n values, or chan() for an unbuffered channel
where each send rendezvous-pairs with a receive. A function that accepts a
channel declares its element type as Channel<T>.
// Channels carry typed values between fibers. A function that receives a
// channel declares its element type with Channel<T>.
fn produce(out: Channel<int>) {
send(out, 1)
send(out, 2)
send(out, 3)
}
fn main() {
// chan(n) is buffered (holds up to n values); chan() is unbuffered.
let ch = chan(3)
spawn produce(ch)
// Bind each received value with a select arm (see below for why).
var seen = 0
while seen < 3 {
select {
n = <-ch => {
println("recv ${n}")
seen = seen + 1
}
}
}
}
The basic operations are free functions: send(ch, v) puts a
value in, recv(ch) takes one out (blocking until one is
available), and close(ch) closes the channel. For the type
rules behind Channel<T>, see
Types.
Why receiving uses select
Here is the one gotcha worth internalizing early.
recv(ch) blocks until a value arrives — but
as an expression it evaluates to an ok-bool
(true while the channel is open), not to the value you received. So
let v = recv(ch) binds a boolean, not your data.
// recv(ch) BLOCKS until a value is available -- but as an EXPRESSION it
// evaluates to an ok-bool (true while the channel is open), not the value.
// So you cannot bind the received value this way:
fn main() {
let ch = chan(1)
send(ch, 99)
let ok = recv(ch) // ok is `true`, NOT 99
println("recv returned: ${ok}")
// To BIND the value, use a select arm -- that is the idiom.
send(ch, 99)
select {
v = <-ch => println("v is ${v}")
}
}
To actually bind a received value, use a select
receive arm: v = <-ch => …. That is the idiom — and it
scales naturally to waiting on more than one channel at a time.
select
select waits on a set of channel operations and runs the
first arm that becomes ready. It supports three kinds of arm:
-
Receive arm —
v = <-ch => …binds the value received fromchtov. -
Send arm —
x -> ch => …sends the in-scope valuexintoch. -
Default arm —
_ => …runs immediately if no other arm is ready, making the wholeselectnon-blocking.
// select waits on several channel operations at once and runs the first
// arm that is ready. It supports three kinds of arm:
// v = <-ch => receive arm (binds the received value to v)
// x -> ch => send arm (sends the in-scope value x into ch)
// _ => default arm (runs immediately if nothing else is ready)
fn main() {
let ch = chan(1)
// Nothing has been sent yet, so the default arm fires: select is
// non-blocking when a default arm is present.
select {
v = <-ch => println("got ${v}")
_ => println("nothing ready yet")
}
let payload = 7
select {
payload -> ch => println("sent ${payload}")
_ => println("send would block")
}
// Now the value is buffered, so the receive arm wins.
select {
v = <-ch => println("got ${v}")
_ => println("nothing ready yet")
}
}
Without a default arm, select blocks until one of its arms
can proceed. With one, it polls and falls through — handy for
"do work if there's work, otherwise move on" loops.
A worker pool
These pieces compose into the classic producer/consumer pattern. Below, a
single producer fills a jobs channel, three worker fibers
pull jobs and fan results back through a results channel, and
a WaitGroup lets main wait until every worker
has drained the queue. Each worker uses a default arm to retire once no
jobs remain.
// A worker pool: one producer fills a jobs channel, three workers pull jobs,
// square them, and fan results back in. A WaitGroup tells main when every
// worker has drained the queue.
import std.sync
fn worker(jobs: Channel<int>, results: Channel<int>, wg: WaitGroup) {
var running = true
while running {
select {
j = <-jobs => send(results, j * j)
_ => running = false // no jobs left: this worker retires
}
}
wg.done()
}
fn main() {
let jobs = chan(8)
let results = chan(8)
let wg = new_wait_group()
var i = 1
while i <= 8 {
send(jobs, i)
i = i + 1
}
spawn worker(jobs, results, wg)
spawn worker(jobs, results, wg)
spawn worker(jobs, results, wg)
wg.wait(3) // block until all three workers signal done
var total = 0
var got = 0
while got < 8 {
select {
r = <-results => {
total = total + r
got = got + 1
}
}
}
println("sum of squares 1..8 = ${total}")
}The sum of the squares of 1 through 8 is 204 — and because work is handed out over a channel, you can add or remove workers without touching the producer or the result-collection loop.
Deadlock detection
If every fiber ends up blocked with no way to make progress, the scheduler detects it and stops with a located error instead of hanging forever. Here a single fiber waits on an unbuffered channel that no one will ever send to:
// @expect-error
// The scheduler detects deadlock: if every fiber is blocked with no way to
// make progress, the program halts with a located error instead of hanging.
fn main() {
let ch = chan() // unbuffered: a send needs a matching receiver
// No other fiber is running, so this recv can never be satisfied.
let ok = recv(ch)
println("unreachable: ${ok}")
}Running it reports the deadlock at the offending line and column:
$ lira run deadlock.li 8:5: deadlock: all fibers are blocked
Like the rest of Lira's diagnostics, deadlock errors carry a
line:column location — see
Patterns for more on how the language reports
problems precisely.
Shared state with std.sync
Channels are the preferred way to coordinate, but sometimes you genuinely
want shared mutable state. import std.sync provides a small
set of primitives for that. The mutexes are typed:
IntMutex and StringMutex, each with
lock / unlock / with.
The example below proves real mutual exclusion: two fibers each increment
a shared IntMutex a thousand times, and the total comes out
to exactly 2000 — no lost updates.
// std.sync gives you shared-state primitives. Here two fibers each increment
// a shared IntMutex 1000 times; mutual exclusion guarantees no lost updates,
// so the total is exactly 2000.
import std.sync
fn worker(m: IntMutex, wg: WaitGroup, iters: int) {
var i = 0
while i < iters {
let v = m.lock() // lock returns the current value
m.unlock(v + 1) // unlock writes the new value back
i = i + 1
}
wg.done()
}
fn main() {
let m = new_int_mutex(0)
let wg = new_wait_group()
let iters = 1000
spawn worker(m, wg, iters)
spawn worker(m, wg, iters)
wg.wait(2)
let total = m.lock()
m.unlock(total)
println("total: ${total}")
println("expected: ${2 * iters}")
}
std.sync also offers a WaitGroup
(done() to signal completion, wait(n) to block
until n signals arrive — used above to join the workers) and
a Semaphore (acquire() / release())
to bound how many fibers enter a section at once. Here eight workers share
a Semaphore(2), and the observed maximum concurrency stays at
2:
// A Semaphore bounds how many fibers run a section at once. Eight workers
// share a Semaphore(2): each one bumps an "active" counter on entry and
// records the running maximum, so we can prove concurrency never exceeds 2.
import std.sync
fn worker(sem: Semaphore, active: IntMutex, maxseen: IntMutex, wg: WaitGroup) {
sem.acquire() // blocks if 2 permits are already taken
let now = active.lock() + 1
active.unlock(now)
let mx = maxseen.lock()
if now > mx { maxseen.unlock(now) } else { maxseen.unlock(mx) }
fiber_yield() // give peers a chance to overlap
fiber_yield()
let a = active.lock()
active.unlock(a - 1)
sem.release()
wg.done()
}
fn main() {
let limit = 2
let sem = new_semaphore(limit)
let active = new_int_mutex(0)
let maxseen = new_int_mutex(0)
let wg = new_wait_group()
var i = 0
while i < 8 {
spawn worker(sem, active, maxseen, wg)
i = i + 1
}
wg.wait(8)
let mx = maxseen.lock()
maxseen.unlock(mx)
println("limit: ${limit}")
println("max concurrent: ${mx}")
}What is not implemented yet
Fibers, channels, and select — coordinated by
std.sync where you need shared state — are the whole model
today, and it is intentionally Go-shaped. Some things you might expect
from other languages are not part of Lira yet:
Next, see how channel and generic types are checked in Types, how results arriving over channels can be destructured in Patterns, or what ships in the standard library. New to the language? Start with the guide.