Thread Support

The threaded decorator

Any function or method may be decorated with @kaa.threaded(), which accepts, among other optional arguments, a thread pool (or registered pool name) and a priority. If a pool is specified, the decorated function is wrapped in ThreadPoolCallable, and invocations of that function are queued to be executed across one or more threads. If the pool is kaa.MAINTHREAD, the decorated function is invoked from the main thread. If no pool is specified, the function is wrapped in ThreadCallable so that each invocation is executed in a separate thread. Because these callables return ThreadInProgress objects, which are derived from InProgress, they may be yielded from coroutines.

For example:

@kaa.threaded()
def do_blocking_task():
   [...]
   return 42

@kaa.coroutine()
def do_something_else():
   try:
      result = yield do_blocking_task()
   except Exception:
      print "Exception raised in thread"
   else:
      # Only reached when the thread completed without raising.
      print "Thread returned", result

The threaded decorator also supports an async kwarg, which is True by default. When True, the decorated function returns a ThreadInProgress object. When False, however, invocation of the function blocks until the decorated function completes, and its return value is passed back. Internally, the decorator merely invokes wait() on the InProgress returned by the threaded function, which means the main loop is otherwise kept alive for timers and I/O handlers. This allows a threaded function to be used as a standard callback (although in practice this is not often used).
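The difference between the two modes can be illustrated without kaa. The following is a minimal standard-library sketch (Python 3, concurrent.futures), not kaa's implementation: the hypothetical run_in_thread decorator and its blocking flag stand in for @kaa.threaded() and its async kwarg, and a Future stands in for ThreadInProgress.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=2)

def run_in_thread(blocking=False):
    """Hypothetical stand-in for @kaa.threaded(async=not blocking)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            future = _executor.submit(func, *args, **kwargs)
            # Non-blocking mode hands back the in-progress handle;
            # blocking mode waits on it and returns the plain result.
            return future.result() if blocking else future
        return wrapper
    return decorator

@run_in_thread()
def compute_async():
    return 42

@run_in_thread(blocking=True)
def compute_blocking():
    return 42

handle = compute_async()      # returns immediately with a Future
value = compute_blocking()    # blocks until the function has finished
```

Unlike wait() on a kaa InProgress, Future.result() simply blocks the caller; it does not keep a main loop running in the meantime.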

kaa.threaded(pool=None, priority=0, async=True, progress=False, wait=False)

Decorator causing the decorated function to be executed within a thread when invoked.

Parameters:
  • pool (ThreadPool, str, kaa.MAINTHREAD, or None) – a ThreadPool object or name of a registered thread pool; if None, a new thread will be created for each invocation.
  • priority (int) – priority for the job in the thread pool
  • async (bool) – if False, blocks until the decorated function completes
  • progress (bool) – if True, the first argument passed to the decorated function will be an InProgressStatus object in order to indicate execution progress to the caller.
  • wait (bool) – corresponds to kaa.ThreadCallable.wait_on_exit. It may be necessary to set this to True if the kaa main loop is not running. (Default: False)
Returns:

ThreadInProgress if async=True, or the return value of the decorated function if async=False

A special pool constant kaa.MAINTHREAD is available, which causes the decorated function to always be invoked from the main thread. In this case, currently the priority argument is ignored.

If pool is None, the decorated function will be wrapped in a ThreadCallable for execution. Otherwise, pool specifies either a ThreadPool object or pool name previously registered with kaa.register_thread_pool(), and the decorated function will be wrapped in a ThreadPoolCallable for execution.

As a rule of thumb, if you have a function that must always be called in the main thread, you would use @kaa.threaded(kaa.MAINTHREAD) as mentioned above. If you need to decide case-by-case, don’t decorate it and use MainThreadCallable when needed.

The synchronized decorator

class kaa.synchronized(obj=None)

A decorator and context manager (for use with the with statement), similar to synchronized in Java. When decorating a non-member function, a lock or an instance of any class inheriting from object may be provided.

Parameters: obj – object on which all calls should be synchronized. If not provided, it will be the object itself for member functions, or an RLock for plain functions.

Create a synchronized object. Note: when used on classes a new member _kaa_synchronized_lock will be added to that class.

Some functions may need to block concurrent access to certain data structures, or prevent concurrent entry to the whole function. In these cases, kaa.synchronized can be used, which serves as both a decorator as well as a context manager for use with Python’s with statement:

class Test(object):

    def foo(self):
        # call to do_something() can be done concurrently by other threads.
        do_something()
        with kaa.synchronized(self):
            # Anything in this block however is synchronized between threads.
            do_something_else()


    # bar() is a protected function
    @kaa.synchronized()
    def bar(self, x, y):
        do_something_else()

The decorator synchronizes on the instance itself, so two different objects can execute the same method concurrently in two threads. For a single object, on the other hand, one thread cannot be inside the protected block of foo while another thread is executing bar.

The decorator can also be used for functions outside a class. In that case the decorator only protects this one function. If more functions should be protected against each other, a Python RLock object can be provided:

import threading

# threading.Lock does NOT work
lock = threading.RLock()

@kaa.synchronized(lock)
def foo():
    # foo and bar synchronized
    do_something()

@kaa.synchronized(lock)
def bar(x):
    # foo and bar synchronized
    do_something()

@kaa.synchronized()
def baz():
    # only baz synchronized
    do_something()
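The text above does not say why threading.Lock is unsuitable. One property that distinguishes RLock, and that matters whenever one synchronized function calls another sharing the same lock, is re-entrancy. The standard-library sketch below (Python 3) uses a hypothetical synchronized_on helper, not kaa's implementation, to show the difference.

```python
import functools
import threading

def synchronized_on(lock):
    """Hypothetical helper: run the decorated function while holding lock."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return decorator

shared = threading.RLock()

@synchronized_on(shared)
def inner():
    return "inner done"

@synchronized_on(shared)
def outer():
    # Re-acquires the lock this thread already holds; fine with an RLock.
    return inner()

result = outer()

# A plain Lock is not re-entrant: a second acquire from the owning thread
# fails here (and would block forever without blocking=False).
plain = threading.Lock()
plain.acquire()
reacquired = plain.acquire(blocking=False)
plain.release()
```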

Thread Functions

The following thread-related functions are available:

kaa.is_mainthread()

Return True if the current thread is the main thread.

Note that the “main thread” is considered to be the thread in which the kaa main loop is running. This is usually, but not necessarily, what Python considers to be the main thread. (If you call kaa.main.run() in the main Python thread, then they are equivalent.)

kaa.main.wakeup()

Wakes up the mainloop.

The mainloop sleeps when there are no timers to process and no activity on any registered IOMonitor objects. This function can be used by another thread to wake up the mainloop. For example, when a MainThreadCallable is invoked, it calls wakeup().

kaa.register_thread_pool(name, pool)

Registers a ThreadPool under the given name.

Parameters:
  • name (str) – the name under which to register this thread pool
  • pool (ThreadPool) – the thread pool object
Returns:

the supplied ThreadPool object

Once registered, the thread pool may be referenced by name when using the @kaa.threaded() decorator or ThreadPoolCallable class.

Thread pool names are arbitrary strings, but the recommended convention is to format the pool name as appname::poolname, where appname uniquely identifies the application, and poolname describes the purpose of the thread pool. An example might be beacon::thumbnailer.

kaa.get_thread_pool(name)

Returns the ThreadPool previously registered with the given name, or None if no ThreadPool was registered with that name.

Callables and Supporting Classes

Kaa provides a ThreadCallable class which can be used to invoke a callable in a new thread every time the ThreadCallable object is invoked.

With the ThreadPoolCallable class, invocations are queued and each executed in an available thread within a pool of one or more threads. A priority may also be specified, and ThreadPoolCallable objects with the highest priority are first in the queue (and hence executed first). This allows you to create a priority-based job queue that executes asynchronously.
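The idea of a priority-based job queue can be sketched with the standard library alone. This is an illustration under stated assumptions (Python 3, a single worker, a hypothetical enqueue helper), not how kaa implements ThreadPool.

```python
import itertools
import queue
import threading

jobs = queue.PriorityQueue()
_order = itertools.count()    # tie-breaker keeps equal priorities FIFO
results = []

def enqueue(func, priority=0):
    # PriorityQueue pops the smallest item first, so negate the priority
    # to make higher values run earlier.
    jobs.put((-priority, next(_order), func))

def worker():
    while True:
        _, _, func = jobs.get()
        if func is None:      # sentinel: stop the worker
            break
        results.append(func())

# Queue everything before the single worker starts, so the order of
# execution is decided purely by priority.
enqueue(lambda: "low", priority=0)
enqueue(lambda: "high", priority=10)
enqueue(lambda: "medium", priority=5)
enqueue(None, priority=-1)    # lowest priority, so it is handled last

thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

After the worker exits, results holds the job outputs in descending priority order.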

Although the @kaa.threaded() decorator provides a more convenient means to make use of these classes, they may still be used directly.

Instances of the two classes above are callable, and they return ThreadInProgress objects:

def handle_result(result):
    # This runs in the main thread.
    print 'Thread returned with', result

kaa.ThreadCallable(do_blocking_task)(arg1, arg2).connect(handle_result)

Or, alternatively:

@kaa.coroutine()
def some_coroutine():
    [...]
    result = yield kaa.ThreadCallable(do_blocking_task)(arg1, arg2)

class kaa.ThreadInProgress(func, *args, **kwargs)

An InProgress class that represents threaded tasks. You will likely not need to instantiate this class directly, but ThreadInProgress objects are returned when invoking ThreadCallable or ThreadPoolCallable objects, or functions decorated with @kaa.threaded().

Callbacks connected to this InProgress are invoked from the main thread.

Synopsis

Class Hierarchy

kaa.Signal
kaa.Object
└─ kaa.InProgress
     └─ kaa.ThreadInProgress

Methods
  • abort() – Aborts the callable being executed inside a thread. (Or attempts to.)
Properties
  • active (read-only) – True if the callable is still waiting to be processed.

Methods

abort(exc=None)

Aborts the callable being executed inside a thread. (Or attempts to.)

See kaa.InProgress.abort() for argument details.

Invocation of a ThreadCallable or ThreadPoolCallable will return a ThreadInProgress object which may be aborted by calling this method. When an in-progress thread is aborted, an InProgressAborted exception is raised inside the thread.

Just prior to raising InProgressAborted inside the thread, the abort signal will be emitted. Callbacks connected to this signal are invoked within the thread from which abort() was called. If any of the callbacks return False, InProgressAborted will not be raised in the thread.

It is possible to catch InProgressAborted within the thread to deal with cleanup, but any return value from the threaded callable will be discarded. It is therefore not possible to abort an abort within the thread itself. However, if the InProgress is aborted before the thread has a chance to start, the thread is not started at all, and so the threaded callable will not receive InProgressAborted.

Warning

This method raises an exception asynchronously within the thread, and this is unreliable. The asynchronous exception may get inadvertently cleared internally, and if it doesn’t, it will in any case take up to 100 ticks for it to trigger within the thread.

A tick is one or more Python VM bytecodes, which means that if the thread is currently executing non-CPython C code, the thread cannot be interrupted. The worst case scenario would be a blocking system call, which cannot be reliably interrupted when running inside a thread.

This approach still has uses as a general-purpose aborting mechanism, but, if possible, it is preferable for you to implement custom logic by attaching an abort handler to the ThreadCallable or ThreadPoolCallable object.
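One common shape for such custom logic is cooperative cancellation: the handler sets a flag, and the threaded function checks it between units of work. The sketch below uses only the standard library (Python 3); on_abort is a hypothetical handler name, and in kaa it would be connected to the abort signal rather than called directly.

```python
import threading
import time

stop_requested = threading.Event()
progress = []

def blocking_task():
    # Do the work in small steps and check the flag between steps.
    for step in range(1000):
        if stop_requested.is_set():
            return "cancelled"
        progress.append(step)
        time.sleep(0.001)
    return "finished"

def on_abort():
    # Hypothetical abort handler: request a clean stop instead of relying
    # on an exception being raised asynchronously inside the thread.
    stop_requested.set()

outcome = []
thread = threading.Thread(target=lambda: outcome.append(blocking_task()))
thread.start()
on_abort()
thread.join()
```

Because the task decides when to stop, it can release resources at a well-defined point, which the asynchronous exception cannot guarantee.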

Properties

active

True if the callable is still waiting to be processed.

class kaa.ThreadCallable(func, *args, **kwargs)

A special Callable used to execute a function or method inside a thread. A new thread is created each time the ThreadCallable is invoked.

Parameters:
  • func – callable function or object
  • args – arguments to be passed to func when invoked
  • kwargs – keyword arguments to be passed to func when invoked

Synopsis

Class Hierarchy

kaa.Callable
kaa.Object
└─ kaa.thread.ThreadCallableBase
     └─ kaa.ThreadCallable

Methods
This class has no methods.
Properties
  • wait_on_exit (read/write) – If True (default), wait for the thread on application exit.
Signals
  • abort – Emitted when the threaded callable is aborted.

Properties

wait_on_exit

If True (default), wait for the thread on application exit.

If False, this causes the thread to be a so-called “daemon thread.” A Python program exits when no non-daemon threads are left running.

Warning

If the main loop is not running (via kaa.main.run()) it is recommended this not be set to False, otherwise you may experience intermittent exceptions during interpreter shutdown.
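The daemon-thread behaviour mentioned above is a feature of Python's own threading module. As a plain standard-library illustration (Python 3, no kaa involved), the daemon flag is what decides whether the interpreter waits for a thread at exit:

```python
import threading
import time

def background():
    time.sleep(0.05)

# daemon=True: the interpreter will not wait for this thread at exit.
daemon_thread = threading.Thread(target=background, daemon=True)
# Default (non-daemon when created from a non-daemon thread): the
# interpreter waits for it at exit.
regular_thread = threading.Thread(target=background)

daemon_thread.start()
regular_thread.start()
regular_thread.join()
daemon_thread.join()
```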

Signals

abort

Emitted when the threaded callable is aborted.

def callback()

This callback takes no arguments

See abort() for a more detailed discussion.

Handlers may return False to prevent InProgressAborted from being raised inside the thread. However, the ThreadInProgress is still considered aborted regardless. Handlers of this signal are intended to implement more appropriate logic to cancel the threaded callable.

class kaa.ThreadPool(size=1)

Manages a pool of one or more threads for use with the @kaa.threaded() decorator, or ThreadPoolCallable objects.

ThreadPool objects may be assigned a name by calling kaa.register_thread_pool(). When done, the name can be referenced instead of passing the ThreadPool object.

Parameters: size (int) – maximum number of threads this thread pool will grow to.

Synopsis

Class Hierarchy

kaa.ThreadPool

Methods
  • dequeue() – Removes the given job from the thread queue.
  • enqueue() – Creates a job from the given callback and adds it to the thread pool work queue.
Properties
  • name (read-only) – The name under which this thread pool was registered.
  • size (read/write) – The maximum number of threads this pool may grow to.
  • timeout (read/write) – Number of seconds a thread pool member will wait for a job before stopping.
Signals
This class has no signals.

Methods

dequeue(job)

Removes the given job from the thread queue.

Parameters: job (ThreadInProgress object) – the job as returned by enqueue()
Returns: True if the job existed and was removed, and False if the job was not found.

enqueue(callback, priority=0)

Creates a job from the given callback and adds it to the thread pool work queue.

Parameters:
  • callback (callable) – a callable which will be invoked inside one of the pool threads.
  • priority (int) – determines the relative priority of the job; higher values are higher priority.
Returns:

a ThreadInProgress object for this job.

It should generally not be necessary to call this method directly. It is called implicitly when using the @kaa.threaded() decorator, or ThreadPoolCallable objects.

Properties

name

The name under which this thread pool was registered.

Thread pools are registered via kaa.register_thread_pool(). Once registered, the thread pool may subsequently be referenced by name when using ThreadPoolCallable or the @kaa.threaded() decorator.

A ThreadPool which has not been registered is called an anonymous thread pool, and may be passed directly to @kaa.threaded() and ThreadPoolCallable.

size

The maximum number of threads this pool may grow to.

If this value is increased and jobs are waiting to be processed, new threads will be spawned as needed (but will not exceed the limit).

If this value is decreased and there are too many active pool members as a result, the necessary number of pool members will be stopped. If those members are currently processing jobs, they will exit once the job is complete.

timeout

Number of seconds a thread pool member will wait for a job before stopping.

A thread which stopped due to timeout may be restarted if new jobs are enqueued that would put that thread to work.
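The idle-timeout behaviour can be pictured as a worker that gives up when no job arrives in time. The following is a standard-library sketch (Python 3) with a hypothetical pool_member function; it does not reproduce kaa's ThreadPool internals, and it omits the restart step.

```python
import queue
import threading

jobs = queue.Queue()
handled = []

def pool_member(idle_timeout):
    while True:
        try:
            job = jobs.get(timeout=idle_timeout)
        except queue.Empty:
            return            # no work arrived in time: stop this thread
        handled.append(job())

jobs.put(lambda: "first")
jobs.put(lambda: "second")

member = threading.Thread(target=pool_member, args=(0.1,))
member.start()
member.join(timeout=5)
```

The worker processes both queued jobs, waits 0.1 seconds for more, and then exits on its own.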

class kaa.ThreadPoolCallable(poolinfo, func, *args, **kwargs)

A special Callable used to execute a function or method inside a thread as part of a thread pool. If all threads in the thread pool are busy, it is queued and will be invoked when one of the threads becomes available.

Parameters:
  • poolinfo – a ThreadPool object or name of a previously registered thread pool, or a 2-tuple (pool, priority), where pool is a ThreadPool object or registered name, and priority is the integer priority that controls where in the queue the job will be placed.
  • func – the underlying callable that will be called from within a thread.

Synopsis

Class Hierarchy

kaa.Callable
kaa.Object
└─ kaa.thread.ThreadCallableBase
     └─ kaa.ThreadPoolCallable

Methods
This class has no methods.
Properties
This class has no properties.
Signals
  • abort – Emitted when the threaded callable is aborted.

Signals

abort

Emitted when the threaded callable is aborted.

def callback()

This callback takes no arguments

See abort() for a more detailed discussion.

Handlers may return False to prevent InProgressAborted from being raised inside the thread. However, the ThreadInProgress is still considered aborted regardless. Handlers of this signal are intended to implement more appropriate logic to cancel the threaded callable.

class kaa.MainThreadCallable(func, *args, **kwargs)

Wraps a callable and ensures it is invoked from the main thread.

Parameters:
  • func – callable function or object
  • args – arguments to be passed to func when invoked
  • kwargs – keyword arguments to be passed to func when invoked

The MainThreadCallable ensures that the wrapped function or method is executed via the main loop. The call to the MainThreadCallable returns immediately in the calling thread, without waiting for the result. Invoking a MainThreadCallable always returns an InProgress object, which the caller may wait on if it needs the result:

def needs_to_be_called_from_main(param):
    print param
    return 5

# ... suppose we are in a thread here ...
cb = kaa.MainThreadCallable(needs_to_be_called_from_main)
print cb(3).wait()
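The hand-off from a secondary thread to the main loop can be sketched with a queue and a Future. This is a standard-library analogy (Python 3) under stated assumptions: call_in_main is a hypothetical stand-in for invoking a MainThreadCallable, the Future stands in for InProgress, and the "main loop" is reduced to a single iteration.

```python
import queue
import threading
from concurrent.futures import Future

main_queue = queue.Queue()
main_thread_id = threading.get_ident()   # treat the current thread as "main"

def call_in_main(func, *args):
    """Hypothetical stand-in for kaa.MainThreadCallable(func)(*args)."""
    future = Future()
    main_queue.put((future, func, args))
    return future        # returned immediately, before func has run

def needs_main(param):
    # Report whether we really ran in the main thread, plus a result.
    return threading.get_ident() == main_thread_id, param + 2

collected = []

def worker():
    # Runs in a secondary thread and waits for the main loop's answer.
    collected.append(call_in_main(needs_main, 3).result(timeout=5))

thread = threading.Thread(target=worker)
thread.start()

# A one-iteration "main loop": take the queued call and run it here.
future, func, args = main_queue.get(timeout=5)
future.set_result(func(*args))
thread.join()
```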

Synopsis

Class Hierarchy

kaa.Callable
└─ kaa.MainThreadCallable

Methods
This class has no methods.
Properties
This class has no properties.
Signals
This class has no signals.
