I/O Channels

FIXME: Write some basic stuff about IO in kaa

Monitoring I/O

class kaa.IOMonitor(callback, *args, **kwargs)

Creates an IOMonitor to monitor IO activity via the mainloop.

Once a file descriptor is registered using the register() method, the given callback is invoked upon I/O activity.

Synopsis

Class Hierarchy

kaa.Callable
└─ kaa.nf_wrapper.NotifierCallback
     └─ kaa.IOMonitor

Methods
register() – Register the IOMonitor to a specific file descriptor.
unregister() – Unregister the IOMonitor.
Properties
This class has no properties.
Signals
This class has no signals.

Methods

register(fd, condition=1)

Register the IOMonitor to a specific file descriptor.

The IOMonitor is registered with the notifier, which means that the notifier holds a reference to the IOMonitor until it is explicitly unregistered (or until the file descriptor is closed).

Parameters:
  • fd (File descriptor or any file-like object) – The file descriptor to monitor.
  • condition – IO_READ, IO_WRITE, or IO_EXCEPT

unregister()

Unregister the IOMonitor.
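The register, callback, unregister cycle described above can be illustrated without kaa. The following is a minimal sketch that uses the Python 3 standard selectors module as a stand-in for the kaa notifier. It is not kaa's implementation, and the names on_readable and results are invented for this example.

```python
import os
import selectors


def on_readable(fd, results):
    """Callback invoked when the monitored descriptor has data to read."""
    results.append(os.read(fd, 1024))


selector = selectors.DefaultSelector()
read_fd, write_fd = os.pipe()
results = []

# Registration stores the callback alongside the descriptor. The selector
# keeps that reference until unregister() is called.
selector.register(read_fd, selectors.EVENT_READ, on_readable)

os.write(write_fd, b"ping")
for key, _events in selector.select(timeout=1):
    key.data(key.fileobj, results)  # key.data is the callback stored above

selector.unregister(read_fd)
os.close(read_fd)
os.close(write_fd)
selector.close()
```

As with IOMonitor, the callback only runs while the descriptor is registered; after unregister() the selector no longer references it.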

class kaa.WeakIOMonitor(func, *args, **kwargs)

IOMonitor using weak references for the callback.

Any previously registered file descriptor will become unregistered from the notifier when the callback (or any of its arguments) is destroyed.
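The weak-reference behaviour can be sketched with the Python 3 standard weakref module. This is only an illustration of the idea, assuming a hypothetical WeakCallback wrapper and Handler class; it does not show how kaa implements WeakIOMonitor.

```python
import gc
import weakref


class Handler:
    def on_activity(self, fd):
        return "activity on fd %d" % fd


class WeakCallback:
    """Holds a bound method weakly, so it does not keep its owner alive."""

    def __init__(self, method):
        self._ref = weakref.WeakMethod(method)

    def __call__(self, *args):
        method = self._ref()
        if method is None:
            # The owner was destroyed. A weak monitor would unregister
            # its file descriptor at this point.
            return None
        return method(*args)


handler = Handler()
callback = WeakCallback(handler.on_activity)
alive_result = callback(3)

del handler   # drop the only strong reference to the owner
gc.collect()  # not needed on CPython, but harmless elsewhere
dead_result = callback(3)
```

Here alive_result holds the handler's return value, while dead_result is None because the handler no longer exists.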

Synopsis

Class Hierarchy

kaa.Callable
└─ kaa.nf_wrapper.NotifierCallback
└─ kaa.WeakCallable
     └─ kaa.IOMonitor
     └─ kaa.nf_wrapper.WeakNotifierCallback
          └─ kaa.WeakIOMonitor

Communication over I/O channels

class kaa.IOChannel(channel=None, mode=3, chunk_size=1048576, delimiter='\n')

Base class for read-only, write-only or read-write stream-based descriptors such as Socket and Process. Implements logic common to communication over such channels such as async read/writes and read/write buffering.

It may also be used directly with file descriptors or (probably less usefully) file-like objects, e.g. IOChannel(file('somefile')).

Parameters:
  • channel (integer file descriptor, file-like object, or other IOChannel) – file descriptor to wrap into an IOChannel
  • mode (bitmask of kaa.IO_READ and/or kaa.IO_WRITE) – indicates whether the channel is readable, writable, or both.
  • chunk_size – maximum number of bytes to be read in from the channel at a time; defaults to 1M.
  • delimiter – string used to split data for use with readline; defaults to ‘\n’.
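The mode bitmask can be interpreted as in the sketch below. The numeric values of IO_READ and IO_WRITE are an assumption, chosen so that their combination equals the default mode of 3; in real code the kaa.IO_READ and kaa.IO_WRITE constants should be used instead. The describe_mode helper is hypothetical.

```python
# Assumed values, chosen so that IO_READ | IO_WRITE == 3 (the default mode).
IO_READ = 1
IO_WRITE = 2


def describe_mode(mode):
    """Return a human-readable description of a mode bitmask."""
    readable = bool(mode & IO_READ)
    writable = bool(mode & IO_WRITE)
    if readable and writable:
        return "read/write"
    if readable:
        return "read-only"
    if writable:
        return "write-only"
    return "neither"
```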

Writes may be performed to an IOChannel that is not yet open. These writes will be queued until the queue size limit (controlled by the queue_size property) is reached, after which an exception will be raised. The write queue will be written to the channel once it becomes writable.
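The queue-then-flush behaviour can be modelled in a few lines. This is a simplified sketch, not kaa's code: the WriteQueue class is hypothetical, and both the exception type (ValueError) and the exact limit check are assumptions, since the text above only states that an exception is raised once the limit is reached.

```python
import io


class WriteQueue:
    """Buffers writes until a sink is available, up to a byte limit."""

    def __init__(self, queue_size):
        self.queue_size = queue_size
        self.used = 0
        self._pending = []

    def write(self, data):
        # Assumed policy: refuse data that would push the queue past its limit.
        if self.used + len(data) > self.queue_size:
            raise ValueError("write queue limit exceeded")
        self._pending.append(data)
        self.used += len(data)

    def flush(self, sink):
        """Write all queued data, in order, to any object with write()."""
        while self._pending:
            data = self._pending.pop(0)
            sink.write(data)
            self.used -= len(data)


queue = WriteQueue(queue_size=8)
queue.write(b"abcd")  # the channel is not open yet, so data is queued
queue.write(b"efgh")

sink = io.BytesIO()   # stands in for the channel once it becomes writable
queue.flush(sink)
```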

Reads are asynchronous and non-blocking, and may be performed using two possible approaches:

  1. Connecting a callback to the read or readline signals.
  2. Invoking the read() or readline() methods, which return InProgress objects.

It is not possible to use both approaches with readline. (That is, it is not permitted to connect a callback to the readline signal and subsequently invoke the readline() method when the callback is still connected.)

However, read() and readline() will work predictably when a callback is connected to the read signal. Such a callback always receives all data from the channel once connected, but will not interfere with (or “steal” data from) calls to read() or readline().

Data is not consumed from the channel if no one is interested in reads (that is, when there are no read() or readline() calls in progress, and there are no callbacks connected to the read and readline signals). This is necessary for flow control.

Data is read from the channel in chunks, with the maximum chunk size being defined by the chunk_size property. Unlike other APIs, read() does not block and will not consume all data to the end of the channel, but rather returns between 0 and chunk_size bytes when data becomes available. If read() returns a zero-byte string, it means the channel is closed. (Here, “returns X” means the InProgress object read() actually returns is finished with X.)
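The chunking and end-of-channel convention mirrors that of a plain file descriptor. The sketch below shows it with the standard os module on a pipe. Unlike IOChannel.read(), os.read() here blocks, so this only illustrates the "at most chunk_size bytes, empty result means closed" rule; read_chunks is a hypothetical helper.

```python
import os


def read_chunks(fd, chunk_size):
    """Yield chunks of at most chunk_size bytes until the descriptor closes."""
    while True:
        chunk = os.read(fd, chunk_size)
        if not chunk:  # zero-byte result: the other end has been closed
            return
        yield chunk


read_fd, write_fd = os.pipe()
os.write(write_fd, b"hello world")
os.close(write_fd)  # closing the writer lets the reader observe end of data

chunks = list(read_chunks(read_fd, chunk_size=4))
os.close(read_fd)
```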

In order for readline to work properly, a read queue is maintained, which may grow up to queue_size. See the readline() method for more details.

Synopsis

Class Hierarchy

kaa.Object
└─ kaa.IOChannel

Methods
close() – Closes the channel.
make_native_proxy()
read() – Reads a chunk of data from the channel.
readline() – Reads a line from the channel.
steal() – Steal all state from the given channel, assuming control over the underlying file descriptor or socket.
wrap() – Make the IOChannel represent a new descriptor or file-like object.
write() – Writes the given data to the channel.
Properties
alive (read-only) – True if the channel exists and is open.
channel (read-only) – The original object this IOChannel is wrapping.
chunk_size (read/write) – Number of bytes to attempt to read from the channel at a time.
close_on_eof (read/write) – Whether the channel automatically closes when EOF is encountered or on unexpected exceptions.
delimiter (read/write) – String used to split data for use with readline().
fileno (read-only) – The file descriptor (integer) for this channel, or None if no channel has been set.
mode (read-only) – Whether the channel is read-only, write-only, or read/write.
queue_size (read/write) – The size limit in bytes for the read and write queues.
read_queue_used (read-only) – The number of bytes in the read queue.
readable (read-only) – True if read() may be called.
writable (read-only) – True if write() may be called.
write_queue_used (read-only) – The number of bytes queued in memory to be written to the channel.
Signals
read – Emitted for each chunk of data read from the channel.
readline – Emitted for each line read from the channel.
closed – Emitted when the channel is closed.

Methods

close(immediate=False, expected=True)

Closes the channel.

Parameters: immediate (bool) – if False and there is data in the write buffer, the channel is closed once the write buffer is emptied. Otherwise the channel is closed immediately and the closed signal is emitted.

make_native_proxy(close_on_eof=None)

read()

Reads a chunk of data from the channel.

Returns: An InProgress object. If the InProgress is finished with the empty string, it means that no data was collected and the channel was closed (or the channel was already closed when read() was called).

It is therefore possible to busy-loop by reading on a closed channel:

while True:
    data = yield channel.read()
    # Or: channel.read().wait()

So the return value of read() should be checked. Alternatively, the readable property could be tested:

while channel.readable:
    data = yield channel.read()
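Checking the return value of read() might look like the loop in drain() below. To keep the example runnable without kaa, it uses a hypothetical FakeChannel whose read() returns data directly instead of an InProgress, and a tiny run() driver that feeds each yielded value back into the generator. With kaa, the generator would instead be run as a coroutine. This is a Python 3 sketch under those assumptions.

```python
def drain(channel):
    """Collect chunks until read() yields an empty string (channel closed)."""
    chunks = []
    while True:
        data = yield channel.read()
        if not data:  # empty result: stop instead of busy-looping
            break
        chunks.append(data)
    return chunks


class FakeChannel:
    """Hypothetical stand-in: read() returns data directly, "" once closed."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def read(self):
        return self._chunks.pop(0) if self._chunks else ""


def run(generator):
    """Minimal driver: send each yielded value straight back in."""
    try:
        value = next(generator)
        while True:
            value = generator.send(value)
    except StopIteration as stop:
        return stop.value


result = run(drain(FakeChannel(["ab", "cd"])))
```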

readline()

Reads a line from the channel.

The line delimiter is included in the string to avoid ambiguity. If no delimiter is present then either the read queue became full or the channel was closed before a delimiter was received.

Returns: An InProgress object. If the InProgress is finished with the empty string, it means that no data was collected and the channel was closed (or the channel was already closed when readline() was called).

Data from the channel is read and queued until the delimiter (\n by default, but may be changed via the delimiter property) is found. If the read queue size exceeds the queue limit, then the InProgress returned here will be finished prematurely with whatever is in the read queue, and the read queue will be purged.
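The queueing rules above can be modelled with a small buffer class. This is a sketch of the described behaviour only, assuming a hypothetical LineBuffer with a feed() method; kaa's internal read queue may differ in detail, for example in exactly when the size limit is checked.

```python
class LineBuffer:
    """Accumulates chunks and splits them into delimiter-terminated lines."""

    def __init__(self, delimiter="\n", queue_size=1024):
        self.delimiter = delimiter
        self.queue_size = queue_size
        self._queue = ""

    def feed(self, chunk):
        """Add a chunk and return the lines that became available."""
        self._queue += chunk
        lines = []
        while True:
            index = self._queue.find(self.delimiter)
            if index < 0:
                break
            end = index + len(self.delimiter)
            lines.append(self._queue[:end])  # the delimiter stays attached
            self._queue = self._queue[end:]
        if len(self._queue) > self.queue_size:
            lines.append(self._queue)  # over the limit: emit without delimiter
            self._queue = ""           # and purge the queue
        return lines


buf = LineBuffer(queue_size=5)
first = buf.feed("ab\ncd")        # one complete line; "cd" stays queued
second = buf.feed("e\nfghijkl")   # completes "cde\n", then overflows
```

A returned string that lacks the delimiter signals that the queue overflowed (or, in the real channel, that the channel closed).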

This method may not be called when a callback is connected to the IOChannel’s readline signal. You must use either one approach or the other.

steal(channel)

Steal all state from the given channel, assuming control over the underlying file descriptor or socket.

Parameters: channel (IOChannel) – the channel to steal from
Returns: self

The use-case for this method is primarily to convert one type of IOChannel into another. For example, it’s possible to convert a standard Socket into a TLSSocket in the middle of a session. This method returns self so that this idiom is possible:

from kaa.net.tls import TLSSocket
sock = TLSSocket().steal(sock)

This method is similar to wrap(), but it additionally moves all state from the supplied IOChannel, including the read/write queues. All callbacks connected to the supplied channel's signals are added to self and removed from channel.

Once stolen, the given channel is rendered basically inert.

wrap(channel, mode)

Make the IOChannel represent a new descriptor or file-like object.

This is implicitly called by the initializer. If the IOChannel is already wrapping another channel, it will be closed before the given one is wrapped.

Parameters:
  • channel (integer file descriptor, file-like object, or other IOChannel) – file descriptor to wrap into the IOChannel
  • mode (bitmask of kaa.IO_READ and/or kaa.IO_WRITE) – indicates whether the channel is readable, writable, or both. Only applies to file descriptor channels or IOChannel objects; for file-like objects, the underlying channel’s mode will be assumed.

write(data)

Writes the given data to the channel.

Parameters: data (string) – the data to be written to the channel.
Returns: An InProgress object which is finished when the given data is fully written to the channel. The InProgress is finished with the number of bytes sent in the last write required to commit the given data to the channel. (This may not be the actual number of bytes of the given data.)

If the channel closes unexpectedly before the data was written, an IOError is thrown to the InProgress.

It is not required that the channel be open in order to write to it. Written data is queued until the channel opens and is then flushed. As writes are asynchronous, all written data is queued. It is the caller’s responsibility to ensure the internal write queue does not exceed the desired size by waiting for past write() InProgress objects to finish before writing more data.

If a write does not complete because the channel was closed prematurely, an IOError is thrown to the InProgress.

Properties

alive

True if the channel exists and is open.

channel

The original object this IOChannel is wrapping.

This may be a file object, socket object, file descriptor, etc., depending what was passed during initialization or to wrap().

This is None if the channel is closed.

chunk_size

Number of bytes to attempt to read from the channel at a time.

The default is 1M. A ‘read’ signal is emitted for each chunk read from the channel. (The number of bytes read at a time may be less than the chunk size, but will never be more.)

close_on_eof

Whether the channel automatically closes when EOF is encountered or on unexpected exceptions.

The channel is considered EOF when a read returns an empty string. A write that fails due to IOError or OSError will also close the channel if this property is True.

This behaviour makes sense for stream-based channels (e.g. a subprocess or socket), but may not for file-based channels. The default is True unless the underlying wrapped channel object contains a seek method, in which case it is treated as a file and this property is False. In either case it can be overridden by explicitly setting this property.
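The default described above amounts to a check for a seek attribute. The helper below is a hypothetical illustration of that rule, not kaa's actual code.

```python
import io


def default_close_on_eof(channel):
    """Objects with a seek method are treated as files: do not close on EOF."""
    return not hasattr(channel, "seek")


file_default = default_close_on_eof(io.BytesIO(b"data"))  # has seek()
fd_default = default_close_on_eof(5)  # a bare integer descriptor has no seek()
```

Under this rule file_default is False and fd_default is True.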

delimiter

String used to split data for use with readline().

Delimiter may also be a list of strings, in which case any one of the elements in the list will be used as a delimiter. For example, if you want to delimit based on either \r or \n, specify ['\r', '\n'].
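Splitting on any of several delimiters can be sketched as follows. The split_first_line helper is hypothetical and picks whichever delimiter occurs earliest in the data; how kaa resolves ties or adjacent delimiters such as \r\n is not stated here, so that part is an assumption.

```python
def split_first_line(data, delimiters):
    """Split data after the earliest occurrence of any delimiter.

    Returns (line, rest). line is None when no delimiter is present.
    """
    best = None  # (index, delimiter length) of the earliest match so far
    for delimiter in delimiters:
        index = data.find(delimiter)
        if index >= 0 and (best is None or index < best[0]):
            best = (index, len(delimiter))
    if best is None:
        return None, data
    end = best[0] + best[1]
    return data[:end], data[end:]


line, rest = split_first_line("a\rb\nc", ["\r", "\n"])
```

In this call the \r comes first, so line is "a\r" and rest is "b\nc".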

fileno

The file descriptor (integer) for this channel, or None if no channel has been set.

mode

Whether the channel is read-only, write-only, or read/write.

A bitmask of IO_READ and/or IO_WRITE.

queue_size

The size limit in bytes for the read and write queues.

Each queue can consume at most this size plus the chunk size. Setting a value does not affect any data currently in any of the queues.

read_queue_used

The number of bytes in the read queue.

The read queue is only used if either readline() or the readline signal is in use.

readable

True if read() may be called.

The channel is readable if it’s open and its mode has IO_READ, or if the channel is closed but a read() call would still succeed (due to buffered data).

Note

A value of True does not mean there is data available, but rather that there could be and that a read() call is possible (however that read() call may return None, in which case the readable property will subsequently be False).

writable

True if write() may be called.

(However, if you pass too much data to write() such that the write queue limit is exceeded, the write will fail.)

write_queue_used

The number of bytes queued in memory to be written to the channel.

Signals

read

Emitted for each chunk of data read from the channel.

def callback(chunk, ...)
Parameters: chunk (str) – data read from the channel

When a callback is connected to the read signal, data is automatically read from the channel as soon as it becomes available, and the signal is emitted.

It is allowed to have a callback connected to the read signal and simultaneously use the read() and readline() methods.

readline

Emitted for each line read from the channel.

def callback(line, ...)
Parameters: line (str) – line read from the channel

It is not allowed to have a callback connected to the readline signal and simultaneously use the readline() method.

Refer to readline() for more details.

closed

Emitted when the channel is closed.

def callback(expected, ...)
Parameters: expected (bool) – True if the channel is closed because close() was called.
