rsb.util

Various helper classes and methods.

Code author: jwienke

Functions

get_logger_by_class(klass)

Get a python logger instance based on a class instance.

prefix()

Try to return the prefix that this code was installed into.

time_to_unix_microseconds(time)

Convert a floating point, seconds based time to a unix microseconds.

unix_microseconds_to_time(value)

Classes

OrderedQueueDispatcherPool(thread_pool_size, …)

A thread pool that dispatches messages to a list of receivers.

class rsb.util.OrderedQueueDispatcherPool(thread_pool_size, del_func, filter_func=None)

Bases: object

A thread pool that dispatches messages to a list of receivers.

The number of threads is usually smaller than the number of receivers and for each receiver it is guaranteed that messages arrive in the order they were published. No guarantees are given between different receivers. All methods except #start and #stop are reentrant.

The pool can be stopped and restarted at any time during the processing but these calls must be single-threaded.

Assumptions:
  • same subscriptions for multiple receivers unlikely, hence filtering done per receiver thread

Code author: jwienke

Construct a new pool.

Parameters
  • thread_pool_size – number of threads for this pool

  • del_func – the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant. callable with two arguments. First is the receiver of a message, second is the message to deliver

  • filter_func – Reentrant function used to filter messages per receiver. Default accepts every message. callable with two arguments. First is the receiver of a message, second is the message to filter. Must return a bool, true means to deliver the message, false rejects it.

Type thread_pool_size

int >= 1

Type del_func

callable

Type filter_func

callable

push(message)

Push a new message to be dispatched to all receivers in this pool.

Parameters

message – message to dispatch

register_receiver(receiver)

Register a new receiver at the pool.

Multiple registrations of the same receiver are possible resulting in being called multiple times for the same message (but effectively this destroys the guarantee about ordering given above because multiple message queues are used for every subscription).

Parameters

receiver – new receiver

start()

Start processing and return immediately.

Raises RuntimeError

if the pool was already started and is running

stop()

Block until every thread has stopped working.

unregister_receiver(receiver)

Unregister all registrations of one receiver.

Parameters

receiver – receiver to unregister

Returns

True if one or more receivers were unregistered, else False

rsb.util.get_logger_by_class(klass)

Get a python logger instance based on a class instance.

The logger name will be a dotted string containing python module and class name.

Parameters

klass – class instance

Returns

logger instance

rsb.util.prefix()

Try to return the prefix that this code was installed into.

This is done by guessing the install location from some rules.

Adapted from http://ttboj.wordpress.com/2012/09/20/finding-your-software-install-prefix-from-inside-python/

Returns

string path with the install prefix or empty string if not known

rsb.util.time_to_unix_microseconds(time)

Convert a floating point, seconds based time to a unix microseconds.

Parameters

time – time since epoch in seconds + fractional part.

Returns

time as integer with microseconds precision.

rsb.util.unix_microseconds_to_time(value)