Source code for heat.engine.clients.os.neutron

#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from neutronclient.common import exceptions
from neutronclient.neutron import v2_0 as neutronV20
from neutronclient.v2_0 import client as nc
from oslo_utils import uuidutils

from heat.common import exception
from heat.engine.clients import client_plugin
from heat.engine import constraints


[docs]class NeutronClientPlugin(client_plugin.ClientPlugin): exceptions_module = exceptions service_types = [NETWORK] = ['network'] def _create(self): con = self.context endpoint_type = self._get_client_option('neutron', 'endpoint_type') endpoint = self.url_for(service_type=self.NETWORK, endpoint_type=endpoint_type) args = { 'auth_url': con.auth_url, 'service_type': self.NETWORK, 'token': self.auth_token, 'endpoint_url': endpoint, 'endpoint_type': endpoint_type, 'ca_cert': self._get_client_option('neutron', 'ca_file'), 'insecure': self._get_client_option('neutron', 'insecure') } return nc.Client(**args)
[docs] def is_not_found(self, ex): if isinstance(ex, (exceptions.NotFound, exceptions.NetworkNotFoundClient, exceptions.PortNotFoundClient)): return True return (isinstance(ex, exceptions.NeutronClientException) and ex.status_code == 404)
[docs] def is_conflict(self, ex): bad_conflicts = (exceptions.OverQuotaClient,) return (isinstance(ex, exceptions.Conflict) and not isinstance(ex, bad_conflicts))
[docs] def is_over_limit(self, ex): if not isinstance(ex, exceptions.NeutronClientException): return False return ex.status_code == 413
[docs] def is_no_unique(self, ex): return isinstance(ex, exceptions.NeutronClientNoUniqueMatch)
[docs] def find_neutron_resource(self, props, key, key_type): return neutronV20.find_resourceid_by_name_or_id( self.client(), key_type, props.get(key))
def _resolve(self, props, key, id_key, key_type): if props.get(key): props[id_key] = self.find_neutron_resource( props, key, key_type) props.pop(key) return props[id_key]
[docs] def resolve_network(self, props, net_key, net_id_key): return self._resolve(props, net_key, net_id_key, 'network')
[docs] def resolve_subnet(self, props, subnet_key, subnet_id_key): return self._resolve(props, subnet_key, subnet_id_key, 'subnet')
[docs] def resolve_router(self, props, router_key, router_id_key): return self._resolve(props, router_key, router_id_key, 'router')
[docs] def resolve_port(self, props, port_key, port_id_key): return self._resolve(props, port_key, port_id_key, 'port')
[docs] def network_id_from_subnet_id(self, subnet_id): subnet_info = self.client().show_subnet(subnet_id) return subnet_info['subnet']['network_id']
[docs] def get_secgroup_uuids(self, security_groups): '''Returns a list of security group UUIDs. Args: security_groups: List of security group names or UUIDs ''' seclist = [] all_groups = None for sg in security_groups: if uuidutils.is_uuid_like(sg): seclist.append(sg) else: if not all_groups: response = self.client().list_security_groups() all_groups = response['security_groups'] same_name_groups = [g for g in all_groups if g['name'] == sg] groups = [g['id'] for g in same_name_groups] if len(groups) == 0: raise exception.PhysicalResourceNotFound(resource_id=sg) elif len(groups) == 1: seclist.append(groups[0]) else: # for admin roles, can get the other users' # securityGroups, so we should match the tenant_id with # the groups, and return the own one own_groups = [g['id'] for g in same_name_groups if g['tenant_id'] == self.context.tenant_id] if len(own_groups) == 1: seclist.append(own_groups[0]) else: raise exception.PhysicalResourceNameAmbiguity(name=sg) return seclist
[docs]class NetworkConstraint(constraints.BaseCustomConstraint): expected_exceptions = (exceptions.NeutronClientException, exception.NovaNetworkNotFound, exception.PhysicalResourceNameAmbiguity)
[docs] def validate_with_client(self, client, value): try: neutron_client = client.client('neutron') except Exception: # is not using neutron client.client_plugin('nova').get_nova_network_id(value) else: neutronV20.find_resourceid_by_name_or_id( neutron_client, 'network', value)
[docs]class PortConstraint(constraints.BaseCustomConstraint): expected_exceptions = (exceptions.NeutronClientException,)
[docs] def validate_with_client(self, client, value): neutron_client = client.client('neutron') neutronV20.find_resourceid_by_name_or_id( neutron_client, 'port', value)
[docs]class RouterConstraint(constraints.BaseCustomConstraint): expected_exceptions = (exceptions.NeutronClientException,)
[docs] def validate_with_client(self, client, value): neutron_client = client.client('neutron') neutronV20.find_resourceid_by_name_or_id( neutron_client, 'router', value)
[docs]class SubnetConstraint(constraints.BaseCustomConstraint): expected_exceptions = (exceptions.NeutronClientException,)
[docs] def validate_with_client(self, client, value): neutron_client = client.client('neutron') neutronV20.find_resourceid_by_name_or_id( neutron_client, 'subnet', value)

Project Source