keystone业务流程的代码分析(一)
# 我们分析admin [composite:admin] use = egg:Paste#urlmap /v2.0 = admin_api /v3 = api_v3 / = public_version_api [pipeline:admin_api] pipeline = sizelimit url_normalize request_id build_auth_context token_auth admin_token_auth json_body ec2_extension s3_extension crud_extension admin_service # 分析/v2.0 入口处理业务 1-[filter:sizelimit] paste.filter_factory = oslo_middleware.sizelimit:RequestBodySizeLimiter.factory _opts = [ cfg.IntOpt('max_request_body_size', default=114688, # 默认为112k help='...', deprecated_opts=_oldopts) ] class RequestBodySizeLimiter(base.Middleware): # 限制进入的多个请求body大小 def __init__(self, application, conf=None): super(RequestBodySizeLimiter, self).__init__(application, conf) # 最后会调用其factory方法 class Middleware(object): # factory会调用子类的__call__方法,在调用前需要初始化 @classmethod def factory(cls, global_conf, **local_conf): conf = global_conf.copy() if global_conf else {} conf.update(local_conf) def middleware_filter(app): return cls(app, conf) return middleware_filter def __init__(self, application, conf=None): if isinstance(conf, cfg.ConfigOpts): self.conf = [] self.oslo_conf = conf else: self.conf = conf or [] if 'oslo_config_project' in self.conf: if 'oslo_config_file' in self.conf: default_config_files = [self.conf['oslo_config_file']] else: default_config_files = None self.oslo_conf = cfg.ConfigOpts() self.oslo_conf([], project=self.conf['oslo_config_project'], default_config_files=default_config_files, validate_default_values=True) # 上面的调用 ConfigOpts实例的__call__ else: # 直接调用cfg.CONF self.oslo_conf = cfg.CONF def _conf_get(self, key, group='oslo_middleware'): if key in self.conf: self.oslo_conf.set_override(key, self.conf[key], group=group, enforce_type=True) # 使配置有效 return getattr(getattr(self.oslo_conf, group), key) @staticmethod def process_request(req): # 对每个请求进行处理 return None @staticmethod def process_response(response, request=None): return response @webob.dec.wsgify def __call__(self, req): response = self.process_request(req) if response: return response response
= req.get_response(self.application) # get_response 是 webob Request 对象的方法,调用 self.application 并返回 response (args, varargs, varkw, defaults) = getargspec(self.process_response) # getargspec 是inspect库的函数,用来检测函数的参数 # ArgSpec(args=['response', 'request'], varargs=None, keywords=None, defaults=(None,)) if 'request' in args: return self.process_response(response, request=req) return self.process_response(response) # 继承的是base.Middleware self.oslo_conf.register_opts(_opts, group='oslo_middleware') # 将上面的选项注册进ConfigOpts实例中 @webob.dec.wsgify # 这个装饰器会将func转变为app def __call__(self, req): max_size = self._conf_get('max_request_body_size') if (req.content_length is not None and req.content_length > max_size): msg = _('Request is too large') raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg) if req.content_length is None and req.is_body_readable: limiter = LimitingReader(req.body_file, max_size) class LimitingReader(object): def __init__(self, data, limit): self.data = data self.limit = limit self.bytes_read = 0 def __iter__(self): for chunk in self.data: self.bytes_read += len(chunk) if self.bytes_read > self.limit: msg = _('Request too large') raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg) else: yield chunk def read(self, i=None): # mod_wsgi 与 eventlet不同,所以不能简单的提供一个read if i is None: result = self.data.read() else: result = self.data.read(i) if self.bytes_read > self.limit: # read 加上limit判断 msg = _('Request too large') raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg) return result req.body_file = limiter return self.application 1.1流程->RequestBodySizeLimiter初始化后调用父类Middleware的factory函数继续调用RequestBodySizeLimiter的__call__ 2-[filter:url_normalize] paste.filter_factory = keystone.middleware:NormalizingFilter.factory class NormalizingFilter(wsgi.Middleware): class Middleware(Application): # 基础WSGI 中间件 # 初始化需要app,然后才调用factory,只会调用__call__ @classmethod def factory(cls, global_config, **local_config): # 配置文件如下 # [filter:analytics] # redis_host = 127.0.0.1 # paste.filter_factory = 
keystone.analytics:Analytics.factory # 相当于如下,后返回call调用 # import keystone.analytics # keystone.analytics.Analytics(app, redis_host='127.0.0.1') def _factory(app): conf = global_config.copy() conf.update(local_config) return cls(app, **local_config) return _factory def __init__(self, application): super(Middleware, self).__init__() self.application = application def process_request(self, request): return None def process_response(self, request, response): return response @webob.dec.wsgify() # 初始化后调用wsgify().__call__ def __call__(self, request): try: response = self.process_request(request) if response: return response response = request.get_response(self.application) # 通过get_response 可以返回一个webob response object return self.process_response(request, response) except .... # 这个__call__调用下面的process_request # 中间件过滤处理url规范化 def process_request(self, request): # 规范化 url # 移除末尾多余的斜杠 if (len(request.environ['PATH_INFO'])>1 and request.environ['PATH_INFO'][-1] == '/'): request.environ['PATH_INFO'] = request.environ['PATH_INFO'][:-1] # 重写path,将其置为root elif not request.environ['PATH_INFO']: request.environ['PATH_INFO'] = '/' 3-[filter:request_id] paste.filter_factory = oslo_middleware:RequestId.factory ENV_REQUEST_ID = 'openstack.request_id' HTTP_RESP_HEADER_REQUEST_ID = 'x-openstack-request-id' class RequestId(base.Middleware): # 中间件确保request ID # 确保分配ID对应每个API请求,将其置为request env,request ID同样被添加到API response @webob.dec.wsgify def __call__(self, req): req_id = context.generate_request_id() def generate_request_id(): return b'req-' + str(uuid.uuid4()).encode('ascii') req.environ[ENV_REQUEST_ID] = req_id response = req.get_response(self.application) if HTTP_RESP_HEADER_REQUEST_ID not in response.headers: response.headers.add(HTTP_RESP_HEADER_REQUEST_ID, req_id) return response 4-[filter:build_auth_context] paste.filter_factory = keystone.middleware:AuthContextMiddleware.factory class AuthContextMiddleware(wsgi.Middleware): # 从 request auth token 中构建 authentication context def 
_build_auth_context(self, request): token_id = request.headers.get(AUTH_TOKEN_HEADER).strip() if token_id == CONF.admin_token: return {} context = {'token_id': token_id} context['environment'] = request.environ try: token_ref = token_model.KeystoneToken( token_id=token_id, token_data=self.token_provider_api.validate_token(token_id) # Application 有token_provider_api ) class KeystoneToken(dict): def __init__(self, token_id, token_data): self.token_data = token_data if 'access' in token_data: super(KeystoneToken, self).__init__(**token_data['access']) elif 'token' in token_data and 'methods' in token_data['token']: super(KeystoneToken, self).__init__(**token_data['token']) else: raise exception.UnsupportedTokenVersionException() self.token_id = token_id self.short_id = cms.cms_hash_token(token_id, mode=CONF.token.hash_algorithm) # mode = 'md5' return hash code if self.project_scoped and self.domain_scoped: raise exception.UnexpectedError(_('Found invalid token: scoped to ' 'both project and domain')) wsgi.validate_token_bind(context, token_ref) CONTEXT_ENV = 'openstack.context' PARAMS_ENV = 'openstack.params' JSON_ENCODE_CONTENT_TYPES = set(['application/json', 'application/json-home']) def validate_token_bind(context, token_ref): bind_mode = CONF.token.enforce_token_bind # permissive if not isinstance(token_ref, token_model.KeystoneToken): raise exception.UnexpectedError(_('token reference must be a KeystoneToken type, got: %s') % type(token_ref)) bind = token_ref.bind permissive = bind_mode in ('permissive', 'strict') name = None if permissive or bind_mode == 'required' else bind_mode if not bind: if permissive: # token 中没有bind信息,且模式为permissive/strict时直接放行 return else: LOG.info(_LI('No bind information present in token')) raise exception.Unauthorized() if name and name not in bind: raise exception.Unauthorized() for bind_type, identifier in bind.items(): if bind_type == 'kerberos': if not (context['environment'].get('AUTH_TYPE', '').lower() == 'negotiate'): LOG.info(_LI("Kerberos credentials 
required and not present")) raise exception.Unauthorized() if not context['environment'].get('REMOTE_USER') == identifier: LOG.info(_LI("Kerberos credentials do not match " "those in bind")) raise exception.Unauthorized() LOG.info(_LI("Kerberos bind authentication successful")) elif bind_mode == 'permissive': LOG.debug(("Ignoring unknown bind for permissive mode: " "{%(bind_type)s: %(identifier)s}"), {'bind_type': bind_type, 'identifier': identifier}) else: LOG.info(_LI("Couldn't verify unknown bind: " "{%(bind_type)s: %(identifier)s}"), {'bind_type': bind_type, 'identifier': identifier}) raise exception.Unauthorized() return authorization.token_to_auth_context(token_ref) AUTH_CONTEXT_ENV = 'KEYSTONE_AUTH_CONTEXT' def token_to_auth_context(token): if not isinstance(token, token_model.KeystoneToken): raise exception.UnexpectedError(_('token reference must be a KeystoneToken type, got: %s') % type(token)) auth_context = {'token': token, 'is_delegated_auth': False} try: auth_context['user_id'] = token.user_id except KeyError: LOG.warning(_LW('RBAC:Invalid user data in token')) raise exception.Unauthorized() if token.project_scoped: auth_context['project_id'] = token.project_id elif token.domain_scoped: auth_context['domain_id'] = token.domain_id auth_context['domain_name'] = token.domain_name else: LOG.debug('RBAC: proceeding without project or domain scope') if token.trust_scoped: auth_context['is_delegated_auth'] = True auth_context['trust_id'] = token.trust_id auth_context['trustor_id'] = token.trustor_user_id auth_context['trustee_id'] = token.trustee_user_id else: auth_context['trust_id'] = None auth_context['trustor_id'] = None auth_context['trustee_id'] = None roles = token.role_names if roles: auth_context['roles'] = roles if token.oauth_scoped: auth_context['is_delegated_auth'] = True auth_context['consumer_id'] = token.oauth_consumer_id auth_context['access_token_id'] = token.oauth_access_token_id if token.is_federated_user: auth_context['group_ids'] = 
token.federation_group_ids return auth_context except exception.TokenNotFound: LOG.warning(_LW('RBAC:Invalid token')) raise exception.Unauthorized() def process_request(self, request): if AUTH_TOKEN_HEADER not in request.headers: LOG.debug(('Auth token not in the request header.' 'will not build auth context')) return if authorization.AUTH_CONTEXT_ENV in request.environ: # 已经存在,返回 return auth_context = self._build_auth_context(request) request.environ[authorization.AUTH_CONTEXT_ENV] = auth_context # request.environ['KEYSTONE_AUTH_CONTEXT'] = auth_context 5-[filter:token_auth] paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory AUTH_TOKEN_HEADER = 'X-Auth-Token' CONTEXT_ENV = 'openstack.context' PARAMS_ENV = 'openstack.params' SUBJECT_TOKEN_HEADER = 'X-Subject-Token' class TokenAuthMiddleware(wsgi.Middleware): def process_request(self, request): token = request.headers.get(AUTH_TOKEN_HEADER) context = request.environ.get(CONTEXT_ENV, {}) context['token_id'] = token if SUBJECT_TOKEN_HEADER in request.headers: context['subject_token_id'] = request.headers[SUBJECT_TOKEN_HEADER] request.environ[CONTEXT_ENV] = context 6-[filter:admin_token_auth] paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory class AdminTokenAuthMiddleware(wsgi.Middleware): def process_request(self, request): token = request.headers.get(AUTH_TOKEN_HEADER) context = request.environ.get(CONTEXT_ENV, {}) context['is_admin'] = (token == CONF.admin_token) request.environ[CONTEXT_ENV] = context # 验证token == CONF.admin_token,置is_admin的布尔值 7-[filter:json_body] paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory class JsonBodyMiddleware(wsgi.Middleware): # 允许方法参数以序列化json形式通过 def process_request(self, request): params_json = request.body # 如果为空,提前返回 if not params_json: return # 识别content_type是否为json if request.content_type not in ('application/json', ''): e = exception.ValidationError(attribute='application/json', target='Content-Type header') return 
wsgi.render_exception(e, request=request) params_parsed = {} try: params_parsed = jsonutils.loads(params_json) except ValueError: e = exception.ValidationError(attribute='valid JSON', target='request body') return wsgi.render_exception(e, request=request) finally: if not params_parsed: params_parsed = {} if not isinstance(params_parsed, dict): e = exception.ValidationError(attribute='valid JSON object', target='request body') return wsgi.render_exception(e, request=request) params = {} for k, v in params_parsed.items(): if k in ('self', 'context'): continue if k.startswith('_'): continue params[k] = v request.environ[PARAMS_ENV] = params # 将request.body 按json解析,然后以字典形式放到 request.environ['openstack.params'] 8-[filter:ec2_extension] paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory build_resource_relation = functools.partial( json_home.build_v3_extension_resource_relation, extension_name='OS-EC2', extension_version='1.0') build_parameter_relation = functools.partial( json_home.build_v3_extension_parameter_relation, extension_name='OS-EC2', extension_version='1.0') class Ec2Extension(wsgi.ExtensionRouter): class ExtensionRouter(Router): class Router(object): def __init__(self, mapper): # 为给定的routes.Mapper创建一个router # 每个在Mapper里的route都需要指定一个controller,这就是要被调用的WSGI app # action 就是controller里面的方法 self.map = mapper self._router = routes.middleware.RouterMiddleware(self._dispatch, self.map) @webob.dec.wsgify() # 下面就是调用的wsgify().__call__ def __call__(self, req): @staticmethod @webob.dec.wsgify() def _dispatch(req): # 将请求转发到匹配的controller # self._router调用它, 匹配request,并将信息装入req.environ match = req.environ['wsgiorg.routing_args'][1] if not match: msg = _('The resource could not be found.') return render_exception(exception.NotFound(msg), request=req, user_locale=best_match_language(req)) app = match['controller'] return app return self._router def __init__(self, application, mapper=None): if mapper is None: mapper = routes.Mapper() self.application = application 
self.add_routes(mapper) mapper.connect('/{path_info:.*}', controller=self.application) super(ExtensionRouter, self).__init__(mapper) def add_routes(self, mapper): # 需要覆写 pass @classmethod def factory(cls, global_config, **local_config): def _factory(app): conf = global_config.copy() conf.update(local_config) return cls(app, **local_config) # 调用类初始化 return _factory def add_routes(self, mapper): ec2_controller = controllers.Ec2Controller() @dependency.requires('policy_api', 'token_provider_api') class Ec2Controller(Ec2ControllerCommon, controller.V2Controller): @controller.v2_deprecated def authenticate(self, context, credentials=None, ec2Credentials=None): (user_ref, tenant_ref, metadata_ref, roles_ref, catalog_ref) = self._authenticate(credentials=credentials, ec2credentials=ec2Credentials) # 继承Ec2ControllerCommon._authenticate def _authenticate(self, credentials=None, ec2credentials=None): # credentials: ec2 签名字典 # ec2credentials : 已弃用(deprecated)的ec2 签名字典 if not credentials and ec2credentials: # 只提供了ec2credentials时使用它 credentials = ec2credentials if 'access' not in credentials: raise exception.Unauthorized(message='EC2 signature not supplied.') creds_ref = self._get_credentials(credentials['access']) # credentials['access'] 是ID def _get_credentials(self, credential_id): # return: credentials ec2 credentials 字典 # utils 在 keystone.common ec2_credential_id = utils.hash_access_key(credential_id) # 对credential_id进行hash creds = self.credential_api.get_credential(ec2_credential_id) # self.credential_api 就是下面的类的实例 # Manager->Driver的过程 class Credential(credential.Driver): def get_credential(self, credential_id): session = sql.get_session() return self._get_credential(session, credential_id).to_dict() # self._get_credential(session, credential_id)返回的是CredentialModel def _get_credential(self, session, credential_id): ref = session.query(CredentialModel).get(credential_id) # 查询表CredentialModel if ref is None: raise exception.CredentialNotFound(credential_id=credential_id) return ref if not 
creds: raise exception.Unauthorized(message='EC2 access key not found.') return self._convert_v3_to_ec2_credential(creds) self.check_signature(creds_ref, credentials) tenant_ref = self.resource_api.get_project(creds_ref['tenant_id']) user_ref = self.identity_api.get_user(creds_ref['user_id']) metadata_ref = {} metadata_ref['roles'] = ( self.assignment_api.get_roles_for_user_and_project(user_ref['id'], tenant_ref['id'])) trust_id = creds_ref.get('trust_id') if trust_id: metadata_ref['trust_id'] = trust_id metadata_ref['trustee_user_id'] = user_ref['id'] try: self.identity_api.assert_user_enabled( user_id=user_ref['id'], user=user_ref) self.resource_api.assert_domain_enabled( domain_id=user_ref['domain_id']) self.resource_api.assert_project_enabled( project_id=tenant_ref['id'], project=tenant_ref) except AssertionError as e: six.reraise(exception.Unauthorized, exception.Unauthorized(e), sys.exc_info()[2]) roles = metadata_ref.get('roles', []) if not roles: raise exception.Unauthorized(message='User not valid for tenant.') roles_ref = [self.role_api.get_role(role_id) for role_id in roles] catalog_ref = self.catalog_api.get_catalog(user_ref['id'], tenant_ref['id']) return user_ref, tenant_ref, metadata_ref, roles_ref, catalog_ref # 生效 mapper.connect( '/ec2tokens', # /v2.0/ec2tokens url是组装还是就直接 /ec2tokens ? 这里直接写 /ec2tokens:Paste urlmap 匹配到前缀 /v2.0 后会把它移到 SCRIPT_NAME,PATH_INFO 中只剩 /ec2tokens 
controller=ec2_controller, action='authenticate', conditions=dict(method=['POST']) ) # crud mapper.connect( '/users/{user_id}/credentials/OS-EC2', controller=ec2_controller, action='create_credential', conditions=dict(method=['POST'])) mapper.connect( '/users/{user_id}/credentials/OS-EC2', controller=ec2_controller, action='get_credentials', conditions=dict(method=['GET'])) mapper.connect( '/users/{user_id}/credentials/OS-EC2/{credential_id}', controller=ec2_controller, action='get_credential', conditions=dict(method=['GET'])) mapper.connect( '/users/{user_id}/credentials/OS-EC2/{credential_id}', controller=ec2_controller, action='delete_credential', conditions=dict(method=['DELETE'])) 9-[filter:s3_extension] paste.filter_factory = keystone.contrib.s3:S3Extension.factory 10-[filter:crud_extension] paste.filter_factory = keystone.contrib.admin_crud:CrudExtension.factory 11-[app:admin_service] paste.app_factory = keystone.service:admin_app_factory
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~