Source code for fantastico.oauth2.middleware.exceptions_middleware

'''
Copyright 2013 Cosnita Radu Viorel

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

.. codeauthor:: Radu Viorel Cosnita <radu.cosnita@gmail.com>
.. py:module:: fantastico.oauth2.middleware.exceptions_middleware
'''
from fantastico.oauth2.exceptions import OAuth2MissingQueryParamError, OAuth2AuthenticationError, OAuth2InvalidClientError, \
    OAuth2Error, OAuth2UnsupportedGrantError
from fantastico.routing_engine.custom_responses import RedirectResponse
from fantastico.settings import SettingsFacade
from webob.response import Response
import json
from fantastico.exception_formatters import ExceptionFormattersFactory

class OAuth2ExceptionsMiddleware(object):
    '''This middleware wraps a wsgi application and converts the OAuth2 exceptions raised by it into concrete error
    responses. At the moment responses are produced only in english and follow the format specified in RFC6749: for
    every intercepted OAuth2 exception the **error**, **error_description** and **error_uri** fields are sent back to
    the client, by default as a json document.'''

    def __init__(self, app, settings_facade_cls=SettingsFacade, exceptions_factory_cls=ExceptionFormattersFactory):
        '''Store the wrapped wsgi app, read the **doc_base** setting and instantiate the exception formatters
        factory. Both collaborator classes can be overridden through the constructor arguments.'''

        self._app = app

        settings = settings_facade_cls()
        self._doc_base = settings.get("doc_base")

        self._exceptions_factory = exceptions_factory_cls()

    def __call__(self, environ, start_response):
        '''Delegate the request to the wrapped application. When the call succeeds its result is returned untouched.
        When an OAuth2 exception is raised it is mapped to an RFC6749 error name and an error response is started
        and returned instead.'''

        try:
            return self._app(environ, start_response)
        except OAuth2MissingQueryParamError as ex:
            payload = self._describe_error("invalid_request",
                                           "%s query parameter is mandatory." % ex.param_name,
                                           ex.error_code)
            status_code = ex.http_code
        except OAuth2AuthenticationError as ex:
            payload = self._describe_error("access_denied", str(ex), ex.error_code)
            status_code = ex.http_code
        except OAuth2InvalidClientError as ex:
            payload = self._describe_error("invalid_client", str(ex), ex.error_code)
            status_code = ex.http_code
        except OAuth2UnsupportedGrantError as ex:
            payload = self._describe_error("unsupported_grant_type", str(ex), ex.error_code)
            status_code = ex.http_code
        except OAuth2Error as ex:
            payload = self._describe_error("server_error", str(ex), ex.error_code)

            # Generic OAuth2 errors carrying a 5xx code are reported to the client as 400.
            status_code = 400 if ex.http_code >= 500 else ex.http_code

        error_response = self._build_response(environ, payload, status_code)
        start_response(error_response.status, error_response.headerlist)

        return [error_response.body]

    def _describe_error(self, error, description, error_code):
        '''This method builds the error descriptor (error, error_description, error_uri) sent back to the client.'''

        return {"error": error,
                "error_description": description,
                "error_uri": self._get_error_uri(error_code)}

    def _get_error_uri(self, error_code):
        '''This method returns the error_uri value for the given error code. The uri is built from the configured
        **doc_base** setting and points to the oauth2 exceptions section of the documentation.'''

        uri_parts = (self._doc_base, error_code)

        return "%sfeatures/oauth2/exceptions/%s.html" % uri_parts

    def _build_response(self, environ, body, http_code):
        '''This method transforms the given body into a wsgi response object. The formatter is selected using the
        **error_format** request parameter (json when missing). A json response carrying **http_code** is built for
        the json format, while any other format results in a redirect response built from the formatter output.
        Cors headers are added in both cases.'''

        request = environ["fantastico.request"]

        redirect_uri = request.params.get("redirect_uri")
        requested_format = request.params.get("error_format", ExceptionFormattersFactory.JSON)

        formatter = self._exceptions_factory.get_formatter(requested_format)
        formatted = formatter.format_ex(body, ctx={"redirect_uri": redirect_uri})

        if requested_format == ExceptionFormattersFactory.JSON:
            wsgi_response = Response(body=json.dumps(formatted).encode(),
                                     content_type="application/json; charset=UTF-8",
                                     status=http_code)
        else:
            wsgi_response = RedirectResponse(formatted)

        self._add_cors_headers(wsgi_response)

        return wsgi_response

    def _add_cors_headers(self, response):
        '''This method adds the cors headers required for serializing exceptions to cross domain requests.'''

        cors_headers = (("Access-Control-Allow-Origin", "*"),
                        ("Access-Control-Allow-Methods", "OPTIONS,GET,POST,PUT,DELETE"))

        for header_name, header_value in cors_headers:
            response.headers[header_name] = header_value