Source code for zope.publisher.xmlrpc

##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XML-RPC Publisher

This module contains the XMLRPCRequest and XMLRPCResponse
"""
__docformat__ = 'restructuredtext'

import sys
import datetime
from io import BytesIO

import zope.component
import zope.interface
from zope.interface import implementer
from zope.publisher.interfaces.xmlrpc import \
        IXMLRPCPublisher, IXMLRPCRequest, IXMLRPCPremarshaller, IXMLRPCView

from zope.publisher.http import HTTPRequest, HTTPResponse, DirectResult
from zope.security.proxy import isinstance

if sys.version_info[0] == 2:
    import xmlrpclib
else:
    import xmlrpc.client as xmlrpclib

@implementer(IXMLRPCRequest)
class XMLRPCRequest(HTTPRequest):

    _args = ()

    def _createResponse(self):
        """Create a specific XML-RPC response object."""
        return XMLRPCResponse()

    def processInputs(self):
        'See IPublisherRequest'
        # Parse the request XML structure

        # Using lines() does not work as Twisted's BufferedStream sends back
        # an empty stream here for read() (bug). Using readlines() does not
        # work with paster.httpserver. However, readline() works fine.
        lines = b''
        while True:
            line = self._body_instream.readline()
            if not line:
                break
            lines += line
        self._args, function = xmlrpclib.loads(lines)

        # Translate '.' to '/' in function to represent object traversal.
        function = function.split('.')

        if function:
            self.setPathSuffix(function)


class TestRequest(XMLRPCRequest):

    def __init__(self, body_instream=None, environ=None, response=None, **kw):

        _testEnv =  {
            'SERVER_URL':         'http://127.0.0.1',
            'HTTP_HOST':          '127.0.0.1',
            'CONTENT_LENGTH':     '0',
            'GATEWAY_INTERFACE':  'TestFooInterface/1.0',
            }

        if environ:
            _testEnv.update(environ)
        if kw:
            _testEnv.update(kw)
        if body_instream is None:
            body_instream = BytesIO(b'')

        super(TestRequest, self).__init__(body_instream, _testEnv, response)


[docs]class XMLRPCResponse(HTTPResponse): """XMLRPC response. This object is responsible for converting all output to valid XML-RPC. """
[docs] def setResult(self, result): """Sets the result of the response Sets the return body equal to the (string) argument "body". Also updates the "content-length" return header. If the body is a 2-element tuple, then it will be treated as (title,body) If is_error is true then the HTML will be formatted as a Zope error message instead of a generic HTML page. """ body = premarshal(result) if isinstance(body, xmlrpclib.Fault): # Convert Fault object to XML-RPC response. body = xmlrpclib.dumps(body, methodresponse=True) else: # Marshall our body as an XML-RPC response. Strings will be sent # as strings, integers as integers, etc. We do *not* convert # everything to a string first. try: body = xmlrpclib.dumps((body,), methodresponse=True, allow_none=True) except: # We really want to catch all exceptions at this point! self.handleException(sys.exc_info()) return headers = [('content-type', 'text/xml;charset=utf-8'), ('content-length', str(len(body)))] self._headers.update(dict((k, [v]) for (k, v) in headers)) super(XMLRPCResponse, self).setResult(DirectResult((body,)))
[docs] def handleException(self, exc_info): """Handle Errors during publsihing and wrap it in XML-RPC XML >>> import sys >>> resp = XMLRPCResponse() >>> try: ... raise AttributeError('xyz') ... except: ... exc_info = sys.exc_info() ... resp.handleException(exc_info) >>> resp.getStatusString() '200 OK' >>> resp.getHeader('content-type') 'text/xml;charset=utf-8' >>> body = ''.join(resp.consumeBody()) >>> 'Unexpected Zope exception: AttributeError: xyz' in body True """ t, value = exc_info[:2] s = '%s: %s' % (getattr(t, '__name__', t), value) # Create an appropriate Fault object. Unfortunately, we throw away # most of the debugging information. More useful error reporting is # left as an exercise for the reader. Fault = xmlrpclib.Fault fault_text = None try: if isinstance(value, Fault): fault_text = value elif isinstance(value, Exception): fault_text = Fault(-1, "Unexpected Zope exception: " + s) else: fault_text = Fault(-2, "Unexpected Zope error value: " + s) except: fault_text = Fault(-3, "Unknown Zope fault type") # Do the damage. self.setResult(fault_text) # XML-RPC prefers a status of 200 ("ok") even when reporting errors. self.setStatus(200)
@implementer(IXMLRPCView)
[docs]class XMLRPCView(object): """A base XML-RPC view that can be used as mix-in for XML-RPC views.""" def __init__(self, context, request): self.context = context self.request = request
@implementer(IXMLRPCPremarshaller)
[docs]class PreMarshallerBase(object): """Abstract base class for pre-marshallers.""" def __init__(self, data): self.data = data def __call__(self): raise Exception("Not implemented")
@zope.component.adapter(dict)
[docs]class DictPreMarshaller(PreMarshallerBase): """Pre-marshaller for dicts""" def __call__(self): return dict([(premarshal(k), premarshal(v)) for (k, v) in self.data.items()])
@zope.component.adapter(list)
[docs]class ListPreMarshaller(PreMarshallerBase): """Pre-marshaller for list""" def __call__(self): return [premarshal(x) for x in self.data]
@zope.component.adapter(tuple) class TuplePreMarshaller(ListPreMarshaller): pass @zope.component.adapter(xmlrpclib.Binary)
[docs]class BinaryPreMarshaller(PreMarshallerBase): """Pre-marshaller for xmlrpc.Binary""" def __call__(self): return xmlrpclib.Binary(self.data.data)
@zope.component.adapter(xmlrpclib.Fault)
[docs]class FaultPreMarshaller(PreMarshallerBase): """Pre-marshaller for xmlrpc.Fault""" def __call__(self): return xmlrpclib.Fault( premarshal(self.data.faultCode), premarshal(self.data.faultString), )
@zope.component.adapter(xmlrpclib.DateTime)
[docs]class DateTimePreMarshaller(PreMarshallerBase): """Pre-marshaller for xmlrpc.DateTime""" def __call__(self): return xmlrpclib.DateTime(self.data.value)
@zope.component.adapter(datetime.datetime)
[docs]class PythonDateTimePreMarshaller(PreMarshallerBase): """Pre-marshaller for datetime.datetime""" def __call__(self): return xmlrpclib.DateTime(self.data.isoformat())
[docs]def premarshal(data): """Premarshal data before handing it to xmlrpclib for marhalling The initial purpose of this function is to remove security proxies without resorting to removeSecurityProxy. This way, we can avoid inadvertently providing access to data that should be protected. """ premarshaller = IXMLRPCPremarshaller(data, alternate=None) if premarshaller is not None: return premarshaller() return data