# -*- coding: utf-8 -*-
import datetime
import os
import socket
import traceback
import types
from abc import abstractmethod
import falcon
import gevent
from gevent.pool import Pool
from smapy.runnable import Runnable
from smapy.utils import get_bool, get_ms
[docs]class BaseResource(Runnable):
"""
TODO:
- implement session tracking
- implement exception tracking
- implement generic input processing and validation
"""
response_field = None # Only return this field in the response
sync = None # If True, make this resource always synchronous
audit = True # If False, skip session creation and audit tracking for this resource
@classmethod
def init(cls, api, route):
super(BaseResource, cls).init(api)
cls.route = route
cls.endpoint = api.endpoint + route
# ###########################
# ### main resource flow ####
# ###########################
[docs] @abstractmethod
def process(self, message):
"""The actual resource code should be implemented here by subclasses."""
@classmethod
def _is_sync(cls, request):
if 'sync' in request.params:
return get_bool(request.params, 'sync')
elif cls.sync is not None:
return cls.sync
return cls.conf['api'].get('sync', False)
@classmethod
def start_session(cls, request, sync):
session = {
'status': 'RUNNING',
'alive': True,
'sync': sync,
'resource': cls.name,
'in_ts': request.context['in_ts'],
'body': request.body,
'params': request.params,
'pid': os.getpid(),
'host': socket.gethostname(),
'env': {k: v for k, v in request.env.items() if '.' not in k}
}
request.context['session'] = cls.mongodb.session.insert(session)
request.context['internal'] = False
request.context['sync'] = sync
cls.logger.info("Starting new session %s.", session['_id'])
return sync
def end_session(self, response, status='OK'):
in_ts = self.context['in_ts']
out_ts = datetime.datetime.utcnow()
self.context['out_ts'] = out_ts
elapsed = get_ms(out_ts - in_ts)
self.context['elapsed'] = elapsed
if not self.context['internal']:
match = {
'_id': self.session
}
elapsed = get_ms(out_ts - in_ts)
update = {
'$set': {
'out_ts': out_ts,
'elapsed': elapsed,
'response': response,
'status': status,
'alive': False,
}
}
self.mongodb.session.update_one(match, update)
self.logger.info("Ending session %s. Status: %s, Elapsed: %sms",
self.session, status, elapsed)
[docs] def run_local(self, message):
response = self.process(message) or message
if response and self.response_field:
response = response.get(self.response_field)
return response
def _run_public(self, body):
status = 'OK'
try:
response = self.run_local(body)
except BaseException as ex:
self.logger.exception("Caught an uncontrolled Exception")
status = 'EXCEPTION'
exception = "{}: {}".format(ex.__class__.__name__, ex),
tb = traceback.format_exc().splitlines()[-3:]
response = {
'status': status,
'exception': exception,
'traceback': tb
}
raise falcon.HTTPInternalServerError("Uncontrolled Exception", tb)
finally:
if self.audit:
self.end_session(response, status)
return response
@classmethod
def run_public(cls, request, body):
sync = cls._is_sync(request)
request.context['sync'] = sync
request.context['audit'] = cls.audit
if cls.audit:
sync = cls.start_session(request, sync)
resource = cls(request)
if sync:
return resource._run_public(body)
else:
gevent.spawn(resource._run_public, body)
@classmethod
def on_get(cls, request, response):
response.body = cls.run_public(request, request.params)
@classmethod
def on_post(cls, request, response):
response.body = cls.run_public(request, request.body)
# ##########################
# ### running runnables ####
# ##########################
def _get_runnable(self, runnable):
runnable_class = self.api.get_runnable(runnable)
return runnable_class(self.request)
@staticmethod
def _is_many(messages):
if isinstance(messages, types.GeneratorType):
return True
elif not isinstance(messages, list):
return False
else:
return len(messages) > 1
def _run_one(self, runnable, messages, concurrency, remote, callback=None):
"""Run a single runnable on a single or many messages."""
if self._is_many(messages):
if concurrency == 1:
# Skip gevent usage
for message in messages:
runnable_ = self._get_runnable(runnable)
runnable_.run(message, remote, callback)
else:
# run the same runnable on many messages concurrently
pool = Pool(concurrency)
greenlets = []
for message in messages:
runnable_ = self._get_runnable(runnable)
greenlet = pool.spawn(runnable_.run, message, remote, callback)
greenlets.append(greenlet)
gevent.wait(greenlets)
else:
# messages is actually a single message, so skip the gevent part
if isinstance(messages, list):
message = messages[0]
else:
message = messages
runnable_ = self._get_runnable(runnable)
runnable_.run(message, remote, callback)
def _run_many(self, runnables, messages, concurrency, remote):
"""Run many runnables on a single or many messages."""
if isinstance(messages, list):
# each message corresponds to a single runnable, so we validate the list lengths
if len(messages) != len(runnables):
raise falcon.HTTPInternalServerError(
'Invalid Arguments',
'messages and runnables lists should have the same length'
)
else:
# We convert the message into a list of messages, to make
# the next block of code behave in a uniform way
messages = [messages] * len(runnables)
pool = Pool(concurrency)
greenlets = []
for runnable, message in zip(runnables, messages):
greenlet = pool.spawn(self._run_one, runnable, message, 1, remote)
greenlets.append(greenlet)
gevent.wait(greenlets)
def invoke(self, runnable, message=None, concurrency=None, remote=False, callback=None):
concurrency = int(concurrency) if concurrency else self.conf['api'].get('concurrency', 10)
if isinstance(runnable, list):
if callback:
raise NotImplementedError("Callback functions work only on single runnables")
self._run_many(runnable, message, concurrency, remote)
else:
self._run_one(runnable, message, concurrency, remote, callback)