Source code for geeteventbus.eventbus
''' Main module implementing the event bus '''
from atexit import register
from time import time
from threading import Lock, Thread, current_thread
import logging
import sys
from zlib import crc32
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
# True when running under Python 2. Selects the right queue module below.
PY_VER_2 = sys.version_info[0] == 2
if PY_VER_2:
    from Queue import Queue, Empty
else:
    from queue import Queue, Empty

# Number of buckets the topic -> subscribers table is split into.
MAX_TOPIC_INDEX = 16  # Must be power of 2

# Bounds and default for the number of executor threads.
DEFAULT_EXECUTOR_COUNT = 8
MIN_EXECUTOR_COUNT = 1
# Must be a power of 2: eventbus.post() masks the ordering hash with
# (MAX_EXECUTOR_COUNT - 1). An earlier duplicate assignment of 1024 was dead
# code (immediately overwritten) and has been removed; 128 is the value that
# was always in effect.
MAX_EXECUTOR_COUNT = 128

# Bounds for the capacity of the un-ordered event queue.
MAXIMUM_QUEUE_LENGTH = 25600
MINIMUM_QUEUE_LENGTH = 16
def get_crc32(data):
    '''
    Returns the crc32 value of the input string.

    :param data: the value to hash. Text is encoded as UTF-8 before hashing;
        ``bytes``/``bytearray`` (which includes ``str`` on Python 2) is hashed
        as-is.
    :returns: the checksum computed by ``zlib.crc32``.
    :rtype: int
    :raises TypeError: if data is neither text nor bytes.
    '''
    # Already binary: hash directly. Previously bytes input failed on
    # Python 3 because bytes(data, encoding=...) only accepts str.
    if isinstance(data, (bytes, bytearray)):
        return crc32(data)
    try:
        encoded = data.encode('UTF-8')
    except AttributeError:
        # Keep the exception type callers saw before for non-string input.
        raise TypeError('get_crc32() expects a string or bytes, got %s'
                        % type(data).__name__)
    return crc32(encoded)
class eventbus:
    '''
    Topic based publish/subscribe event bus.

    Subscribers register for topics; posted events are delivered to every
    subscriber of the event's topic. In asynchronous mode (the default) events
    are queued and delivered by a pool of executor threads. Events carrying an
    ordering key are always routed to the same executor, so they are processed
    in the order they were posted. In synchronous mode events are delivered on
    the posting thread.
    '''

    def __init__(self, max_queued_event=10000, executor_count=DEFAULT_EXECUTOR_COUNT,
                 synchronus=False, subscribers_thread_safe=True):
        '''
        Creates an eventbus object

        :param max_queued_event: total number of un-ordered events queued. Values outside
                                 [MINIMUM_QUEUE_LENGTH, MAXIMUM_QUEUE_LENGTH] fall back to
                                 MAXIMUM_QUEUE_LENGTH.
        :type max_queued_event: int
        :param executor_count: number of threads to process the queued event by calling the
                               corresponding subscribers. Values outside
                               [MIN_EXECUTOR_COUNT, MAX_EXECUTOR_COUNT] fall back to
                               DEFAULT_EXECUTOR_COUNT.
        :type executor_count: int
        :param synchronus: if the events are processed synchronously, i.e. if they are processed
                           by subscribers on the same event submitting thread.
        :type synchronus: bool
        :param subscribers_thread_safe: if the subscribers can be invoked for processing multiple
                                        events simultaneously.
        :type subscribers_thread_safe: bool
        '''
        register(self.shutdown)
        self.synchronus = synchronus
        self.subscribers_thread_safe = subscribers_thread_safe
        # One independent dict per bucket. `N * [{}]` would alias a single
        # dict N times, making the per-bucket locks guard the same object.
        self.topics = [{} for _ in range(MAX_TOPIC_INDEX)]
        self.index_locks = [Lock() for _ in range(MAX_TOPIC_INDEX)]
        self.consumers = {}
        self.consumers_lock = Lock()
        self.shutdown_lock = Lock()
        self.subscriber_locks = {}
        self.keep_running = True
        self.stop_time = 0

        if self.synchronus:
            self.post_synchronous = self._post_synchronous
            return

        # Validate the requested capacity first, then build the queue with the
        # validated value (previously the raw argument was used).
        self.event_queue_size = MAXIMUM_QUEUE_LENGTH
        if MINIMUM_QUEUE_LENGTH <= max_queued_event <= MAXIMUM_QUEUE_LENGTH:
            self.event_queue_size = max_queued_event
        self.event_queue = Queue(self.event_queue_size)

        self.executor_count = executor_count
        if executor_count < MIN_EXECUTOR_COUNT or executor_count > MAX_EXECUTOR_COUNT:
            self.executor_count = DEFAULT_EXECUTOR_COUNT

        self.executors = []
        self.grouped_events = []
        self.thread_specific_queue = {}
        for i in range(self.executor_count):
            name = 'executor_thread_' + str(i)
            thrd = Thread(target=self, name=name)
            # Daemon threads let the interpreter reach its atexit handlers, so
            # the shutdown() registered above can actually run and flush the
            # queues. Non-daemon executors would keep the process alive
            # forever unless shutdown() was called explicitly.
            thrd.daemon = True
            self.executors.append(thrd)
            grouped_events_queue = Queue()
            self.grouped_events.append(grouped_events_queue)
            self.thread_specific_queue[name] = grouped_events_queue
        # Start only after every per-thread queue is registered, because each
        # executor looks its queue up by thread name as soon as it runs.
        for thrd in self.executors:
            thrd.start()

    def _post_synchronous(self, eventobj):
        ''' Delivers the event to its subscribers on the calling thread. '''
        subscribers = self.get_subscribers(eventobj.get_topic())
        if subscribers is None:
            return
        for subscr in subscribers:
            try:
                subscr.process(eventobj)
            except Exception as e:
                # A failing subscriber must not prevent delivery to the rest.
                logging.error(e)

    def post(self, eventobj):
        '''
        posts an event to the eventbus

        :param eventobj: the event posted. It must be type of event class or its subclass.
        :returns: True if event is successfully posted, False otherwise (invalid object
                  passed, or the bus has been shut down).
        '''
        if not isinstance(eventobj, event):
            logging.error('Invalid data passed. You must pass an event instance')
            return False
        if not self.keep_running:
            return False
        if self.synchronus:
            self.post_synchronous(eventobj)
            return True
        ordered = eventobj.get_ordered()
        if ordered is None:
            self.event_queue.put(eventobj)
        else:
            # Same ordering key -> same executor queue, which preserves the
            # posting order among events sharing that key.
            indx = (abs(get_crc32(ordered)) & (MAX_EXECUTOR_COUNT - 1)) % self.executor_count
            self.grouped_events[indx].put(eventobj)
        return True

    def register_consumer_topics(self, consumer, topic_list):
        '''
        registers a consumer to the eventbus that subscribes to the list of topics in topic_list

        :param consumer: the subscriber object
        :param topic_list: the list of topics the consumer will subscribe for
        '''
        for topic in topic_list:
            self.register_consumer(consumer, topic)

    def register_consumer(self, consumer, topic):
        '''
        registers a consumer to the eventbus that subscribes to a topic

        :param consumer: the subscriber object
        :type consumer: subscriber subclass
        :param topic: the topic the consumer will subscribe for
        :type topic: str
        :returns: False if consumer is not a subscriber instance, None otherwise.
        '''
        if not isinstance(consumer, subscriber):
            return False
        indexval = get_crc32(topic) & (MAX_TOPIC_INDEX - 1)
        # Lock order is always consumers_lock -> index lock.
        with self.consumers_lock:
            with self.index_locks[indexval]:
                topic_consumers = self.topics[indexval].setdefault(topic, [])
                if consumer not in topic_consumers:
                    topic_consumers.append(consumer)
            consumer_topics = self.consumers.setdefault(consumer, [])
            if topic not in consumer_topics:
                consumer_topics.append(topic)
            if not self.subscribers_thread_safe:
                # Per-subscriber lock used by the executors to serialize
                # calls into a subscriber that is not thread safe.
                if consumer not in self.subscriber_locks:
                    self.subscriber_locks[consumer] = Lock()

    def unregister_consumer(self, consumer):
        '''
        Unregister the consumer.

        The consumer will no longer receive any event to process for any topic

        :param consumer: the subscriber object to unregister
        '''
        with self.consumers_lock:
            subscribed_topics = self.consumers.pop(consumer, None)
            # Locks only exist for non-thread-safe subscribers; drop the entry
            # whenever present (the old condition was inverted and leaked it).
            self.subscriber_locks.pop(consumer, None)
            if subscribed_topics is None:
                return
            for topic in subscribed_topics:
                indexval = get_crc32(topic) & (MAX_TOPIC_INDEX - 1)
                with self.index_locks[indexval]:
                    bucket = self.topics[indexval]
                    if topic in bucket and consumer in bucket[topic]:
                        bucket[topic].remove(consumer)
                        if not bucket[topic]:
                            del bucket[topic]

    def is_subscribed(self, consumer, topic):
        '''
        Checks if a subscriber is a consumer for some topic

        :returns: True if the consumer is subscribing to the topic
        :rtype: bool
        '''
        if not isinstance(consumer, subscriber):
            logging.error('Invalid object passed')
            return False
        indexval = get_crc32(topic) & (MAX_TOPIC_INDEX - 1)
        with self.index_locks[indexval]:
            return consumer in self.topics[indexval].get(topic, ())

    def get_subscribers(self, topic):
        '''
        Returns the list of subscribers currently registered for the topic.

        :returns: a copy of the subscriber list, or None if nobody subscribes to the topic.
        '''
        indexval = get_crc32(topic) & (MAX_TOPIC_INDEX - 1)
        with self.index_locks[indexval]:
            if topic not in self.topics[indexval]:
                return None
            # Copy so callers can iterate without holding the bucket lock.
            return self.topics[indexval][topic][:]

    def _dispatch(self, eventobj):
        ''' Delivers the event to its subscribers from an executor thread. '''
        subscribers = self.get_subscribers(eventobj.get_topic())
        if subscribers is None:
            return
        for subscr in subscribers:
            lock = None
            if not self.subscribers_thread_safe:
                try:
                    lock = self.subscriber_locks[subscr]
                except KeyError as e:
                    # No lock registered for this subscriber (e.g. it was
                    # unregistered meanwhile): log and skip it.
                    logging.error(e)
                    continue
            if lock is not None:
                lock.acquire()
            try:
                subscr.process(eventobj)
            except Exception as e:
                logging.error(e)
            finally:
                # Always release, even on non-Exception errors, so a failing
                # subscriber cannot leave its lock held forever.
                if lock is not None:
                    lock.release()

    def __call__(self):
        '''
        Executor thread main loop.

        Takes events from this thread's ordered queue first, then from the shared
        un-ordered queue, and delivers them. After shutdown() the loop keeps flushing
        until the queues are drained or the flush deadline passes.
        '''
        thread_specific_queue = self.thread_specific_queue[current_thread().name]
        while True:
            shutting_down = self.stop_time > 0
            # Stop once the flush deadline set by shutdown() has passed.
            if shutting_down and time() >= self.stop_time:
                break
            try:
                if not thread_specific_queue.empty():
                    # Only this thread consumes from its own queue, so this
                    # get() cannot block after the empty() check.
                    eventobj = thread_specific_queue.get()
                    fromqueue = thread_specific_queue
                else:
                    eventobj = self.event_queue.get(timeout=0.1)
                    fromqueue = self.event_queue
            except Empty:
                # Nothing left to flush: finish right away during shutdown
                # instead of idling until the deadline.
                if shutting_down and thread_specific_queue.empty():
                    break
                continue
            except Exception as e:
                logging.error(e)
                continue
            fromqueue.task_done()  # No harm, announce task done upfront
            self._dispatch(eventobj)

    def shutdown(self):
        '''
        Stops the event bus. The event bus will stop all its executor threads.
        It will try to flush out already queued events by calling the subscribers
        of the events. This flush wait time is 2 seconds.
        '''
        with self.shutdown_lock:
            if not self.keep_running:
                return
            self.keep_running = False
            self.stop_time = time() + 2
        if not self.synchronus:
            this_thread = current_thread()
            for thrd in self.executors:
                # A subscriber may call shutdown() from an executor thread; a
                # thread cannot join itself.
                if thrd is not this_thread:
                    thrd.join()