Golang like Done channel in Rustlang

KrishnaKumar
FAUN — Developer Community 🐾
5 min readAug 15, 2023

--

Golang channels are used to establish communication between go routines. The same channels are also used to inform Done state of a go routine.

Here is how we typically do it in Golang if you have to spin off a bunch of go routines and stop/conclude them using a dedicated Done channel

package main

import (
"sync"
"time"
)

func main() {
// Initialise a done channel
doneChan := make(chan bool)
var wg sync.WaitGroup

// Creates 10 goroutines
for i := 0; i < 10; i++ {
wg.Add(1)
go waitOnDone(i, &wg, doneChan)
}

// Wait for 10 seconds and close the doneChan
time.Sleep(time.Second * 10)
close(doneChan)

// main program to wait until all go routines are closed
wg.Wait()
}

func waitOnDone(idx int, wg *sync.WaitGroup, doneChan chan bool) {
defer wg.Done()
for {
// Do some computation here
select {
// If its able to read, it means channel is closed
case <-doneChan:
println("received done on: ", idx)
return
}
}
}

Here are the couple of things about Golang that made this possible:

1. when a channel is closed it can be read at least once by all the waiting go routines on the channel.

2. A select statement can read from multiple channels at once.

Usually a separate channel can be used to signal end of a go routine. This channel is commonly referred as Done channel, although its not a specific type but the name itself.

The Rust way

Rust also has channels as part of the standard library. But I chose to use the channels from crossbeam crate.

There are two distinct ways to read data from a Rust channel. One is to use recv() method and other one is try_recv() method. The first one is blocking call while the second one is non-blocking and returns immediately with an empty error value when there is nothing sent on the channel.

Would Rust allow you to read a single done channel from multiple threads when tx is dropped/closed? The answer is Yes.

Unlike Golang, Rust do not have in built support to read from multiple channels at once. This can be achieved in the following way.

Sequentially calling try_recv() methods for different channels ignoring the empty error on each call would effectively simulate what select does in Golang.

extern crate crossbeam;

use std::thread;
use std::time;

fn main() {

// done channel
let (done_tx, done_rx) = crossbeam::channel::unbounded::<()>();
let mut handles = vec![];

// some other compute channel which you would use for business logic
let (compute_tx, compute_rx) = crossbeam::channel::unbounded::<u8>();

// Spin off 10 worker threads
for i in 0..10 {
let done_rx_clone = done_rx.clone();
let compute_rx_clone = compute_rx.clone();

let h = thread::spawn(move || {
loop {

// Try checking on channel for disconnection
match done_rx_clone.try_recv() {
Err(e) => {
match e {
crossbeam::channel::TryRecvError::Empty => {},
crossbeam::channel::TryRecvError::Disconnected => {
println!("looks like done on thread {}", i);
break;
},
}
},
_ => {}
}

// Try checking on compute channel if there is any data to process
match compute_rx_clone.try_recv() {
Ok(v) => {
println!("received value on thread {}: {}", i, v)
},
Err(e) => {
match e {
crossbeam::channel::TryRecvError::Empty => {},
crossbeam::channel::TryRecvError::Disconnected => {
println!("err on compute channel {}: {}", i, e);
break;
},
}
},
}

}
});

handles.push(h);
}


// Drop done_tx after certain time to emulate done channel
let _wh = thread::spawn(move || {
thread::sleep(time::Duration::from_secs(10));
println!("sending signal");
drop(done_tx);
});

// Send some data on compute_tx
let _ch = thread::spawn(move || {
loop {
thread::sleep(time::Duration::from_secs(2));
compute_tx.send(u8::MAX).unwrap();
}
});

// Wait for all handles to finish
for h in handles.into_iter() {
h.join().expect("error joining thread");
}

println!("all threads handled");
}

Beware of tight loops

Would non blocking read from multiple channels without a delay result in an effect of on infinite while loop? Let us find out.

When I ran the above program on my Mac, CPU shot up to ~1000% ⏫ consistently. Here is a snippet from top command

rust-chan process showing 1000% cpu 😮

This CPU spike seems to happen as we are doing nothing when the channel is empty error resulting in an infinite tight loop.

// Try checking on compute channel if there is any data to process
match compute_rx_clone.try_recv() {
Ok(v) => {
println!("received value on thread {}: {}", i, v)
},
Err(e) => {
match e {
crossbeam::channel::TryRecvError::Empty => {},
crossbeam::channel::TryRecvError::Disconnected => {
println!("err on compute channel {}: {}", i, e);
break;
},
}
},
}

While I don’t have a perfect answer for this. A quirk helped to resolve this. By adding a sleep on receiving an empty error on the compute channel brought down the CPU utilisation to almost ~ 0.1%

rust-chan process cpu brought down 😃

Here is the updated snippet where the sleep is added.

// Try checking on compute channel if there is any data to process
match compute_rx_clone.try_recv() {
Ok(v) => {
println!("received value on thread {}: {}", i, v)
},
Err(e) => {
match e {
crossbeam::channel::TryRecvError::Empty => {
// sleep for 1 second on empty channel
thread::sleep(time::Duration::from_secs(1))
},
crossbeam::channel::TryRecvError::Disconnected => {
println!("err on compute channel {}: {}", i, e);
break;
},
}
},
}

You should have noticed that adding sleep to one of the try_recv() reduced the CPU significantly. You wouldn’t have to do it for the other channel as the tight loop is already broken.

Thoughts to ponder

Is there a better way to do this?

I am not sure, this has worked for me so far.

Can you avoid sleep on compute channel?

You can bring down the sleep to a reasonable value based on your need. As long it breaks the tight loop, you are good.

Can we use the same compute channel for the purpose checking for Done without a need for dedicated channel?

Of course you could do that too as long you as you have ability to drop all references to compute_tx, otherwise stick to a dedicated done channel as described and keep it consistent across app.

Have I explored select! from crossbeam?

Not yet.

👋 If you find this helpful, please click the clap 👏 button below a few times to show your support for the author 👇

🚀Join FAUN Developer Community & Get Similar Stories in your Inbox Each Week

--

--

Go, Rust, Erlang, Java at work || Blogging, Open source tools, Embedded systems, Industrial robotics as hobby