Neutron核心资源API流程分析

发表于 2017-12-14   |   分类于 技术

neutron-server服务启动完成后,就能接收API请求并做出相应的处理了,本文就来分析一下neutron server是如何处理各API请求的。
neutron-server启动的过程可以参考:https://blog.try-except.com/technology/neutron-server-start.html

通过上述的文章,我们可知道neutron-server的api分为了两类:
1、对/的访问(获取neutron version信息)
2、对/v2.0的访问(业务处理接口)
每一类的请求对应的wsgi application都不一样:
1、对应/的wsgi application是neutron.api.versions:Versions对象
2、对应/v2.0的wsgi application是neutron.api.v2.router:APIRouter对象

在neutron-server启动的过程中会实例化这两个对象(根据一开始给出文章,可以知道是使用的paste的deploy.loadapp方法)。
先来看一下这两个中比较简单的neutron.api.versions:Versions对象

class Versions(object):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(app=None)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Respond to a request for all Neutron API versions."""
        version_objs = [
            {
                "id": "v2.0",
                "status": "CURRENT",
            },
        ]

        if req.path != '/':
            if self.app:
                return req.get_response(self.app)
            language = req.best_match_language()
            msg = _('Unknown API version specified')
            msg = oslo_i18n.translate(msg, language)
            return webob.exc.HTTPNotFound(explanation=msg)

        builder = versions_view.get_view_builder(req)
        versions = [builder.build(version) for version in version_objs]
        response = dict(versions=versions)
        metadata = {}

        content_type = req.best_match_content_type()
        body = (wsgi.Serializer(metadata=metadata).
                serialize(response, content_type))

        response = webob.Response()
        response.content_type = content_type
        response.body = wsgi.encode_body(body)

        return response

    def __init__(self, app):
        self.app = app

由配置文件(api-paste.conf)内容:

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

得到该类的实例是通过其指定的构造方法来创建的:

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(app=None)

这个构造方法中,在实例化自身的时候设置了一个可选参数app。这样做的目的是:如果该类对象用作middleware的时候,可以传入app来完成构造。
我们知道wsgi application应该是一个可调用的对象,它可以是一个函数或者是一个实现了__call__方法的实例,这里属于后者。当它处理请求时,就会调用到__call__方法。

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Respond to a request for all Neutron API versions."""
        version_objs = [
            {
                "id": "v2.0",
                "status": "CURRENT",
            },
        ]

        if req.path != '/':
            if self.app:
                return req.get_response(self.app)
            language = req.best_match_language()
            msg = _('Unknown API version specified')
            msg = oslo_i18n.translate(msg, language)
            return webob.exc.HTTPNotFound(explanation=msg)

        builder = versions_view.get_view_builder(req)
        versions = [builder.build(version) for version in version_objs]
        response = dict(versions=versions)
        metadata = {}

        content_type = req.best_match_content_type()
        body = (wsgi.Serializer(metadata=metadata).
                serialize(response, content_type))

        response = webob.Response()
        response.content_type = content_type
        response.body = wsgi.encode_body(body)

        return response

装饰器webob.dec.wsgify是为了将__call__转换成符合WSGI规范的函数。
这个方法的逻辑还是很简单的,只是用来返回一个固定的版本信息。
下面重点分析一下neutron.api.v2.router:APIRouter对象。
一样的先看一下对应的配置内容:

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

指定了factory方法:

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

factory直接返回了一个对象的实例,所以下面看一下__init__方法:

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        manager.init()
        plugin = directory.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            resource_registry.register_resource_by_name(resource)

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)

在理解上面的代码前需要先知道python的routes模块的一些用法,具体的可以自行百度。

  • routes是用python重新实现的Rails routes系统,用来映射url与应用程序的系统
  • controller是规则匹配后处理请求的对象,必须是可调用的对象。
  • connect方法用来创建并注册一条新的路由
  • collection方法用来同时创建并注册多条路由,collection_actions对应collection路由,member_actions对应member路由。

方法中一开始就创建了一个路由实例对象mapper,然后遍历了RESOURCES字典,为每个resource调用_map_resource方法:

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

在这个方法中创建了resource和collection对应的controller对象,然后通过mapper.collection方法完成对resource和collection路由的创建。
mapper中的每一个路由都必须指定一个controller作为WSGI app被调用。所以这里关注一下controller的创建,base.create_resource方法如下:

def create_resource(collection, resource, plugin, params, allow_bulk=False,
                    member_actions=None, parent=None, allow_pagination=False,
                    allow_sorting=False):
    controller = Controller(plugin, collection, resource, params, allow_bulk,
                            member_actions=member_actions, parent=parent,
                            allow_pagination=allow_pagination,
                            allow_sorting=allow_sorting)

    return wsgi_resource.Resource(controller, FAULT_MAP)

这里需要特别注意,外部获得的controller其实是该方法最终返回的是wsgi_resource.Resource(controller, FAULT_MAP)方法创建的对象。这里我们停一下,回到上面的__init__方法。在设置完资源的路由之后,继续调用了父类的__init__方法,并且传入了routes模块的mapper对象:

    def __init__(self, mapper):
        self.map = mapper
        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          self.map)

    @staticmethod
    @webob.dec.wsgify(RequestClass=Request)
    def _dispatch(req):
        match = req.environ['wsgiorg.routing_args'][1]
        if not match:
            return webob.exc.HTTPNotFound()
        app = match['controller']
        return app

__init__方法中只做了两件事:将mapper对象传给了map属性,创建了一个middlerware对象(RoutesMiddleware对象)。
RoutesMiddleware的构造方法:

 def __init__(self, wsgi_app, mapper, use_method_override=True, 
                 path_info=True, singleton=True):
        self.app = wsgi_app
        self.mapper = mapper
        ...略...

第一个传入的参数是一个wsgi app,也就是self._dispatch方法。在RoutesMiddleware的__call__方法中会根据mapper中设定的路由,将请求匹配到对应的controller,匹配的结果会保存在环境变量中,然后调用构造时传入的WSGI app即self._dispatch

所以当一个HTTP请求被neutron接收,被一系列的filter(middleware,在api-paste.ini中指定)处理之后交给APIRouter对象处理,也就是调用了APIRouter的__call__方法:

    @webob.dec.wsgify(RequestClass=Request)
    def __call__(self, req):
        return self._router

返回了_router也就是一个RoutesMiddleware对象,通过它的__call__方法路由匹配之后就调用到了_dispatch方法,_dispatch方法中返回了路由匹配到的controller对象,继续把请求交给这个controller对象处理。

被webob.dec.wsgify装饰的对象,返回的结果有以下三种情况:

  • 返回字符串: wsgify将其作为body,构成HTTP响应返回。
  • 返回Response对象: 直接作为HTTP响应返回。
  • 返回一个wsgi app: wsgify会继续调用该app,并返回app的响应结果。

到此APIRouter干的活就结束了,后面的活由注册路由时指定的controller接管,所以继续回到controller上来。

前面已经讲到了注册路由时指定的controller对象实际是wsgi_resource.Resource(controller, FAULT_MAP)返回的对象,进入到这个方法:

def Resource(controller, faults=None, deserializers=None, serializers=None,
             action_status=None):
    default_deserializers = {'application/json': wsgi.JSONDeserializer()}
    default_serializers = {'application/json': wsgi.JSONDictSerializer()}
    format_types = {'json': 'application/json'}
    action_status = action_status or dict(create=201, delete=204)

    default_deserializers.update(deserializers or {})
    default_serializers.update(serializers or {})

    deserializers = default_deserializers
    serializers = default_serializers
    faults = faults or {}

    @webob.dec.wsgify(RequestClass=Request)
    def resource(request):
        ...略...

    setattr(resource, 'controller', controller)
    setattr(resource, 'action_status', action_status)
    return resource

返回了一个闭包:resource,所以这个闭包才是真正的controller,只不过内部包含了一个另外的同名controller对象(参数传入)。请求会被递交到resource给它处理,代码如下:

    @webob.dec.wsgify(RequestClass=Request)
    def resource(request):
        route_args = request.environ.get('wsgiorg.routing_args')
        if route_args:
            args = route_args[1].copy()
        else:
            args = {}

        # NOTE(jkoelker) by now the controller is already found, remove
        #                it from the args if it is in the matchdict
        args.pop('controller', None)
        fmt = args.pop('format', None)
        action = args.pop('action', None)
        content_type = format_types.get(fmt,
                                        request.best_match_content_type())
        language = request.best_match_language()
        deserializer = deserializers.get(content_type)
        serializer = serializers.get(content_type)

        try:
            if request.body:
                args['body'] = deserializer.deserialize(request.body)['body']

            method = getattr(controller, action)

            result = method(request=request, **args)
        except Exception as e:
            mapped_exc = api_common.convert_exception_to_http_exc(e, faults,
                                                                  language)
            if hasattr(mapped_exc, 'code') and 400 <= mapped_exc.code < 500:
                LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
                         {'action': action, 'exc': mapped_exc})
            else:
                LOG.exception(
                    _LE('%(action)s failed: %(details)s'),
                    {
                        'action': action,
                        'details': utils.extract_exc_details(e),
                    }
                )
            raise mapped_exc

        status = action_status.get(action, 200)
        body = serializer.serialize(result)
        # NOTE(jkoelker) Comply with RFC2616 section 9.7
        if status == 204:
            content_type = ''
            body = None

        return webob.Response(request=request, status=status,
                              content_type=content_type,
                              body=body)

根据路由匹配时设定的环境变量,获取到对应的format, action, body等内容。调用传入的controller对象的action对应方法获取到最终的结果。action主要有index, show, create, update, delete。
传入的controller对象就开始处理实际业务,以index为例(获取资源的一个list):
Controller中的index方法如下:

    @db_api.retry_db_errors
    def index(self, request, **kwargs):
        """Returns a list of the requested entity."""
        parent_id = kwargs.get(self._parent_id_name)
        # Ensure policy engine is initialized
        policy.init()
        return self._items(request, True, parent_id)

转而调用_items方法:

    def _items(self, request, do_authz=False, parent_id=None):

        original_fields, fields_to_add = self._do_field_list(
            api_common.list_args(request, 'fields'))
        filters = api_common.get_filters(request, self._attr_info,
                                         ['fields', 'sort_key', 'sort_dir',
                                          'limit', 'marker', 'page_reverse'])
        kwargs = {'filters': filters,
                  'fields': original_fields}
        sorting_helper = self._get_sorting_helper(request)
        pagination_helper = self._get_pagination_helper(request)
        sorting_helper.update_args(kwargs)
        sorting_helper.update_fields(original_fields, fields_to_add)
        pagination_helper.update_args(kwargs)
        pagination_helper.update_fields(original_fields, fields_to_add)
        if parent_id:
            kwargs[self._parent_id_name] = parent_id
        obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
        obj_list = obj_getter(request.context, **kwargs)
        obj_list = sorting_helper.sort(obj_list)
        obj_list = pagination_helper.paginate(obj_list)
        # Check authz
        if do_authz:

            obj_list = [obj for obj in obj_list
                        if policy.check(request.context,
                                        self._plugin_handlers[self.SHOW],
                                        obj,
                                        plugin=self._plugin,
                                        pluralized=self._collection)]

        fields_to_strip = fields_to_add or []
        if obj_list:
            fields_to_strip += self._exclude_attributes_by_policy(
                request.context, obj_list[0])
        collection = {self._collection:
                      [self._filter_attributes(obj,
                          fields_to_strip=fields_to_strip)
                       for obj in obj_list]}
        pagination_links = pagination_helper.get_links(obj_list)
        if pagination_links:
            collection[self._collection + "_links"] = pagination_links
        # Synchronize usage trackers, if needed
        resource_registry.resync_resource(
            request.context, self._resource, request.context.tenant_id)
        return collection

前面获取了一些helper对象,先不管,

obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])

上面这句是比较关键的,从self._plugin对象中获取一个指定名字的属性,根据下文猜测这应该是一个实际业务的方法名称。
self._plugin是在APIRouter中创建Controller时指定的,可以发现就是指的core plugin,self._plugin_handlers的设定如下:

        self._plugin_handlers = {
            self.LIST: 'get%s_%s' % (parent_part, self._collection),
            self.SHOW: 'get%s_%s' % (parent_part, self._resource)
        }
        for action in [self.CREATE, self.UPDATE, self.DELETE]:
            self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                         self._resource)

parent_part目前都是空的,self._resource指的就是(network, subnet, subnetpool, port),self._collection就是在前面的基础上加了个s表示复数。所以很好理解,当你想获取port的列表时,就会调用core plugin的get_ports方法,这样就走到plugin内部的处理了。

发表新评论

© 2017 Powered by Typecho
苏ICP备15035969号-3