Every compute node in an OpenStack deployment runs a neutron L2 agent, which works together with the plugin on the neutron server side to provide and manage layer-2 networking. One of the most popular implementations today is the OVS Neutron Agent.
This article analyzes its source code. It does not cover all of the code; only the VLAN-related parts are examined.
The usual way to start the agent is to start the neutron-openvswitch-agent service:
service neutron-openvswitch-agent start
From the entry points declared in the package metadata:
[entry_points]
console_scripts =
...
neutron-openvswitch-agent = neutron.plugins.openvswitch.agent.ovs_neutron_agent:main
...
we can tell that the agent's startup entry is the main method of neutron.plugins.openvswitch.agent.ovs_neutron_agent:
def main(bridge_classes):
    prepare_xen_compute()
    ovs_capabilities.register()
    validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)

    try:
        agent = OVSNeutronAgent(bridge_classes, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as e:
        LOG.error(_LE("%s Agent terminated!"), e)
        sys.exit(1)
    agent.daemon_loop()
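The bridge_classes argument is supplied by the per-of_interface entry module that actually invokes main(). Roughly, it is a dict of the bridge wrapper classes for the selected OpenFlow driver; the following is a sketch based on the openvswitch agent's openflow driver packages, not a verbatim quote:
bridge_classes = {
    'br_int': br_int.OVSIntegrationBridge,   # wraps the integration bridge
    'br_phys': br_phys.OVSPhysicalBridge,    # wraps per-physical-network bridges
    'br_tun': br_tun.OVSTunnelBridge,        # wraps the tunnel bridge
}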
This main method first checks whether the host's virtualization technology is Xen and, if so, performs some additional configuration; it then validates the tunnel configuration.
After that it instantiates an OVSNeutronAgent object and calls its daemon_loop method.
The OVS agent's daemon_loop method looks like this:
def daemon_loop(self):
    # Start everything.
    LOG.info(_LI("Agent initialized successfully, now running... "))
    signal.signal(signal.SIGTERM, self._handle_sigterm)
    if hasattr(signal, 'SIGHUP'):
        signal.signal(signal.SIGHUP, self._handle_sighup)
    with polling.get_polling_manager(
        self.minimize_polling,
        self.ovsdb_monitor_respawn_interval) as pm:
        self.rpc_loop(polling_manager=pm)
This method first installs handlers for the SIGTERM and SIGHUP signals, then calls rpc_loop(). rpc_loop() scans the local OVS state and port information at regular intervals and performs the appropriate operations based on the scan results; the polling manager obtained here keeps those rescans cheap by monitoring ovsdb for interface changes when minimize_polling is enabled. Catching SIGTERM stops rpc_loop(), while catching SIGHUP triggers a configuration reload (details below). rpc_loop() is where nearly all agent-side operations begin (operations can also be triggered by notifications from the plugin), so we will first briefly cover the initialization performed while the agent object is instantiated, and then walk through rpc_loop() in detail.
The OVS agent implementation uses local VLAN ids to isolate layer-2 networks. That is: for every network, the agent allocates a local VLAN id, and every port on that network is given the same tag value, equal to the network's local VLAN id. Ports with the same tag can communicate directly at layer 2; ports with different tags are isolated and cannot. (All of this applies only within a single host: if the same network exists on two hosts, the local VLAN ids the two hosts allocate for it may differ.) In the agent's init method you can see the agent initialize the range of valid local VLAN ids, 1 through 4094:
self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
                                             p_const.MAX_VLAN_TAG))
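To make the allocate/release cycle described above concrete, here is a minimal conceptual sketch of a local VLAN pool. It is purely illustrative, not the actual neutron code (the real logic lives in provision_local_vlan and reclaim_local_vlan):
class LocalVlanPool(object):
    """Conceptual model of per-host local VLAN allocation (hypothetical)."""

    def __init__(self, min_tag=1, max_tag=4094):
        self.available = set(range(min_tag, max_tag + 1))
        self.by_network = {}  # net_uuid -> local vlan id

    def provision(self, net_uuid):
        # Allocate an id the first time a network appears on this host.
        if net_uuid not in self.by_network:
            self.by_network[net_uuid] = self.available.pop()
        return self.by_network[net_uuid]

    def reclaim(self, net_uuid):
        # Return the id to the pool once the network's last port is gone;
        # a later re-provision may well pick a different id.
        self.available.add(self.by_network.pop(net_uuid))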
Open vSwitch has a special bridge, br-int, to which every virtual machine's NIC is attached. During initialization the agent checks whether this bridge exists and creates it if it does not. (set_secure_mode sets the bridge's fail mode to secure, so that it does not fall back to behaving like an ordinary learning switch while its flows are missing.)
def setup_integration_br(self):
    '''Setup the integration bridge.
    '''
    # Ensure the integration bridge is created.
    # ovs_lib.OVSBridge.create() will run
    #   ovs-vsctl -- --may-exist add-br BRIDGE_NAME
    # which does nothing if bridge already exists.
    self.int_br.create()
    self.int_br.set_secure_mode()
    self.int_br.setup_controllers(self.conf)

    if self.conf.AGENT.drop_flows_on_start:
        # Delete the patch port between br-int and br-tun if we're deleting
        # the flows on br-int, so that traffic doesn't get flooded over
        # while flows are missing.
        self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
        self.int_br.delete_flows()
    self.int_br.setup_default_table()
All of the server's physical networks must be mapped onto bridges managed by the agent. The bridge_mappings option in /etc/neutron/plugins/ml2/openvswitch_agent.ini specifies this mapping between physical networks and bridges; during initialization the agent reads it, creates the required bridges, and connects each of them to br-int.
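For example, a typical mapping in openvswitch_agent.ini might look like this (physnet1 and br-eth1 are placeholder names):
[ovs]
bridge_mappings = physnet1:br-eth1
The agent reads this configuration during initialization: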
def __init__(...):
    ...
    self.bridge_mappings = self._parse_bridge_mappings(
        ovs_conf.bridge_mappings)
    self.setup_physical_bridges(self.bridge_mappings)
    ...
As mentioned above, within a single host the agent assigns each network a local VLAN id, and normally this id should remain fixed, even across restarts. (If the network no longer contains any port, the id is released, and when a new port is later created the network may be assigned a different local VLAN id.) So at initialization the agent tries to restore the mapping between networks and local VLAN ids. Open vSwitch keeps a record of every port in its own database (ovsdb), where the data stored for each port looks like the following. The example shows three ports; only the relevant columns (name, other_config, tag) are reproduced here, since the remaining columns are mostly empty:
# ovsdb-client monitor Open_vSwitch Port --detach
row 736ba298-242f-428e-a81d-709242e292fd
    name:         "qvo886c063b-8a"
    other_config: {net_uuid="50c5b974-7def-49ec-ac2c-c575eb4b57fc", network_type=vxlan,
                   physical_network=None, segmentation_id="5058", tag="3"}
    tag:          3

row 85fb7c5a-9547-4cca-92ad-7ad3ea424364
    name:         "qvof7c0a075-64"
    other_config: {net_uuid="add06796-f124-45e1-8e86-685e965093f3", network_type=flat,
                   physical_network=provider, tag="26"}
    tag:          26

row b77150cb-2e76-4f67-bb24-493d0adeee90
    name:         "qvo2ea83900-e2"
    other_config: {net_uuid="cf31a5e8-eede-41e7-bdb8-7ac4bbc02460", network_type=vxlan,
                   physical_network=None, segmentation_id="5039", tag="2"}
    tag:          2
In these records, the other_config field holds the information about the network the port belongs to, and its tag entry is the local VLAN id. The agent walks through these records, restores the mapping between local VLAN ids and networks, and keeps it in memory as the foundation for other operations. The relevant code:
def _restore_local_vlan_map(self):
    self._local_vlan_hints = {}
    # skip INVALID and UNASSIGNED to match scan_ports behavior
    ofport_filter = (ovs_lib.INVALID_OFPORT, ovs_lib.UNASSIGNED_OFPORT)
    cur_ports = self.int_br.get_vif_ports(ofport_filter)
    port_names = [p.port_name for p in cur_ports]
    port_info = self.int_br.get_ports_attributes(
        "Port", columns=["name", "other_config", "tag"], ports=port_names)
    by_name = {x['name']: x for x in port_info}
    for port in cur_ports:
        # if a port was deleted between get_vif_ports and
        # get_ports_attributes, we will get a KeyError
        try:
            local_vlan_map = by_name[port.port_name]['other_config']
            local_vlan = by_name[port.port_name]['tag']
        except KeyError:
            continue
        if not local_vlan:
            continue
        net_uuid = local_vlan_map.get('net_uuid')
        if (net_uuid and net_uuid not in self._local_vlan_hints
                and local_vlan != constants.DEAD_VLAN_TAG):
            self.available_local_vlans.remove(local_vlan)
            self._local_vlan_hints[local_vlan_map['net_uuid']] = \
                local_vlan
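Applied to the three ports in the ovsdb dump above, the restored mapping would come out as:
# net_uuid -> local vlan id, recovered from the tags in the dump
self._local_vlan_hints = {
    '50c5b974-7def-49ec-ac2c-c575eb4b57fc': 3,
    'add06796-f124-45e1-8e86-685e965093f3': 26,
    'cf31a5e8-eede-41e7-bdb8-7ac4bbc02460': 2,
}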
The agent and the plugin talk to each other over RPC, so the agent's initialization naturally includes RPC setup as well. Note that create_consumers registers the agent object itself ([self]) as the endpoint, so incoming notifications are dispatched to the correspondingly named agent methods, such as the port_update and port_delete methods shown further below.
def setup_rpc(self):
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
    self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)

    # RPC network init
    self.context = context.get_admin_context_without_session()
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.PORT, topics.DELETE],
                 [constants.TUNNEL, topics.UPDATE],
                 [constants.TUNNEL, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE],
                 [topics.DVR, topics.UPDATE],
                 [topics.NETWORK, topics.UPDATE]]
    if self.l2_pop:
        consumers.append([topics.L2POPULATION, topics.UPDATE])
    self.connection = agent_rpc.create_consumers([self],
                                                 topics.AGENT,
                                                 consumers,
                                                 start_listening=False)
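Each (resource, operation) pair is combined with the topics.AGENT prefix into an RPC topic name. Assuming the standard values from neutron.common.topics, the [topics.PORT, topics.UPDATE] consumer subscribes to:
# topics.AGENT = 'q-agent-notifier', topics.PORT = 'port', topics.UPDATE = 'update'
topic = '%s-%s-%s' % ('q-agent-notifier', 'port', 'update')
assert topic == 'q-agent-notifier-port-update'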
Earlier we saw where rpc_loop() is invoked; now let's look in detail at what it does.
Internally, rpc_loop is a while loop whose condition is driven by the two signals mentioned above: catching SIGTERM ends the loop, while catching SIGHUP makes the agent reload its configuration files and keep looping.
def rpc_loop(...):  # arguments omitted
    # ... omitted ...
    while self._check_and_handle_signal():
        # ... omitted ...

def _check_and_handle_signal(self):
    if self.catch_sigterm:
        LOG.info(_LI("Agent caught SIGTERM, quitting daemon loop."))
        self.run_daemon_loop = False
        self.catch_sigterm = False
    if self.catch_sighup:
        LOG.info(_LI("Agent caught SIGHUP, resetting."))
        self.conf.reload_config_files()
        config.setup_logging()
        LOG.debug('Full set of CONF:')
        self.conf.log_opt_values(LOG, logging.DEBUG)
        self.catch_sighup = False
    return self.run_daemon_loop
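The handlers registered back in daemon_loop only raise the flags that _check_and_handle_signal consumes; a minimal sketch of their shape (inferred from the flag names, so treat the details as approximate):
def _handle_sigterm(self, signum, frame):
    # only record the signal; rpc_loop reacts on its next iteration
    self.catch_sigterm = True

def _handle_sighup(self, signum, frame):
    self.catch_sighup = True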
At the start of each iteration, the agent checks the OVS status. If it sees OVS_RESTARTED, it rebuilds the integration, physical and tunnel bridges along with their flows, and restarts the polling manager so that all current ports get reported as added again. If it sees OVS_DEAD, it applies no operations at all: it just sleeps for a while and continues with the next iteration. The code:
def rpc_loop(...):  # arguments omitted
    # ... omitted ...
    while self._check_and_handle_signal():
        ...
        ovs_status = self.check_ovs_status()
        if ovs_status == constants.OVS_RESTARTED:
            self.setup_integration_br()
            self.setup_physical_bridges(self.bridge_mappings)
            if self.enable_tunneling:
                self._reset_tunnel_ofports()
                self.setup_tunnel_br()
                self.setup_tunnel_br_flows()
                tunnel_sync = True
            if self.enable_distributed_routing:
                self.dvr_agent.reset_ovs_parameters(self.int_br,
                                                    self.tun_br,
                                                    self.patch_int_ofport,
                                                    self.patch_tun_ofport)
                self.dvr_agent.reset_dvr_parameters()
                self.dvr_agent.setup_dvr_flows()
            # notify that OVS has restarted
            registry.notify(
                callback_resources.AGENT,
                callback_events.OVS_RESTARTED,
                self)
            # restart the polling manager so that it will signal as added
            # all the current ports
            # REVISIT (rossella_s) Define a method "reset" in
            # BasePollingManager that will be implemented by AlwaysPoll as
            # no action and by InterfacePollingMinimizer as start/stop
            if isinstance(
                    polling_manager, polling.InterfacePollingMinimizer):
                polling_manager.stop()
                polling_manager.start()
        elif ovs_status == constants.OVS_DEAD:
            # Agent doesn't apply any operations when ovs is dead, to
            # prevent unexpected failure or crash. Sleep and continue
            # loop in which ovs status will be checked periodically.
            port_stats = self.get_port_stats({}, {})
            self.loop_count_and_wait(start, port_stats)
            continue
        # ... omitted ...
The agent keeps in memory all of the ports found by the previous scan (on the first scan, this previous set is empty). It compares them with the ports found by the current scan and sorts the result into several categories: current (everything present now), added (present now but not before), and removed (present before but gone now); updated ports are tracked separately via RPC, as described below.
The agent then handles each of these categories accordingly.
Call chain:
rpc_loop ---> process_port_info ---> scan_ports ---> _get_port_info
def _get_port_info(self, registered_ports, cur_ports,
                   readd_registered_ports):
    port_info = {'current': cur_ports}
    # FIXME(salv-orlando): It's not really necessary to return early
    # if nothing has changed.
    if not readd_registered_ports and cur_ports == registered_ports:
        return port_info

    if readd_registered_ports:
        port_info['added'] = cur_ports
    else:
        port_info['added'] = cur_ports - registered_ports
    # Update port_info with ports not found on the integration bridge
    port_info['removed'] = registered_ports - cur_ports
    return port_info
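A quick worked example of the set arithmetic, with two hypothetical port ids:
registered_ports = {'port-a', 'port-b'}   # previous scan
cur_ports = {'port-b', 'port-c'}          # current scan
# added   = cur_ports - registered_ports  ->  {'port-c'}
# removed = registered_ports - cur_ports  ->  {'port-a'}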
The agent object has two attributes, self.deleted_ports and self.updated_ports, both of which are sets. When the agent receives a delete-port RPC request from the plugin, it adds the corresponding port id to self.deleted_ports; likewise, when it receives an update-port RPC request, it adds the port id to self.updated_ports.
def port_update(self, context, **kwargs):
    port = kwargs.get('port')
    # Put the port identifier in the updated_ports set.
    # Even if full port details might be provided to this call,
    # they are not used since there is no guarantee the notifications
    # are processed in the same order as the relevant API requests
    self.updated_ports.add(port['id'])
    LOG.debug("port_update message processed for port %s", port['id'])

def port_delete(self, context, **kwargs):
    port_id = kwargs.get('port_id')
    self.deleted_ports.add(port_id)
    self.updated_ports.discard(port_id)
    LOG.debug("port_delete message processed for port %s", port_id)
Once the ports have been classified, the agent first processes the ports recorded in self.deleted_ports. Before doing so it removes from that set any port in the removed category, since those ports are already gone. The processing goes as follows:
def process_deleted_ports(self, port_info):
    # don't try to process removed ports as deleted ports since
    # they are already gone
    if 'removed' in port_info:
        self.deleted_ports -= port_info['removed']
    deleted_ports = list(self.deleted_ports)
    while self.deleted_ports:
        port_id = self.deleted_ports.pop()
        port = self.int_br.get_vif_port_by_id(port_id)
        self._clean_network_ports(port_id)
        self.ext_manager.delete_port(self.context,
                                     {"vif_port": port,
                                      "port_id": port_id})
        # move to dead VLAN so deleted ports no
        # longer have access to the network
        if port:
            # don't log errors since there is a chance someone will be
            # removing the port from the bridge at the same time
            self.port_dead(port, log_errors=False)
        self.port_unbound(port_id)
    # Flush firewall rules after ports are put on dead VLAN to be
    # more secure
    self.sg_agent.remove_devices_filter(deleted_ports)
The most important calls here are the port_dead and port_unbound methods.
port_dead does two things: it sets the port's tag to the reserved dead VLAN (constants.DEAD_VLAN_TAG, i.e. 4095), and it installs a flow that drops all traffic entering from the port's ofport:
def port_dead(self, port, log_errors=True):
    '''Once a port has no binding, put it on the "dead vlan".

    :param port: an ovs_lib.VifPort object.
    '''
    # Don't kill a port if it's already dead
    cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag",
                                     log_errors=log_errors)
    if cur_tag and cur_tag != constants.DEAD_VLAN_TAG:
        self.int_br.set_db_attribute("Port", port.port_name, "tag",
                                     constants.DEAD_VLAN_TAG,
                                     log_errors=log_errors)
        self.int_br.drop_port(in_port=port.ofport)
port_unbound does a little more. It first obtains the port's network id and uses it to look up the corresponding LocalVLANMapping object, in which the agent stores the network's local vlan id, network type, member ports and so on. It then removes this port from the LocalVLANMapping object, and if this port was the last one the mapping contained, it also reclaims the network's local vlan.
def port_unbound(self, vif_id, net_uuid=None):
    '''Unbind port.

    Removes corresponding local vlan mapping object if this is its last
    VIF.

    :param vif_id: the id of the vif
    :param net_uuid: the net_uuid this port is associated with.
    '''
    try:
        net_uuid = net_uuid or self.vlan_manager.get_net_uuid(vif_id)
    except vlanmanager.VifIdNotFound:
        LOG.info(
            _LI('port_unbound(): net_uuid %s not managed by VLAN manager'),
            net_uuid)
        return

    lvm = self.vlan_manager.get(net_uuid)

    if vif_id in lvm.vif_ports:
        vif_port = lvm.vif_ports[vif_id]
        self.dvr_agent.unbind_port_from_dvr(vif_port, lvm)
    lvm.vif_ports.pop(vif_id, None)

    if not lvm.vif_ports:
        self.reclaim_local_vlan(net_uuid)
After processing the ports to delete, the agent next handles the ports it scanned and classified earlier (added, removed, updated) in the process_network_ports method. That method treats these three categories as two cases: the first case is the added and updated ports, from which it filters out the ones that need binding and binds them; the second case is the removed ports.
Let's start with the first case. The main work is done in the treat_devices_added_or_updated method, which begins by asking the plugin for the detailed information of these ports:
devices_details_list = (
    self.plugin_rpc.get_devices_details_list_and_failed_devices(
        self.context,
        devices,
        self.agent_id,
        self.conf.host))
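Each entry in the returned list is a dict describing one port. It looks roughly like the following (illustrative values reusing the vxlan network from the earlier ovsdb dump; the exact key set depends on the neutron version):
detail = {
    'device': '886c063b-8a...',  # the id the agent queried with (truncated here)
    'port_id': '886c063b-8a...',
    'network_id': '50c5b974-7def-49ec-ac2c-c575eb4b57fc',
    'network_type': 'vxlan',
    'physical_network': None,
    'segmentation_id': 5058,
    'admin_state_up': True,
    'fixed_ips': [{'subnet_id': '...', 'ip_address': '...'}],
    'device_owner': 'compute:nova',
}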
It then iterates over all of the ports and uses the treat_vif_port method to decide whether each port needs to be bound. When admin_state_up is true, this method calls the important port_bound method, the counterpart of the port_unbound method mentioned earlier; when admin_state_up is false, it calls the port_dead method described above, dropping all of the port's traffic. Inside port_bound: if the port's network has no local vlan allocated yet, one is provisioned for the network first. The method then reads the port's other_config field from the ovs db and writes the network information (net_uuid, network_type, physical_network, segmentation_id) into it. If reading other_config fails, the port is treated as not needing binding.
def port_bound(self, port, net_uuid,
               network_type, physical_network,
               segmentation_id, fixed_ips, device_owner,
               ovs_restarted):
    '''Bind port to net_uuid/lsw_id and install flow for inbound traffic
    to vm.

    :param port: an ovs_lib.VifPort object.
    :param net_uuid: the net_uuid this port is to be associated with.
    :param network_type: the network type ('gre', 'vlan', 'flat', 'local')
    :param physical_network: the physical network for 'vlan' or 'flat'
    :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
    :param fixed_ips: the ip addresses assigned to this port
    :param device_owner: the string indicative of owner of this port
    :param ovs_restarted: indicates if this is called for an OVS restart.
    '''
    if net_uuid not in self.vlan_manager or ovs_restarted:
        self.provision_local_vlan(net_uuid, network_type,
                                  physical_network, segmentation_id)
    lvm = self.vlan_manager.get(net_uuid)
    lvm.vif_ports[port.vif_id] = port
    self.dvr_agent.bind_port_to_dvr(port, lvm,
                                    fixed_ips,
                                    device_owner)
    port_other_config = self.int_br.db_get_val("Port", port.port_name,
                                               "other_config")
    if port_other_config is None:
        if port.vif_id in self.deleted_ports:
            LOG.debug("Port %s deleted concurrently", port.vif_id)
        elif port.vif_id in self.updated_ports:
            LOG.error(_LE("Expected port %s not found"), port.vif_id)
        else:
            LOG.debug("Unable to get config for port %s", port.vif_id)
        return False

    vlan_mapping = {'net_uuid': net_uuid,
                    'network_type': network_type,
                    'physical_network': str(physical_network)}
    if segmentation_id is not None:
        vlan_mapping['segmentation_id'] = str(segmentation_id)
    port_other_config.update(vlan_mapping)
    self.int_br.set_db_attribute("Port", port.port_name, "other_config",
                                 port_other_config)
    return True
After the agent has picked out the ports that need binding, it runs two operations on them: _add_port_tag_info and _bind_devices.
_add_port_tag_info records the local vlan id of each port's network in the other_config field in the db. Combined with the port_bound method above, other_config therefore ends up holding these values: net_uuid, network_type, physical_network, segmentation_id and tag.
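That matches what the earlier ovsdb dump showed for the vxlan ports, for example:
other_config = {net_uuid="50c5b974-7def-49ec-ac2c-c575eb4b57fc", network_type=vxlan,
                physical_network=None, segmentation_id="5058", tag="3"}
The code of _add_port_tag_info: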
def _add_port_tag_info(self, need_binding_ports):
    port_names = [p['vif_port'].port_name for p in need_binding_ports]
    port_info = self.int_br.get_ports_attributes(
        "Port", columns=["name", "tag", "other_config"],
        ports=port_names, if_exists=True)
    info_by_port = {
        x['name']: {
            'tag': x['tag'],
            'other_config': x['other_config'] or {}
        }
        for x in port_info
    }
    for port_detail in need_binding_ports:
        try:
            lvm = self.vlan_manager.get(port_detail['network_id'])
        except vlanmanager.MappingNotFound:
            continue
        port = port_detail['vif_port']
        try:
            cur_info = info_by_port[port.port_name]
        except KeyError:
            continue
        other_config = cur_info['other_config']
        if (cur_info['tag'] != lvm.vlan or
                other_config.get('tag') != lvm.vlan):
            other_config['tag'] = str(lvm.vlan)
            self.int_br.set_db_attribute(
                "Port", port.port_name, "other_config", other_config)
_bind_devices sets each port's tag to the local vlan id of the network the port belongs to, then sorts the ports into devices_up and devices_down according to admin_state_up, and finally notifies the plugin to update the port information.
def _bind_devices(self, need_binding_ports):
    devices_up = []
    devices_down = []
    failed_devices = []
    port_names = [p['vif_port'].port_name for p in need_binding_ports]
    port_info = self.int_br.get_ports_attributes(
        "Port", columns=["name", "tag"], ports=port_names, if_exists=True)
    tags_by_name = {x['name']: x['tag'] for x in port_info}
    for port_detail in need_binding_ports:
        try:
            lvm = self.vlan_manager.get(port_detail['network_id'])
        except vlanmanager.MappingNotFound:
            # network for port was deleted. skip this port since it
            # will need to be handled as a DEAD port in the next scan
            continue
        port = port_detail['vif_port']
        device = port_detail['device']
        # Do not bind a port if it's already bound
        cur_tag = tags_by_name.get(port.port_name)
        if cur_tag is None:
            LOG.debug("Port %s was deleted concurrently, skipping it",
                      port.port_name)
            continue
        # Uninitialized port has tag set to []
        if cur_tag and cur_tag != lvm.vlan:
            self.int_br.delete_flows(in_port=port.ofport)
        if self.prevent_arp_spoofing:
            self.setup_arp_spoofing_protection(self.int_br,
                                               port, port_detail)
        if cur_tag != lvm.vlan:
            self.int_br.set_db_attribute(
                "Port", port.port_name, "tag", lvm.vlan)

        # update plugin about port status
        # FIXME(salv-orlando): Failures while updating device status
        # must be handled appropriately. Otherwise this might prevent
        # neutron server from sending network-vif-* events to the nova
        # API server, thus possibly preventing instance spawn.
        if port_detail.get('admin_state_up'):
            LOG.debug("Setting status for %s to UP", device)
            devices_up.append(device)
        else:
            LOG.debug("Setting status for %s to DOWN", device)
            devices_down.append(device)

    if devices_up or devices_down:
        devices_set = self.plugin_rpc.update_device_list(
            self.context, devices_up, devices_down, self.agent_id,
            self.conf.host)
        failed_devices = (devices_set.get('failed_devices_up') +
                          devices_set.get('failed_devices_down'))
        if failed_devices:
            LOG.error(_LE("Configuration for devices %s failed!"),
                      failed_devices)
    LOG.info(_LI("Configuration for devices up %(up)s and devices "
                 "down %(down)s completed."),
             {'up': devices_up, 'down': devices_down})
    return set(failed_devices)
Now for the second case, which is handled mainly by the treat_devices_removed method:
def treat_devices_removed(self, devices):
    self.sg_agent.remove_devices_filter(devices)
    LOG.info(_LI("Ports %s removed"), devices)
    devices_down = self.plugin_rpc.update_device_list(self.context,
                                                      [],
                                                      devices,
                                                      self.agent_id,
                                                      self.conf.host)
    failed_devices = set(devices_down.get('failed_devices_down'))
    LOG.debug("Port removal failed for %s", failed_devices)
    for device in devices:
        self.ext_manager.delete_port(self.context, {'port_id': device})
        self.port_unbound(device)
    return failed_devices
This method first notifies the plugin to set the ports' status to device-down, then calls the port_unbound method described above for each device.