Source code for thriftworker.app
"""ThriftWorker.
Distribute thrift requests between workers.
"""
from __future__ import absolute_import
import logging
from pyuv import Loop
from thrift.protocol import TBinaryProtocol
from .transports.base import Acceptors
from .state import set_current_app, get_current_app
from .listener import Listener, Listeners
from .hub import Hub
from .services import Services
from .utils.decorators import cached_property
from .utils.mixin import SubclassMixin
from .utils.stats import Counters, Timers
logger = logging.getLogger(__name__)
[docs]class ThriftWorker(SubclassMixin):
"""Store global application state. Also acts as factory for whole
application.
"""
acceptor_cls = 'thriftworker.transports.framed:FramedAcceptor'
def __init__(self, loop=None, protocol_factory=None, port_range=None,
pool_size=None, shutdown_timeout=None):
self.counters = Counters()
self.timeouts = Timers()
self.execution_timers = Timers()
self.dispatching_timers = Timers()
# Set provided instance if we can.
if loop is not None:
self.loop = loop
if protocol_factory is not None:
self.protocol_factory = protocol_factory
self.port_range = port_range
self.pool_size = pool_size
self.shutdown_timeout = shutdown_timeout or 30.0
super(ThriftWorker, self).__init__()
set_current_app(self)
@cached_property
def pool_size(self):
"""Return default pool size."""
return 1
@pool_size.setter
[docs] def pool_size(self, value):
if value is not None and value < 0:
raise ValueError('Pool size can not be negative.')
return int(value or 1) or 1
@cached_property
def port_range(self):
"""Return range from which we allowed to allocate ports."""
return None
@port_range.setter
[docs] def port_range(self, value):
if value is None:
return None
try:
start, stop = int(value[0]), int(value[1])
except (IndexError, ValueError):
raise ValueError('Port range must be tuple of two integers.')
return (start, stop)
@cached_property
[docs] def protocol_factory(self):
"""Specify which protocol should be used."""
return TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
@classmethod
[docs] def default(cls):
"""Return default application instance."""
try:
app = cls.instance()
except RuntimeError:
app = cls()
return app
@staticmethod
[docs] def instance():
"""Return global instance."""
return get_current_app()
@cached_property
[docs] def loop(self):
"""Create event loop. Should be running in separate thread."""
return Loop()
@cached_property
[docs] def Hub(self):
"""Create bounded :class:`Hub` class."""
return self.subclass_with_self(Hub)
@cached_property
[docs] def hub(self):
"""Instance of bounded :class:`LoopContainer`."""
return self.Hub()
@cached_property
[docs] def Services(self):
"""Create bounded :class:`Processor` class."""
return self.subclass_with_self(Services)
@cached_property
[docs] def services(self):
"""Create global request processor instance."""
return self.Services()
@cached_property
[docs] def Listener(self):
"""Create bounded :class:`Listener` class."""
return self.subclass_with_self(Listener)
@cached_property
[docs] def Listeners(self):
"""Create bounded :class:`Listeners` class."""
return self.subclass_with_self(Listeners)
@cached_property
[docs] def listeners(self):
"""Create pool of listeners."""
return self.Listeners()
@cached_property
[docs] def Acceptor(self):
return self.subclass_with_self(self.acceptor_cls, reverse='Acceptor')
@cached_property
[docs] def Acceptors(self):
return self.subclass_with_self(Acceptors)
@cached_property
[docs] def acceptors(self):
"""Create pool of acceptors."""
return self.Acceptors()
@property
[docs] def worker_cls(self):
if self.pool_size == 1:
return 'thriftworker.workers.sync:SyncWorker'
else:
return 'thriftworker.workers.threads:ThreadsWorker'
@cached_property
[docs] def Worker(self):
return self.subclass_with_self(self.worker_cls, reverse='Worker')
@cached_property
[docs] def worker(self):
"""Create some worker routine."""
logger.debug('Using {0!r} worker'.format(self.Worker))
return self.Worker(self.pool_size)