# Copyright (c) The PyAMF Project. # See LICENSE.txt for details. """ Twisted server implementation. This gateway allows you to expose functions in Twisted to AMF clients and servers. @see: U{Twisted homepage} @since: 0.1.0 """ import sys import os.path try: sys.path.remove('') except ValueError: pass try: sys.path.remove(os.path.dirname(os.path.abspath(__file__))) except ValueError: pass twisted = __import__('twisted') __import__('twisted.internet.defer') __import__('twisted.internet.threads') __import__('twisted.web.resource') __import__('twisted.web.server') defer = twisted.internet.defer threads = twisted.internet.threads resource = twisted.web.resource server = twisted.web.server from pyamf import remoting from pyamf.remoting import gateway, amf0, amf3 __all__ = ['TwistedGateway'] class AMF0RequestProcessor(amf0.RequestProcessor): """ A Twisted friendly implementation of L{amf0.RequestProcessor} """ def __call__(self, request, *args, **kwargs): """ Calls the underlying service method. @return: A C{Deferred} that will contain the AMF L{Response}. @rtype: C{twisted.internet.defer.Deferred} """ try: service_request = self.gateway.getServiceRequest( request, request.target) except gateway.UnknownServiceError: return defer.succeed(self.buildErrorResponse(request)) response = remoting.Response(None) deferred_response = defer.Deferred() def eb(failure): errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.gateway.logger: self.gateway.logger.error(errMesg) self.gateway.logger.info(failure.getTraceback()) deferred_response.callback(self.buildErrorResponse( request, (failure.type, failure.value, failure.tb))) def response_cb(result): if self.gateway.logger: self.gateway.logger.debug("AMF Response: %s" % (result,)) response.body = result deferred_response.callback(response) def preprocess_cb(result): d = defer.maybeDeferred(self._getBody, request, response, service_request, **kwargs) d.addCallback(response_cb).addErrback(eb) def auth_cb(result): if result is not True: response.status = remoting.STATUS_ERROR response.body = remoting.ErrorFault(code='AuthenticationError', description='Authentication failed') deferred_response.callback(response) return d = defer.maybeDeferred(self.gateway.preprocessRequest, service_request, *args, **kwargs) d.addCallback(preprocess_cb).addErrback(eb) # we have a valid service, now attempt authentication d = defer.maybeDeferred(self.authenticateRequest, request, service_request, **kwargs) d.addCallback(auth_cb).addErrback(eb) return deferred_response class AMF3RequestProcessor(amf3.RequestProcessor): """ A Twisted friendly implementation of L{amf3.RequestProcessor} """ def _processRemotingMessage(self, amf_request, ro_request, **kwargs): ro_response = amf3.generate_acknowledgement(ro_request) try: service_name = ro_request.operation if hasattr(ro_request, 'destination') and ro_request.destination: service_name = '%s.%s' % (ro_request.destination, service_name) service_request = self.gateway.getServiceRequest(amf_request, service_name) except gateway.UnknownServiceError: return defer.succeed(remoting.Response( self.buildErrorResponse(ro_request), status=remoting.STATUS_ERROR)) deferred_response = defer.Deferred() def eb(failure): errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.gateway.logger: self.gateway.logger.error(errMesg) self.gateway.logger.error(failure.getTraceback()) ro_response = self.buildErrorResponse(ro_request, (failure.type, failure.value, failure.tb)) deferred_response.callback(remoting.Response(ro_response, status=remoting.STATUS_ERROR)) def response_cb(result): ro_response.body = result res = remoting.Response(ro_response) if self.gateway.logger: self.gateway.logger.debug("AMF Response: %r" % (res,)) deferred_response.callback(res) def process_cb(result): d = defer.maybeDeferred(self.gateway.callServiceRequest, service_request, *ro_request.body, **kwargs) d.addCallback(response_cb).addErrback(eb) d = defer.maybeDeferred(self.gateway.preprocessRequest, service_request, *ro_request.body, **kwargs) d.addCallback(process_cb).addErrback(eb) return deferred_response def __call__(self, amf_request, **kwargs): """ Calls the underlying service method. @return: A C{deferred} that will contain the AMF L{Response}. @rtype: C{Deferred} """ deferred_response = defer.Deferred() ro_request = amf_request.body[0] def cb(amf_response): deferred_response.callback(amf_response) def eb(failure): errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.gateway.logger: self.gateway.logger.error(errMesg) self.gateway.logger.error(failure.getTraceback()) deferred_response.callback(self.buildErrorResponse(ro_request, (failure.type, failure.value, failure.tb))) d = defer.maybeDeferred(self._getBody, amf_request, ro_request, **kwargs) d.addCallback(cb).addErrback(eb) return deferred_response class TwistedGateway(gateway.BaseGateway, resource.Resource): """ Twisted Remoting gateway for C{twisted.web}. @ivar expose_request: Forces the underlying HTTP request to be the first argument to any service call. @type expose_request: C{bool} """ allowedMethods = ('POST',) def __init__(self, *args, **kwargs): if 'expose_request' not in kwargs: kwargs['expose_request'] = True gateway.BaseGateway.__init__(self, *args, **kwargs) resource.Resource.__init__(self) def _finaliseRequest(self, request, status, content, mimetype='text/plain'): """ Finalises the request. @param request: The HTTP Request. @type request: C{http.Request} @param status: The HTTP status code. @type status: C{int} @param content: The content of the response. @type content: C{str} @param mimetype: The MIME type of the request. @type mimetype: C{str} """ request.setResponseCode(status) request.setHeader("Content-Type", mimetype) request.setHeader("Content-Length", str(len(content))) request.setHeader("Server", gateway.SERVER_NAME) request.write(content) request.finish() def render_POST(self, request): """ Read remoting request from the client. @type request: The HTTP Request. @param request: C{twisted.web.http.Request} """ def handleDecodeError(failure): """ Return HTTP 400 Bad Request. """ errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.logger: self.logger.error(errMesg) self.logger.error(failure.getTraceback()) body = "400 Bad Request\n\nThe request body was unable to " \ "be successfully decoded." if self.debug: body += "\n\nTraceback:\n\n%s" % failure.getTraceback() self._finaliseRequest(request, 400, body) request.content.seek(0, 0) timezone_offset = self._get_timezone_offset() d = threads.deferToThread(remoting.decode, request.content.read(), strict=self.strict, logger=self.logger, timezone_offset=timezone_offset) def cb(amf_request): if self.logger: self.logger.debug("AMF Request: %r" % amf_request) x = self.getResponse(request, amf_request) x.addCallback(self.sendResponse, request) # Process the request d.addCallback(cb).addErrback(handleDecodeError) return server.NOT_DONE_YET def sendResponse(self, amf_response, request): def cb(result): self._finaliseRequest(request, 200, result.getvalue(), remoting.CONTENT_TYPE) def eb(failure): """ Return 500 Internal Server Error. """ errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.logger: self.logger.error(errMesg) self.logger.error(failure.getTraceback()) body = "500 Internal Server Error\n\nThere was an error encoding " \ "the response." if self.debug: body += "\n\nTraceback:\n\n%s" % failure.getTraceback() self._finaliseRequest(request, 500, body) timezone_offset = self._get_timezone_offset() d = threads.deferToThread(remoting.encode, amf_response, strict=self.strict, logger=self.logger, timezone_offset=timezone_offset) d.addCallback(cb).addErrback(eb) def getProcessor(self, request): """ Determines the request processor, based on the request. @param request: The AMF message. @type request: L{Request} """ if request.target == 'null': return AMF3RequestProcessor(self) return AMF0RequestProcessor(self) def getResponse(self, http_request, amf_request): """ Processes the AMF request, returning an AMF L{Response}. @param http_request: The underlying HTTP Request @type http_request: C{twisted.web.http.Request} @param amf_request: The AMF Request. @type amf_request: L{Envelope} """ response = remoting.Envelope(amf_request.amfVersion) dl = [] def cb(body, name): response[name] = body for name, message in amf_request: processor = self.getProcessor(message) http_request.amf_request = message d = defer.maybeDeferred( processor, message, http_request=http_request) dl.append(d.addCallback(cb, name)) def cb2(result): return response def eb(failure): """ Return 500 Internal Server Error. """ errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) if self.logger: self.logger.error(errMesg) self.logger.error(failure.getTraceback()) body = "500 Internal Server Error\n\nThe request was unable to " \ "be successfully processed." if self.debug: body += "\n\nTraceback:\n\n%s" % failure.getTraceback() self._finaliseRequest(http_request, 500, body) d = defer.DeferredList(dl) return d.addCallback(cb2).addErrback(eb) def authenticateRequest(self, service_request, username, password, **kwargs): """ Processes an authentication request. If no authenticator is supplied, then authentication succeeds. @return: C{Deferred}. @rtype: C{twisted.internet.defer.Deferred} """ authenticator = self.getAuthenticator(service_request) if self.logger: self.logger.debug('Authenticator expands to: %r' % authenticator) if authenticator is None: return defer.succeed(True) args = (username, password) if hasattr(authenticator, '_pyamf_expose_request'): http_request = kwargs.get('http_request', None) args = (http_request,) + args return defer.maybeDeferred(authenticator, *args) def preprocessRequest(self, service_request, *args, **kwargs): """ Preprocesses a request. """ processor = self.getPreprocessor(service_request) if self.logger: self.logger.debug('Preprocessor expands to: %r' % processor) if processor is None: return args = (service_request,) + args if hasattr(processor, '_pyamf_expose_request'): http_request = kwargs.get('http_request', None) args = (http_request,) + args return defer.maybeDeferred(processor, *args)