pychanasync

pychanasync is a lightweight python package which brings Go-style channels to python's asyncio concurrency world. It is an async-channel implementation, providing a channel shaped tool for channel shaped problems.

It allows for safe, easy and efficient communication between coroutines scheduled on the asyncio event loop by providing a medium through which items/messages can be pushed into from one end and pulled from the other sequentially as opposed to sharing memory.

pychanasync is implemented entirely around pythons asyncio event loop. The implementation is lock free ,taking advantage of the single threaded cooperative concurrency model.

It is designed and implemented to work with coroutines and not threads, providing safe and deterministic communication patterns without blocking the event loop.

pychanasync is built on the idea;

"Don't communicate by sharing memory; share memory by communicating"


The point was to implement a package tool that allow coroutines to exchange messages instead of going the shared mutable state and locks route.

So if you are working on async program in python which fits the producer-consumer pattern or need a channel shaped solution, pychanasync is highly recommended.

It makes async programs cleaner and easy to reason about!


As mentioned , pychanasync is inspired by channels in Go but blends it with Pythonic conventions. It provides clean, high level abstractions to pass messages between coroutines/tasks and other features that feel naturally python.

  • Buffered and unbuffered channel semantics - use either synchronous or buffered communication
  • Async iteration over channels - Consume messages from a channel using async for loops.
  • Context manager support - close channels and release resources when done with async with.
  • Blocking/ awaitable operations - await chan.push(value) and await chan.pull() for safe, cooperative communication.
  • Non-blocking operations - chan.push_nowait(value) and chan.pull_nowait() for buffered channels when you don’t want to suspend.
  • Select-like utility - wait on multiple channel operations concurrently, similar to Go’s select statement, in a clean and Pythonic way

Installation

pychanasync is available on PyPi

pip install pychanasync

or you can install it from source

git clone https://github.com/Gwali-1/PY_CHANNELS_ASYNC
cd PY_CHANNELS_ASYNC
pip install -e .

Quickstart

Channels can be both buffered and unbuffered.

unbuffered channels have no internal buffer capacity. What this means is every producer (push) will block/suspend until there is a ready consumer on the other end of the channel (pull) and every consumer until there is a ready producer on the other end of the channel.

Communication happens synchronously. Both sender and receiver must be present until any operation is completed.

This is great in scenarios where you want to properly synchronize operation of two components. ensuring one can only proceed if the other acknowledges them.

from pychanasync import channel

#create unbuffered channel
ch = Channel()

# send
async ch.push("item") #blocks here 

# receive
value = async ch.pull()

buffered channels have an internal buffer capacity and can hold (N) number of items at a time. When doing a push into a buffered channel, the operation will only block when the buffer is full and until there is available space to send the new item. Other than that the operation completes and returns quickly.

On the other hand when you pull from a buffered channel , the operation will only block or be suspended when the buffer is empty, until there are items available in the buffer. Other than that the operation completes and returns and item from the channel quickly.

Here, unlike unbuffered channels , both senders and receivers don't have to be in sync. The communication is asynchronous up to the buffers capacity limit (N).

This is great in scenarios for example when you want a smooth outburst of work, decoupling producer and consumer speed.

Below is a buffered channel that can hold 300 items at a time.

from pychanasync import channel

ch = Channel(buffer=300)

# send
async ch.push("item")

# receive
value = async ch.pull()

Basic consumer-producer example

import asyncio
from pychanasync import Channel

async def producer(ch):
    for i in range(3):
        await ch.push(f"msg {i}")
        print(f"Sent msg {i}")
    ch.close()  # gracefully close when done

async def consumer(ch):
    while True:
        try:
            msg = await ch.pull()
            print(f"Received {msg}")
        except Channel.Closed:
            break

async def main():
    ch = Channel(buffer=2)
    await asyncio.gather(producer(ch), consumer(ch))

asyncio.run(main())

The code above follows typical structure of asynchronous code in asyncio python. Here we try to implement a simple producer which is a coroutine function that that loops and sends a message into a buffered channel. We have another coroutine function which continuously reads from the buffered channel until it is closed. Both coroutines are scheduled to run on the event-loop using asyncio.gather

It’s clear how pychanasync makes this pattern easy to implement, allowing coroutines to remain decoupled in their execution while still communicating seamlessly in a clean and concise way.

One thing worth noting in this example is after the producer pushes the second item , it waits until the consumer pulls an item before continuing . it does this seamlessly by suspending and resuming on the event loop in a cooperative manner just as a task is supposed to behave . pychanasync does not get in the way of the event loop.

You can find more practical code examples in the pychanasync GitHub repository.
It includes implementations of several concurrency patterns from Rob Pike’s talk.

These examples demonstrate how to model real-world coroutine coordination problems using pychanasync such as fan-in , Generator pattern etc. Check it out.

Features

Async Iteration

pychanasync supports async iteration, allowing you to consume items from a channel in a clean way using async for loop.

We can rewrite our consumer above as

async def consumer(ch):
    async for msg in ch:
        print(f"Received: {msg}")

Once the producer closes the channel, the iteration ends .

Context manager support

pychanasync has support for asynchronous context managers for automatic cleanup.

We can rewrite out producer component as

async def producer(channel):
  async with channel  as ch:
    for i in range(3):
        await ch.push(f"msg {i}")
        print(f"Sent msg {i}")
When the async-with block exits, the channel is closed automatically.

chanselect

The chanselect utility method allows you to start and wait on multiple channel operations simultaneously, returning the one that completes first.

It behaves similarly to Go’s select statement.

The chanselect function takes one or more tuples, each containing a channel and a channel operation (such as chan.push(value) or chan.pull()).

It concurrently waits on all provided operations and returns the first one to complete.

The function returns a tuple depending on the type of operation that finished first:

  • For pull operation, it returns (channel, value).
  • For push operation, it returns (channel, None).

synthax

    chan, value = await chanselect(
        (chan_a, chan_a.pull()),
        (chan_b, chan_b.pull())
    )

Example

import asyncio
from pychanasync import Channel, chanselect

async def wait_and_push(chan, delay, item):
    await asyncio.sleep(delay)
    await chan.push(item)

async def main():
    chan_a = Channel(bound=2)
    chan_b = Channel(bound=2)
    chan_c = Channel(bound=2)

    asyncio.create_task(wait_and_push(chan_a, 0.01, "item_a"))
    asyncio.create_task(wait_and_push(chan_b, 0.05, "item_b"))
    asyncio.create_task(wait_and_push(chan_c, 0.02, "item_c"))

    chan, value = await chanselect(
        (chan_a, chan_a.pull()),
        (chan_b, chan_b.pull()),
        (chan_c, chan_c.pull())
    )

    if chan = chan_a:
        print(f"{value} was received from chan a ")

    if chan = chan_b:
        print(f"{value} was received from chan b ")

    if chan = chan_c:
        print(f"{value} was received from chan c ")

asyncio.run(main())

In the example above 3 channels are created and populated at different times. The chanselect call waits for the first available value among the three. It returns as soon as the first pull operation succeed which in this case is chan_a

Non-blocking channel operations

pychanasync provides non-blocking variants of push and pull on buffered channels. In this case , the coroutine will not block or suspend.

These methods will raise exceptions when the operation cannot proceed immediately.

When you try to send an item with push_nowait into a buffered channel which is full, it raises a ChannelFull exception.

When you try to pull an item with push_nowait from a buffered channel which is empty, it raises a ChannelEmpty exception.

push_nowait

ch = Channel(bound=2)

ch.push_nowait("A")
ch.push_nowait("B")

try:
    ch.push_nowait("C")
except ChannelFull:
    print("Buffer is full — could not push!")

pull_nowait

try:
    value = ch.pull_nowait()
except ChannelEmpty:
    print("Buffer is empty — nothing to receive.")

Channel closing behaviour

Closing the channel signals that no more items can be sent to it or read from it. But what happens to already pending receive or send operations depends on the type of channel.

Buffered channel

When you close a buffered channel, the internal buffer is drained. Any pending readers will receive the items in the buffer at the time of closing. Once the buffer is empty and there are more pending readers , They are woken up/terminated with a ChannelClosed exceptions.

Also, all pending senders , thus those waiting to push into the channel but the buffer was full are terminated with a ChannelClosed exception.

After closing, no new send or receive operations are allowed. Any attempt raises ChannelClosed.

unbuffered channel

For unbuffered channels ie those without a bound , closing the channel immediately terminates all pending senders and receivers with a ChannelClosed

Because there’s no buffer to drain, no additional values are delivered after closing.

And as with buffered channels , no further operations can be performed after channel's closure.

API Reference

await ch.push(val)

Will suspend until item can be sent (or buffer space is available)

await ch.pull()

Will suspend until value is available to be read.

ch.push_nowait(val)

Raises exception if buffer is full (only for buffered channels)

ch.pull_nowait(val)

Raises exception if buffer is empty (only for buffered channels)

ch.close()

Closes the channel and wakes up all waiting tasks/coroutines with pending channel operations.

ch.csize()

Return the number of items in the channel(None for unbuffered).

ch.full()

Returns True if there are maxsize items in the channel.

ch.empty()

Returns True if the channel is empty, False otherwise.

ch.closed

Returns True if the channel is closed.

Contributing

To contribute or set up the project locally.

find the project source code on GitHub

Clone the project

git clone https://github.com/Gwali-1/PY_CHANNELS_ASYNC
cd PY_CHANNELS_ASYNC

Install dependencies

pipenv install --dev

Running tests From the project root

pipenv run pytest

Installing the package locally From the project root

pip install -e .