Coroutines

Coroutines are special functions that have multiple entry points that allow suspending and resuming execution at specified locations. They allow you to:

  • write sequentially flowing code involving potentially blocking tasks (e.g. socket IO) that is actually completely non-blocking
  • “time slice” large, computationally expensive tasks to avoid blocking the mainloop for extended periods of time (which would prevent timers, IO handlers, and other coroutines from running)
  • help solve complex problems involving state without using explicit state machines

In the event where blocking is unavoidable, and the duration of the block is unknown (for example, connecting to a remote host, or scaling a very large image), threads can be used. These two different approaches are unified with a very similar API.

A function or method is designated a coroutine by using the @kaa.coroutine decorator. A coroutine allows a larger tasks to be broken down into smaller ones by yielding control back to the “scheduler” (the notifier), implementing a kind of cooperative multitasking. More usefully, coroutines can yield at points where they may otherwise block on resources (e.g. disk or network), and when the resource becomes available, the coroutine resumes where it left off. Without coroutines, this is typically implemented as a series of smaller callback functions. (For more information on coroutines, see Wikipedia’s treatment of the subject.)

Coroutines return an InProgress object, and the caller can connect a callback to the InProgress object in order to be notified of its return value or any exception, or it can yield the InProgress object from other coroutines.

When a coroutine yields kaa.NotFinished, control is returned to the main loop, and the coroutine will resume after the yield statement at the next main loop iteration, or, if an interval is provided with the decorator, after this time interval. Following the cooperative multitasking analogy, yielding kaa.NotFinished can be thought of as the coroutine releasing a “time slice” so that other tasks may run.

When a coroutine yields any value other than kaa.NotFinished (including None), the coroutine is considered finished and the InProgress returned to the caller will be emitted (i.e. it is finished). As with normal function return values, if no value is explicitly yielded and the coroutine terminates, the InProgress is finished with None.

There is an important exception to the above rule: if the coroutine yields an InProgress object, the coroutine will be resumed when the InProgress object is finished. This allows a coroutine to be “chained” with other InProgress tasks, including other coroutines.

To recap, if a coroutine yields:

  • kaa.NotFinished: control is returned to the main loop so that other tasks can run (such as other timers, I/O handlers, etc.) and resumed on the next main loop iteration.
  • an InProgress object: control is returned to the main loop and the coroutine is resumed when the yielded InProgress is finished. Inside the coroutine, the yield call “returns” the value that InProgress was finished with.
  • any other value: the coroutine terminates, and the InProgress the coroutine returned to the caller is finished with that value (which includes None, if no value was explicitly yielded and the coroutine reaches the end naturally).

Here is a simple example that breaks up a loop into smaller tasks:

import kaa

@kaa.coroutine()
def do_something():
   for i in range(10):
      do_something_expensive()
      yield kaa.NotFinished
   yield 42

def handle_result(result):
   print "do_something() finished with result:", result

do_something().connect(handle_result)
kaa.main.run()

A coroutine can yield other coroutines (or rather, the InProgress object the other coroutine returns):

@kaa.coroutine()
def do_something_else():
   try:
      result = yield do_something()
   except:
      print "do_something failed"
      yield

   yield True if result else False

(Note that the above syntax, in which the yield statement returns a value, was introduced in Python 2.5. kaa.base requires Python 2.5 or later.)

Classes in kaa make heavy use of coroutines and (to a lesser extent) threads when methods would otherwise block on some resource. Both coroutines and @threaded-decorated methods return InProgress objects (well, special subclasses of InProgress objects) and behave identically. These can be therefore yielded from a coroutine in the same way:

@kaa.coroutine()
def fetch_page(host):
    """
    Fetches / from the given host on port 80.
    """
    socket = kaa.Socket()
    # Socket.connect() is implemented as a thread
    yield socket.connect((host, 80))
    # Socket.read() and write() are implemented as single-thread async I/O.
    yield socket.write('GET / HTTP/1.1\n\n')
    print (yield socket.read())

In the above example, the difference between threaded functions (kaa.Socket.connect()) and coroutines (write() and read()) is transparent. Both return InProgress objects. (As an aside, we didn’t really need to yield socket.write() because writes are queued and written to the socket when it becomes writable. However, yielding a write means that when the coroutine resumes, the data has been fully sent to the socket.)

To more clearly see the benefit of implementing the above example as a coroutine, consider the following code, which is rewritten using the more traditional approach of connecting callbacks at the various stages of the task:

def fetch_page(host):
    socket = kaa.Socket()
    socket.connect((host, 80)).connect(finished_connect, socket)

def finished_connect(result, socket):
    socket.write('GET / HTTP/1.1\n\n').connect(finished_write, socket)

def finished_write(len, socket):
    socket.read().connect(finished_read)

def finished_read(data):
    print data

In practice then, coroutines can be seen as an alternative approach to the classic signal/callback pattern, allowing you to achieve the same logic but with a much more intuitive and readable code. This means that if you design your application to use signals and callbacks, it might not be clear where coroutines would be useful.

However, if you make use of the asynchronous plumbing that kaa offers early on in your design – and that includes liberal use of InProgress objects, either explicitly or implicitly through the use of the @coroutine and @threaded decorators – you should find that you’re able to produce some surprisingly elegant, non-trivial code.

Aborting Coroutines

Coroutines that have yielded and are awaiting reentry can be aborted by calling the abort() method on the CoroutineInProgress object returned when invoking a coroutine. Consider this simple case:

@kaa.coroutine()
def delay_print(s):
    yield kaa.delay(5)
    print s

delay_print('Hello Kaa').abort()
kaa.main.run()

When aborted, an InProgressAborted exception will be raised inside the coroutine. If the exception is not caught, then it is raised back to the caller of abort(). In the above example, because delay_print doesn’t catch any exception, abort() will raise. The coroutine can catch the exception and do something suitable:

@kaa.coroutine()
def delay_print(s):
    try:
        yield kaa.delay(5)
    except kaa.InProgressAborted as e:
        # Nothing special needed to abort this.
        pass
    else:
        print s

The InProgressAborted exception object has an inprogress attribute, which will always be the InProgress object of the yielded task (or none if kaa.NotFinished was yielded), and an origin attribute that is the InProgress of the task that abort() was called on.

Provided it is abortable and nothing else but the coroutine being aborted is waiting on it, any InProgress task yielded by a coroutine will be aborted before the exception is raised inside the function. So in the above example, e.inprogress refers to the InProgress object returned by kaa.delay() and e.inprogress.finished will be True.

This cascading abort can be prevented by the coroutine by instead yielding an unabortable version of the InProgress using noabort():

@kaa.coroutine()
def delay_print(s):
    try:
        yield kaa.delay(5).noabort()
    except kaa.InProgressAborted as e:
        print e.inprogress.finished, 'will be false'
    else:
        print s

This means kaa.delay() in the above example will live on even when delay_print is aborted. (Not that it does any good in this contrived example, since the timer will fire and do nothing, but you get the idea.)

Although it can catch the InProgressAborted, the coroutine is still considered aborted and it will not be reentered again. If it attempts to yield a value that suggests it expects reentry (like kaa.NotFinished or an InProgress) then a RuntimeError will be raised. There is nothing a coroutine can do to prevent its own demise. But a coroutine a() that yields another coroutine z() can prevent z() from being aborted when a() is aborted by using noabort().

If a coroutine yields a task that is aborted, then an InProgressAborted will also be raised inside the coroutine whose yielded task was aborted. The origin attribute will indicate the task that was the source of the abort:

@kaa.coroutine(policy=kaa.POLICY_SINGLETON)
def singleton():
    try:
        yield kaa.delay(5)
    except kaa.InProgressAborted as e:
        print 'singleton() aborted'

@kaa.coroutine()
def master():
    try:
        yield singleton()
    except kaa.InProgressAborted as e:
        print 'master() aborted because %s aborted' % e.origin

master()
kaa.OneShotTimer(singleton().abort).start(1)

Decorator

kaa.coroutine(interval=0, policy=None, progress=False, group=None)

Decorated functions (which must be generators) may yield control back to the mainloop and be subsequently resumed at a later time.

Functions which yield kaa.NotFinished will be resumed on the next mainloop iteration; yielding an InProgress object will cause the coroutine to be resumed when the InProgress is finished. However, yielding a finished InProgress object will cause the coroutine to be resumed immediately.

The coroutine is considered finished when the underlying generator yields a value other than kaa.NotFinished or an InProgress object.

Parameters:
  • interval – Number of seconds to delay before resuming entry into the coroutine. Set to 0 (default) to resume as soon as possible (but not sooner than the next mainloop iteration).
  • policy – None, or one of POLICY_SYNCHRONIZED, POLICY_SINGLETON, or POLICY_PASS_LAST (described below).
  • progress – if True, an InProgressStatus object is passed as the first argument to the decorated function, allowing the coroutine to report progress to the caller. (The progress parameter corresponds to the progress attribute of the InProgress object returned to the caller.)
  • group – Name of the group this coroutine shares its policy with. For example, multiple coroutines with POLICY_SYNCHRONIZED and the same group name will all be synchronized against each other. Currently only methods within the same class may belong to the same group.
Returns:

a CoroutineInProgress object representing the coroutine

Possible policies are:

  • kaa.POLICY_SYNCHRONIZED: reentry into the coroutine is not permitted, and multiple calls are queued so that they execute sequentially.
  • kaa.POLICY_SINGLETON: only one active instance of the coroutine is allowed to exist. If the coroutine is invoked while another is running, the CoroutineInProgress object returned by the first invocation until it finishes.
  • kaa.POLICY_PASS_LAST: passes the CoroutineInProgress of the most recently called, unfinished invocation of this coroutine as the ‘last’ kwarg. If no such CoroutineInProgress exists, the last kwarg will be None. This is useful to chain multiple invocations of the coroutine together, but unlike POLICY_SYNCHRONIZED, the decorated function is entered each invocation.

A function decorated with this decorator will always return a CoroutineInProgress object. It may already be finished (which happens if the coroutine’s first yielded value is one other than kaa.NotFinished or an InProgress object).

If it is not finished, the coroutine’s life can be controlled via the CoroutineInProgress it returns. It can be aborted with abort(), in which case an InProgressAborted will be raised inside the coroutine, or its interval may be adjusted via the interval property.

class kaa.CoroutineInProgress(function, function_info, interval, progress=None)

An InProgress object returned by the coroutine() decorator.

The caller of kaa.coroutine() can interact with the coroutine via the returned CoroutineInProgress object, or yield it from other coroutines.

Notably, coroutines can be aborted by invoking abort() on this object.

Synopsis

Class Hierarchy

kaa.Signal
kaa.Object
└─ kaa.InProgress
     └─ kaa.CoroutineInProgress

Methods
abort()Aborts the coroutine.
Properties
activeread-onlyTrue if the coroutine is still waiting to be processed, or False if it’s finished.
intervalread/writeThe interval between the coroutine yielding a kaa.NotFinished or InProgress and reentry.
Signals
abortEmitted when abort() is called.

Methods

abort(exc=None)

Aborts the coroutine.

See kaa.InProgress.abort() for argument details.

This will raise the supplied exception (InProgressAborted if not given) inside the coroutine. If the coroutine doesn’t catch the exception, then it will be raised back to the caller of abort().

If the coroutine being aborted is currently waiting on some other yielded InProgress which is abortable (which includes other coroutines as well as threaded() functions, both of which are abortable by default), then it will also be aborted if and only if nothing else is waiting for it.

For example, a POLICY_SINGLETON coroutine z() that is yielded from both coroutine a() and b() would not be aborted if either a() or b() were aborted. If you want z() to be aborted, then a() and/or b() would need to catch InProgressAborted when yielding z() and explicitly abort it:

@kaa.coroutine(policy=kaa.POLICY_SINGLETON)
def z():
    # do stuff ...
    yield whatever()

@kaa.coroutine()
def a():
    yield z()

@kaa.coroutine()
def b():
    try:
        yield z()
    except kaa.InProgressAborted as e:
        e.inprogress.abort(e)

In this case, if abort() was called on b(), its exception handler would abort z(), which could cause InProgressAborted to be raised inside a(). If b() didn’t exist in the above example, z() would automatically be aborted. You could prevent this by using noabort():

@kaa.coroutine()
def b():
    try:
        yield z().noabort()
    except kaa.InProgressAborted as e:
        print('b() is aborted, but z() lives on')

Properties

active

True if the coroutine is still waiting to be processed, or False if it’s finished.

interval

The interval between the coroutine yielding a kaa.NotFinished or InProgress and reentry.

Signals

abort

Emitted when abort() is called.

def callback(exc)
Param exc:an exception object the InProgress was aborted with.
Type exc:InProgressAborted

If the task cannot be aborted, the callback can return False, which will cause an exception to be raised by abort().

Table Of Contents

Previous topic

InProgress Objects

Next topic

Thread Support

This Page