InProgress Objects

Throughout Kaa, when a function executes asynchronously (which is generally the case for any function that may otherwise block on some resource), it returns an InProgress object. The InProgress object is a Signal that callbacks can be connected to in order to handle its return value or any exception raised during the asynchronous execution. When the InProgress object is emitted, we say that it is “finished” (and the finished property is True).

InProgress objects are emitted (they are Signal objects, remember) when finished, so handlers can retrieve the return value of the asynchronous task. There is also an exception member, which is itself a Signal, and is emitted when the asynchronous task raises an exception. Exception handlers must accept three arguments: exception class, exception instance, and traceback object. (These three arguments correspond to sys.exc_info())

The following example demonstrates how one might connect callbacks to an InProgress in order to handle success result and exceptions:

import kaa

def handle_connect(result):
    print 'Connected to remote site successfully'

def handle_exception(tp, exc, tb):
    print 'Connect failed:', exc

sock = kaa.Socket()
inprogress = sock.connect('www.freevo.org:80')
inprogress.connect(handle_connect)
inprogress.exception.connect(handle_exception)
# Or a convenience function exists to replace the above 2 lines:
# inprogress.connect_both(handle_connect, handle_exception)
kaa.main.run()

Connecting callbacks to signals in this way is fairly standard and this approach is used in many other frameworks. For example, readers familiar with the Twisted framework may find similarities with Twisted’s Deferreds. InProgress objects are also very similar to futures and promises.

However, InProgress objects can be used with coroutines (covered in more detail later), a more interesting and powerful approach which allows you to handle the result of InProgress objects without the use of callbacks. The above example could be rewritten as:

import kaa

@kaa.coroutine()
def connect(site):
    sock = kaa.Socket()
    try:
        yield sock.connect(site)
    except Exception, exc:
        print 'Connect failed:', exc
    else:
        print 'Connected to remote site successfully'

connect('www.freevo.org:80')
kaa.main.run()

As seen in the above snippet, with coroutines, InProgress objects are used implicitly, where they function as a mechanism for message passing between asynchronous tasks and the coroutine machinery built into the notifier.

If an InProgress finishes with an exception (in which case the failed property is True) but it is not handled by one of the above methods (either by connecting a callback to the exception attribute, or by catching the exception raised by a yield in a coroutine), the exception will be logged to stderr with the heading “Unhandled asynchronous exception.”

class kaa.InProgress(abortable=None, frame=0)

InProgress objects are returned from functions that require more time to complete (because they are either blocked on some resource, are executing in a thread, or perhaps simply because they yielded control back to the main loop as a form of cooperative time slicing).

InProgress subclasses Signal, which means InProgress objects are themselves signals. Callbacks connected to an InProgress receive a single argument containing the result of the asynchronously executed task.

If the asynchronous task raises an exception, the exception member, which is a separate signal, is emitted instead.

Parameters:abortable (bool) – see the abortable property.

Synopsis

Class Hierarchy

kaa.Object
kaa.Signal
└─ kaa.InProgress

Methods
abort()Aborts the asynchronous task this InProgress represents.
connect()Connects a callback to be invoked when the InProgress has returned normally (no exception raised).
connect_both()Convenience function that connects a callback (or callbacks) to both the InProgress (for successful result) and exception signals.
execute()Execute the given function and finish the InProgress object with the result or exception.
finish()This method should be called when the owner (creator) of the InProgress is finished successfully (with no exception).
throw()This method should be called when the owner (creator) of the InProgress is finished because it raised an exception.
timeout()Create a new InProgress object linked to this one that will throw TimeoutException if this object is not finished by the given timeout.
wait()Blocks until the InProgress is finished.
waitfor()Connects to another InProgress object (A) to self (B). When A finishes (or throws), B is finished with the result or exception.
noabort()Create a new InProgress object for this task that cannot be aborted.
Properties
abortableread/writeTrue if the asynchronous task this InProgress represents can be aborted by a call to abort().
exceptionread-onlyA Signal emitted when the asynchronous task this InProgress represents has raised an exception.
failedread-onlyTrue if an exception was thrown to the InProgress, False if it was finished without error or if it is not yet finished.
finishedread-onlyTrue if the InProgress is finished.
resultread-onlyThe result the InProgress was finished with. If an exception was thrown to the InProgress, accessing this property will raise that exception.
Signals
abortEmitted when abort() is called.

Methods

abort(exc=None)

Aborts the asynchronous task this InProgress represents.

Parameters:exc (InProgressAborted or subclass thereof) – optional exception object with which to abort the InProgress; if None is given, a general InProgressAborted exception will be used.

Not all such tasks can be aborted. If aborting is not supported, or if the InProgress is already finished, a RuntimeError exception is raised.

connect(callback, *args, **kwargs)

Connects a callback to be invoked when the InProgress has returned normally (no exception raised).

If the asynchronous task raises an exception, the InProgress finishes with that exception and the exception signal is emitted.

connect_both(finished, exception=None)

Convenience function that connects a callback (or callbacks) to both the InProgress (for successful result) and exception signals.

This function does not accept additional args/kwargs to be passed to the callbacks. If you need that, use connect() and exception.connect().

If exception is not given, the given callable will be used for both success and exception results, and therefore must be able to handle variable arguments (as described for each callback below).

Parameters:
  • finished – callback to be invoked upon successful completion; the callback is passed a single argument, the result returned by the asynchronous task.
  • exception – (optional) callback to be invoked when the asynchronous task raises an exception; the callback is passed three arguments representing the exception: exception class, exception instance, and traceback.
execute(func, *args, **kwargs)

Execute the given function and finish the InProgress object with the result or exception.

If the function raises SystemExit or KeyboardInterrupt, those are re-raised to allow them to be properly handled by the main loop.

Parameters:
  • func (callable) – the function to be invoked
  • args – the arguments to be passed to the function
  • kwargs – the keyword arguments to be passed to the function
Returns:

the InProgress object being acted upon (self)

finish(result)

This method should be called when the owner (creator) of the InProgress is finished successfully (with no exception).

Any callbacks connected to the InProgress will then be emitted with the result passed to this method.

If result is an unfinished InProgress, then instead of finishing, we wait for the result to finish via waitfor().

Parameters:result – the result of the completed asynchronous task. (This can be thought of as the return value of the task if it had been executed synchronously.)
Returns:This method returns self, which makes it convenient to prime InProgress objects with a finished value. e.g. return InProgress().finish(42)
throw(type=None, value=None, tb=None, aborted=False)

This method should be called when the owner (creator) of the InProgress is finished because it raised an exception.

Any callbacks connected to the exception signal will then be emitted with the arguments passed to this method.

The parameters correspond to sys.exc_info(). If they are not specified then the current exception in sys.exc_info() will be used; this is analogous to a naked raise within an except block.

Note

Exceptions thrown to InProgress objects that aren’t explicitly handled will be logged at the INFO level (using a logger name with a kaa. prefix.) When the unhandled asynchronous exception is logged depends on the version of Python used.

Some versions of CPython (e.g 2.6) will log the unhandled exception immediately when the InProgress has no more references, while others (e.g. 2.7 and 3.2) will only be logged on the next garbage collection.

Parameters:
  • type – the class of the exception
  • value – the instance of the exception
  • tb – the traceback object representing where the exception took place
timeout(timeout, callback=None, abort=False)

Create a new InProgress object linked to this one that will throw TimeoutException if this object is not finished by the given timeout.

Parameters:
  • callback (callable) – called (with no additional arguments) just prior to TimeoutException
  • abort (bool) – invoke abort() on the original InProgress if the timeout occurs.
Returns:

a new InProgress object that is subject to the timeout

If the original InProgress finishes before the timeout, the new InProgress (returned by this method) is finished with the result of the original. If abort() is called on the returned InProgress, the original one will also be aborted.

If a timeout does occur and the abort argument is False, the original InProgress object is not affected: it is not finished with the TimeoutException, nor is it aborted. You can explicitly abort the original InProgress:

@kaa.coroutine()
def read_from_socket(sock):
    try:
        data = yield sock.read().timeout(3)
    except kaa.TimeoutException as e:
        print 'Error:', e.args[0]
        e.inprogress.abort()

If you set abort=True then the original InProgress is aborted automatically, but before the TimeoutException is thrown into the new InProgress returned by this method. This allows a coroutine being aborted to perform cleanup actions before relinquishing control back to the caller. In this example, sock.read() will be aborted if not completed within 3 seconds:

@kaa.coroutine()
def read_from_socket(sock):
    data = yield sock.read().timeout(3, abort=True)
wait(timeout=None)

Blocks until the InProgress is finished.

The main loop is kept alive if waiting in the main thread, otherwise the thread is blocked until another thread finishes the InProgress.

If the InProgress finishes due to an exception, that exception is raised.

Parameters:timeout – if not None, wait() blocks for at most timeout seconds (which may be fractional). If wait times out, a TimeoutException is raised.
Returns:the value the InProgress finished with
waitfor(inprogress)

Connects to another InProgress object (A) to self (B). When A finishes (or throws), B is finished with the result or exception.

Parameters:inprogress (InProgress) – the other InProgress object to link to.
noabort()

Create a new InProgress object for this task that cannot be aborted.

Returns:a new InProgress object that finishes when self finishes, but will raise if an abort() is attempted

Properties

abortable

True if the asynchronous task this InProgress represents can be aborted by a call to abort().

By default abort() will fail if there are no callbacks attached to the abort signal. This property may be explicitly set to True, in which case abort() will succeed regardless. An InProgress is therefore abortable if the abortable property has been explicitly set to True, or if there are callbacks connected to the abort signal.

This is useful when constructing an InProgress object that corresponds to an asynchronous task that can be safely aborted with no explicit action.

exception

A Signal emitted when the asynchronous task this InProgress represents has raised an exception.

Callbacks connected to this signal receive three arguments: exception class, exception instance, traceback.

failed

True if an exception was thrown to the InProgress, False if it was finished without error or if it is not yet finished.

finished

True if the InProgress is finished.

result

The result the InProgress was finished with. If an exception was thrown to the InProgress, accessing this property will raise that exception.

Signals

abort

Emitted when abort() is called.

def callback(exc)
Param exc:an exception object the InProgress was aborted with.
Type exc:InProgressAborted

If the task cannot be aborted, the callback can return False, which will cause an exception to be raised by abort().

Also see related classes CoroutineInProgress and ThreadInProgress.

class kaa.InProgressCallable(func=None, abortable=None, frame=0, throwable=True)

A callable variant of InProgress that finishes when invoked.

Parameters:
  • func (callable) – if defined, is invoked on initialization and self is passed as the only argument. Intended as a convenience.
  • frame – corresponds to the abortable property in InProgress
  • throwable – if True, if the InProgressCallable is invoked with 3 arguments (tp, exc, value) then throw() is called instead of finish().

The main value of the InProgressCallable is the translation done between the arguments passed on invocation and those passed to finish(). A single positional or named argument is passed as the value to finish(); multiple positional or multiple named arguments are passed as a tuple or dict respectively; otherwise the InProgress is finished with a 2-tuple of (args, kwargs).

If throwable is True (default) and the callable is invoked with the standard exception 3-tuple (type, value, traceback), then it’s treated as an exception which is thrown to the InProgress.

The primary use-case for the InProgressCallable is to marry conventional callbacks for asynchronous task completion with the InProgress strategy.

InProgress Collections

class kaa.InProgressAny(*objects, **kwargs)

InProgress object that finishes when any of the supplied InProgress objects (in constructor) finish.

Sequences or generators passed as arguments will be flattened, allowing for this idiom:

yield InProgressAny(func() for func in coroutines)

Arguments will be passed through kaa.inprogress() so any object that can be coerced to an InProgress (such as Signal objects) can be passed directly.

Note

Callbacks aren’t attached to the supplied InProgress objects to monitor their state until a callback is attached to the InProgressAny object. This means an InProgressAny with nothing connected will not actually finish even when one of its constituent InProgresses finishes.

Parameters:
  • finish (FINISH_IDX_RESULT (default), FINISH_IDX, or FINISH_RESULT) – controls what values the InProgressAny is finished with
  • filter (callable) – optional callback receiving two arguments (index and finished result) invoked each time an underlying InProgress object finishes which, if returns True, prevents the completion of the InProgressAny iff there are other InProgress objects that could yet finish.

The possible finish values are:

  • kaa.FINISH_IDX_RESULT: a 2-tuple (idx, result) containing the index (relative to the position of the InProgress passed to the constructor) and the result that InProgress finished with
  • kaa.FINISH_IDX: the index (offset from 0) of the InProgress that finished the InProgressAny
  • kaa.FINISH_RESULT: the result of the InProgress that finished the InProgressAny

For example:

# Read data from either sock1 or sock2, whichever happens first within
# 2 seconds.
idx, data = yield kaa.InProgressAny(kaa.delay(2), sock1.read(), sock2.read())
if idx == 0:
    print 'Nothing read from either socket after 2 seconds'
else:
    print 'Read from sock{0}: {1}'.format(idx, data)

class kaa.InProgressAll(*objects, **kwargs)

InProgress object that finishes only when all of the supplied InProgress objects (in constructor) finish.

The InProgressAll object then finishes with itself. The finished InProgressAll is useful to fetch the results of the individual InProgress objects. It can be treated as an iterator, and can be indexed:

for ip in (yield kaa.InProgressAll(sock1.read(), sock2.read())):
    print(ip.result)

InProgress Exceptions

The following exceptions can be raised by InProgress methods.

class kaa.TimeoutException(*args, **kwargs)

This exception is raised by an InProgress returned by timeout() when the timeout occurs.

For example:

sock = kaa.Socket()
try:
    yield sock.connect('deadhost.com:80').timeout(10)
except kaa.TimeoutException:
    print 'Connection timed out after 10 seconds'

class kaa.InProgressAborted(*args, **kwargs)

This exception is thrown into an InProgress object when abort() is called.

For ThreadCallable and ThreadPoolCallable this exception is raised inside the threaded callable. This makes it potentially an asynchronous exception (when used this way), and therefore it subclasses BaseException, similar in rationale to KeyboardInterrupt and SystemExit, and also (for slightly different reasons) GeneratorExit, which as of Python 2.6 also subclasses BaseException.

Functions

kaa.inprogress(obj)

Returns a suitable InProgress for the given object.

Parameters:obj – object to represent as an InProgress.
Returns:an InProgress representing obj

The precise behaviour of an object represented as an InProgress should be defined in the documentation for the class. For example, the InProgress for a Process object will be finished when the process is terminated.

This function simply calls __inprogress__() of the given obj if one exists, and if not will raise an exception. In this sense, it behaves quite similar to len() and __len__().

It is safe to call this function on InProgress objects. (The InProgress object given will simply be returned.)

A practical demonstration of this protocol is in the Signal object, which implements the __inprogress__ method. The returned InProgress in that case is finished when the signal is next emitted. Any object implementing the __inprogress__ protocol can be passed directly to the constructor of InProgressAny or InProgressAll.

kaa.delay(seconds)

Returns an InProgress that finishes after the given time in seconds.

Parameters:obj – object to represent as an InProgress.
Returns:InProgress

Table Of Contents

Previous topic

Events and Event Handlers

Next topic

Coroutines

This Page