neutron-server服务启动完成后,就能接收API请求并做出相应的处理了,本文就来分析一下neutron server是如何处理各API请求的。
neutron-server启动的过程可以参考:https://blog.try-except.com/technology/neutron-server-start.html
通过上述的文章,我们可知道neutron-server的api分为了两类:
1、对/
的访问(获取neutron version信息)
2、对/v2.0
的访问(业务处理接口)
每一类的请求对应的wsgi application都不一样:
1、对应/
的wsgi application是neutron.api.versions:Versions
对象
2、对应/v2.0
的wsgi application是neutron.api.v2.router:APIRouter
对象
在neutron-server启动的过程中会实例化这两个对象(根据一开始给出文章,可以知道是使用的paste的deploy.loadapp方法)。
先来看一下这两个中比较简单的neutron.api.versions:Versions
对象
class Versions(object):
@classmethod
def factory(cls, global_config, **local_config):
return cls(app=None)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
"""Respond to a request for all Neutron API versions."""
version_objs = [
{
"id": "v2.0",
"status": "CURRENT",
},
]
if req.path != '/':
if self.app:
return req.get_response(self.app)
language = req.best_match_language()
msg = _('Unknown API version specified')
msg = oslo_i18n.translate(msg, language)
return webob.exc.HTTPNotFound(explanation=msg)
builder = versions_view.get_view_builder(req)
versions = [builder.build(version) for version in version_objs]
response = dict(versions=versions)
metadata = {}
content_type = req.best_match_content_type()
body = (wsgi.Serializer(metadata=metadata).
serialize(response, content_type))
response = webob.Response()
response.content_type = content_type
response.body = wsgi.encode_body(body)
return response
def __init__(self, app):
self.app = app
由配置文件(api-paste.conf)内容:
[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory
得到该类的实例是通过其指定的构造方法来创建的:
@classmethod
def factory(cls, global_config, **local_config):
return cls(app=None)
这个构造方法中,在实例化自身的时候设置了一个可选参数app。这样做的目的是:如果该类对象用作middleware的时候,可以传入app来完成构造。
我们知道wsgi application应该是一个可调用的对象,它可以是一个函数或者是一个实现了__call__
方法的实例,这里属于后者。当它处理请求时,就会调用到__call__
方法。
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
"""Respond to a request for all Neutron API versions."""
version_objs = [
{
"id": "v2.0",
"status": "CURRENT",
},
]
if req.path != '/':
if self.app:
return req.get_response(self.app)
language = req.best_match_language()
msg = _('Unknown API version specified')
msg = oslo_i18n.translate(msg, language)
return webob.exc.HTTPNotFound(explanation=msg)
builder = versions_view.get_view_builder(req)
versions = [builder.build(version) for version in version_objs]
response = dict(versions=versions)
metadata = {}
content_type = req.best_match_content_type()
body = (wsgi.Serializer(metadata=metadata).
serialize(response, content_type))
response = webob.Response()
response.content_type = content_type
response.body = wsgi.encode_body(body)
return response
装饰器webob.dec.wsgify
是为了将__call__
转换成符合WSGI规范的函数。
这个方法的逻辑还是很简单的,只是用来返回一个固定的版本信息。
下面重点分析一下neutron.api.v2.router:APIRouter
对象。
一样的先看一下对应的配置内容:
[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
指定了factory方法:
@classmethod
def factory(cls, global_config, **local_config):
return cls(**local_config)
factory直接返回了一个对象的实例,所以下面看一下__init__
方法:
def __init__(self, **local_config):
mapper = routes_mapper.Mapper()
manager.init()
plugin = directory.get_plugin()
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
member_actions=MEMBER_ACTIONS)
def _map_resource(collection, resource, params, parent=None):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(
collection, resource, plugin, params, allow_bulk=allow_bulk,
parent=parent, allow_pagination=True,
allow_sorting=True)
path_prefix = None
if parent:
path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
parent['member_name'],
collection)
mapper_kwargs = dict(controller=controller,
requirements=REQUIREMENTS,
path_prefix=path_prefix,
**col_kwargs)
return mapper.collection(collection, resource,
**mapper_kwargs)
mapper.connect('index', '/', controller=Index(RESOURCES))
for resource in RESOURCES:
_map_resource(RESOURCES[resource], resource,
attributes.RESOURCE_ATTRIBUTE_MAP.get(
RESOURCES[resource], dict()))
resource_registry.register_resource_by_name(resource)
for resource in SUB_RESOURCES:
_map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
attributes.RESOURCE_ATTRIBUTE_MAP.get(
SUB_RESOURCES[resource]['collection_name'],
dict()),
SUB_RESOURCES[resource]['parent'])
# Certain policy checks require that the extensions are loaded
# and the RESOURCE_ATTRIBUTE_MAP populated before they can be
# properly initialized. This can only be claimed with certainty
# once this point in the code has been reached. In the event
# that the policies have been initialized before this point,
# calling reset will cause the next policy check to
# re-initialize with all of the required data in place.
policy.reset()
super(APIRouter, self).__init__(mapper)
在理解上面的代码前需要先知道python的routes模块的一些用法,具体的可以自行百度。
- routes是用python重新实现的Rails routes系统,用来映射url与应用程序的系统
- controller是规则匹配后处理请求的对象,必须是可调用的对象。
- connect方法用来创建并注册一条新的路由
- collection方法用来同时创建并注册多条路由,collection_actions对应collection路由,member_actions对应member路由。
方法中一开始就创建了一个路由实例对象mapper,然后遍历了RESOURCES
字典,为每个resource调用_map_resource
方法:
def _map_resource(collection, resource, params, parent=None):
allow_bulk = cfg.CONF.allow_bulk
controller = base.create_resource(
collection, resource, plugin, params, allow_bulk=allow_bulk,
parent=parent, allow_pagination=True,
allow_sorting=True)
path_prefix = None
if parent:
path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
parent['member_name'],
collection)
mapper_kwargs = dict(controller=controller,
requirements=REQUIREMENTS,
path_prefix=path_prefix,
**col_kwargs)
return mapper.collection(collection, resource,
**mapper_kwargs)
在这个方法中创建了resource和collection对应的controller对象,然后通过mapper.collection方法完成对resource和collection路由的创建。
mapper中的每一个路由都必须指定一个controller作为WSGI app被调用。所以这里关注一下controller的创建,base.create_resource
方法如下:
def create_resource(collection, resource, plugin, params, allow_bulk=False,
member_actions=None, parent=None, allow_pagination=False,
allow_sorting=False):
controller = Controller(plugin, collection, resource, params, allow_bulk,
member_actions=member_actions, parent=parent,
allow_pagination=allow_pagination,
allow_sorting=allow_sorting)
return wsgi_resource.Resource(controller, FAULT_MAP)
这里需要特别注意,外部获得的controller其实是该方法最终返回的是wsgi_resource.Resource(controller, FAULT_MAP)方法创建的对象。这里我们停一下,回到上面的__init__
方法。在设置完资源的路由之后,继续调用了父类的__init__
方法,并且传入了routes模块的mapper对象:
def __init__(self, mapper):
self.map = mapper
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)
@staticmethod
@webob.dec.wsgify(RequestClass=Request)
def _dispatch(req):
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return webob.exc.HTTPNotFound()
app = match['controller']
return app
在__init__
方法中只做了两件事:将mapper对象传给了map属性,创建了一个middlerware对象(RoutesMiddleware对象)。
RoutesMiddleware的构造方法:
def __init__(self, wsgi_app, mapper, use_method_override=True,
path_info=True, singleton=True):
self.app = wsgi_app
self.mapper = mapper
...略...
第一个传入的参数是一个wsgi app,也就是self._dispatch
方法。在RoutesMiddleware的__call__
方法中会根据mapper中设定的路由,将请求匹配到对应的controller,匹配的结果会保存在环境变量中,然后调用构造时传入的WSGI app即self._dispatch
。
所以当一个HTTP请求被neutron接收,被一系列的filter(middleware,在api-paste.ini中指定)处理之后交给APIRouter对象处理,也就是调用了APIRouter的__call__
方法:
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
return self._router
返回了_router
也就是一个RoutesMiddleware对象,通过它的__call__
方法路由匹配之后就调用到了_dispatch
方法,_dispatch
方法中返回了路由匹配到的controller对象,继续把请求交给这个controller对象处理。
被webob.dec.wsgify装饰的对象,返回的结果有以下三种情况:
- 返回字符串: wsgify将其作为body,构成HTTP响应返回。
- 返回Response对象: 直接作为HTTP响应返回。
- 返回一个wsgi app: wsgify会继续调用该app,并返回app的响应结果。
到此APIRouter干的活就结束了,后面的活由注册路由时指定的controller接管,所以继续回到controller上来。
前面已经讲到了注册路由时指定的controller对象实际是wsgi_resource.Resource(controller, FAULT_MAP)返回的对象,进入到这个方法:
def Resource(controller, faults=None, deserializers=None, serializers=None,
action_status=None):
default_deserializers = {'application/json': wsgi.JSONDeserializer()}
default_serializers = {'application/json': wsgi.JSONDictSerializer()}
format_types = {'json': 'application/json'}
action_status = action_status or dict(create=201, delete=204)
default_deserializers.update(deserializers or {})
default_serializers.update(serializers or {})
deserializers = default_deserializers
serializers = default_serializers
faults = faults or {}
@webob.dec.wsgify(RequestClass=Request)
def resource(request):
...略...
setattr(resource, 'controller', controller)
setattr(resource, 'action_status', action_status)
return resource
返回了一个闭包:resource
,所以这个闭包才是真正的controller,只不过内部包含了一个另外的同名controller对象(参数传入)。请求会被递交到resource
给它处理,代码如下:
@webob.dec.wsgify(RequestClass=Request)
def resource(request):
route_args = request.environ.get('wsgiorg.routing_args')
if route_args:
args = route_args[1].copy()
else:
args = {}
# NOTE(jkoelker) by now the controller is already found, remove
# it from the args if it is in the matchdict
args.pop('controller', None)
fmt = args.pop('format', None)
action = args.pop('action', None)
content_type = format_types.get(fmt,
request.best_match_content_type())
language = request.best_match_language()
deserializer = deserializers.get(content_type)
serializer = serializers.get(content_type)
try:
if request.body:
args['body'] = deserializer.deserialize(request.body)['body']
method = getattr(controller, action)
result = method(request=request, **args)
except Exception as e:
mapped_exc = api_common.convert_exception_to_http_exc(e, faults,
language)
if hasattr(mapped_exc, 'code') and 400 <= mapped_exc.code < 500:
LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
{'action': action, 'exc': mapped_exc})
else:
LOG.exception(
_LE('%(action)s failed: %(details)s'),
{
'action': action,
'details': utils.extract_exc_details(e),
}
)
raise mapped_exc
status = action_status.get(action, 200)
body = serializer.serialize(result)
# NOTE(jkoelker) Comply with RFC2616 section 9.7
if status == 204:
content_type = ''
body = None
return webob.Response(request=request, status=status,
content_type=content_type,
body=body)
根据路由匹配时设定的环境变量,获取到对应的format, action, body等内容。调用传入的controller对象的action对应方法获取到最终的结果。action主要有index, show, create, update, delete。
传入的controller对象就开始处理实际业务,以index为例(获取资源的一个list):
Controller中的index方法如下:
@db_api.retry_db_errors
def index(self, request, **kwargs):
"""Returns a list of the requested entity."""
parent_id = kwargs.get(self._parent_id_name)
# Ensure policy engine is initialized
policy.init()
return self._items(request, True, parent_id)
转而调用_items
方法:
def _items(self, request, do_authz=False, parent_id=None):
original_fields, fields_to_add = self._do_field_list(
api_common.list_args(request, 'fields'))
filters = api_common.get_filters(request, self._attr_info,
['fields', 'sort_key', 'sort_dir',
'limit', 'marker', 'page_reverse'])
kwargs = {'filters': filters,
'fields': original_fields}
sorting_helper = self._get_sorting_helper(request)
pagination_helper = self._get_pagination_helper(request)
sorting_helper.update_args(kwargs)
sorting_helper.update_fields(original_fields, fields_to_add)
pagination_helper.update_args(kwargs)
pagination_helper.update_fields(original_fields, fields_to_add)
if parent_id:
kwargs[self._parent_id_name] = parent_id
obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
obj_list = obj_getter(request.context, **kwargs)
obj_list = sorting_helper.sort(obj_list)
obj_list = pagination_helper.paginate(obj_list)
# Check authz
if do_authz:
obj_list = [obj for obj in obj_list
if policy.check(request.context,
self._plugin_handlers[self.SHOW],
obj,
plugin=self._plugin,
pluralized=self._collection)]
fields_to_strip = fields_to_add or []
if obj_list:
fields_to_strip += self._exclude_attributes_by_policy(
request.context, obj_list[0])
collection = {self._collection:
[self._filter_attributes(obj,
fields_to_strip=fields_to_strip)
for obj in obj_list]}
pagination_links = pagination_helper.get_links(obj_list)
if pagination_links:
collection[self._collection + "_links"] = pagination_links
# Synchronize usage trackers, if needed
resource_registry.resync_resource(
request.context, self._resource, request.context.tenant_id)
return collection
前面获取了一些helper对象,先不管,
obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
上面这句是比较关键的,从self._plugin对象中获取一个指定名字的属性,根据下文猜测这应该是一个实际业务的方法名称。
self._plugin是在APIRouter中创建Controller时指定的,可以发现就是指的core plugin,self._plugin_handlers的设定如下:
self._plugin_handlers = {
self.LIST: 'get%s_%s' % (parent_part, self._collection),
self.SHOW: 'get%s_%s' % (parent_part, self._resource)
}
for action in [self.CREATE, self.UPDATE, self.DELETE]:
self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
self._resource)
parent_part目前都是空的,self._resource指的就是(network, subnet, subnetpool, port),self._collection就是在前面的基础上加了个s表示复数。所以很好理解,当你想获取port的列表时,就会调用core plugin的get_ports方法,这样就走到plugin内部的处理了。