05: API Authentication
As we now have a function integrated with flask framework and exposing two APIs i.e. /health
and /vault/<secret_name>
. In this step, we will see who to have protected api.
Identity
feature, however in most cases, not all api’s needs Authz as we may have health
api which needs to be open. Also, in many other cases we want to verify token for custom roles based on access to certain specific api. In order to achieve this, we need to add Authz work flow in our function code itself.
AD App
- Let us now create an Azure Active directory App. In
Azure Portal
>Azure Active Directory
>App registrations
>New Registration
- App details:
- Name:
functiondemo-ad-server-app
- Supported Account Types:
Accounts in this organizational directory only (Default Directory only - Single tenant)
- Click on Register.
- Name:
- Set Application ID URI:
- Copy
Application (client) ID
- Navigate to
Expose an API
tab - Set
Application ID URI
. It should beapi://<<Application (client) ID>>
- Copy
- Configure Custom Roles:
- Click on
App Roles
tab. - Click on
Create app role
- Display Name:
GetSecret
- Allowed member types:
Both (Users/Groups + Applications)
- Value:
GetSecret
- Description:
GetSecret
- Display Name:
- Click on
- Let us know onboard this app to itself for testing.
- Click on
API Permission
>Add a Permission
>My APIs
>Select Ad App
> Select PermissionGetSecret
- Click
Add Permission
- At this point status will be
Not granted for Default Directory
. Since we are admin of this account, we can verify this on our own, if not, we need to ask admin to approve it. - Select row with
GetSecret
and click onGrant admin consent for Default Directory
- Click on
Copy required details from overview tab. These are required for writing code.
- Application (client) ID
(eg, xxxxxx-xxxxx-xxxxx-xxxxxx-xxxxxxxx)
- Directory (tenant) ID
(eg, xxxxx-353f-xxxxx-xxxx-xxxxx)
Integrating Code for AD Authentication Work Flow
-
Create two folder in src i.e.
authz
andconfig
-
Inside
src/authz
create a python fileverify.py
-
Inside
src/configs
create two python fileerror_codes.py
andadapp.py
-
src/configs/error_codes.py
contains custom error message that we want to return.from box import Box def auth_error(): auth_error_codes = { "authorization_header_missing": { "message": { "code": "AuthorizationHeaderMissing", "description": "Authorization header is missing from the request" }, "status_code": 401 }, "invalid_auth_header_format_bearer": { "message": { "code": "InvalidAuthorizationHeader", "description": "Authorization header must start with Bearer" }, "status_code": 401 }, "invalid_auth_header_format_token": { "message": { "code": "AuthorizationTokenNotFound", "description": "Authorization does not contain Token" }, "status_code": 401 }, "invalid_auth_header_format": { "message": { "code": "InvalidAuthorizationHeaderFormat", "description": "Authorization header must be Bearer token" }, "status_code": 401 }, "invalid_auth_header": { "message": { "code": "InvalidAuthorizationHeader", "description": "Unable to parse authentication token" }, "status_code": 401 }, "token_expired": { "message": { "code": "AuthorizationTokenExpired", "description": "Authorization bearer token is expired." }, "status_code": 401 }, "invalid_claims": { "message": { "code": "InvalidAuthorizationClaims", "description": "Failed to validate authorization claims." }, "status_code": 401 }, "invalid_header": { "message": { "code": "InvalidAuthorizationHeader", "description": "Unable to find appropriate key" }, "status_code": 401 }, } return auth_error_codes def bad_gateway(): bad_gateway_codes = { "vault_error": { "message": { "code": "AzureKeyVaultFatalError", "description": "Error getting response from Azure Key Vault." }, "status_code": 502 } } return bad_gateway_codes def not_found(): not_found_codes = { "secret_not_found": { "message": { "code": "SecretNotFound", "description": "Requested secret not found in Key Vault." }, "status_code": 404 } } return not_found_codes def function_errors(): function_error_code = { "health_check_failed": { "message": { "code": "ServiceHealthCheckDown", "description": "Function App health Check failed.", }, "status_code": 500 }, "unsupported_content_type": { "message": { "code": "InvalidContentType", "description": "Content-Type is not supported" }, "status_code": 500 } } return function_error_code errors = Box({**auth_error(), **bad_gateway(), **not_found(), **function_errors()})
-
src/configs/adapp.py
add constant related to Ad App such as tenantId and AppId.AD_SERVER_APP_ID = "xxxxx-47cd-xxxxxxx-a16e-xxxxxx" API_AUDIENCE = f"api://{AD_SERVER_APP_ID}" TENANT_ID = "xxxxxx-353f-xxxxxx-be76-xxxxxxx" JWT_URL = f"https://login.microsoftonline.com/{TENANT_ID}/discovery/v2.0/keys" ISSUER = f"https://sts.windows.net/{TENANT_ID}/" PATH_SCOPE = { "/vault": "GetSecret" }
-
src/authz/verify.py
contains logic to verify JWT token against claims. It expose a decorator@protected
which when added to a flask-route will make it protected.from functools import wraps import jwt from flask import request, _request_ctx_stack from jwt import PyJWKClient from src.configs.error_codes import errors from src.configs import adapp import logging logger = logging.getLogger(__name__) # Error handler class AppError(Exception): # def __init__(self, error, status_code): def __init__(self, error): self.error = error.message self.status_code = error.status_code def verify_path_scope(roles: dict): path = request.path scope = adapp.PATH_SCOPE[path] if scope in roles: return True return False def get_token_auth_header(): """Obtains the Access Token from the Authorization Header """ auth = request.headers.get("Authorization", None) if not auth: logger.exception("Authorization header in missing from requests") raise AppError(errors.authorization_header_missing) parts = auth.split() if parts[0].lower() != "bearer": logger.exception("Authorization header does not starts with Bearer") raise AppError(errors.invalid_auth_header_format_bearer) elif len(parts) == 1: logger.exception("Authorization header missing token") raise AppError(errors.invalid_auth_header_format_token) elif len(parts) > 2: logger.exception("Authorization header is malformed") raise AppError(errors.invalid_auth_header_format) token = parts[1] return token def protected(f): """Determines if the Access Token is valid """ @wraps(f) def decorated(*args, **kwargs): try: token = get_token_auth_header() jwk_client = PyJWKClient(adapp.JWT_URL) unverified_header = jwt.get_unverified_header(token) # Set default Signing Key from Token signing_key = jwk_client.get_signing_key_from_jwt(token) # Verify kid with unverified_header and get signing Key. Azure AD has multiple Keys. for key in jwk_client.get_signing_keys(): if key.key_id == unverified_header["kid"]: signing_key = jwk_client.get_signing_key(key.key_id) break except Exception as error: logger.exception(f"Error verifying token: {str(error)}") if isinstance(error, AppError): raise error else: raise AppError(errors.invalid_auth_header) if signing_key: try: payload = jwt.decode( token, signing_key.key, algorithms=["RS256"], audience=adapp.API_AUDIENCE, issuer=adapp.ISSUER, verify=True ) except jwt.ExpiredSignatureError: logger.exception(f"Access token has expired.") raise AppError(errors.token_expired) except (jwt.InvalidIssuerError, jwt.InvalidAudienceError, jwt.InvalidIssuedAtError, jwt.InvalidTokenError) as error: logger.exception(f"Access token has invalid claims: {str(error)}") errors.invalid_claims.message['debug'] = str(error) raise AppError(errors.invalid_claims) except Exception as error: logger.exception(f"Authorization header has issue: {str(error)}") if isinstance(error, AppError): raise error else: raise AppError(errors.invalid_auth_header) if 'roles' not in payload: logger.exception(f"Access token missing roles.") raise AppError(errors.missing_role) if not verify_path_scope(payload['roles']): logger.exception(f"Access token missing required roles to access {request.path}") errors.missing_path_role.message['debug'] = f"Missing required role for accessing path {request.path}" raise AppError(errors.missing_path_role) _request_ctx_stack.top.current_user = payload # print(_request_ctx_stack.top.current_user) return f(*args, **kwargs) raise AppError(errors.invalid_header) return decorated
-
Add
@protected
decorator to flask-route which needs to be protected. For this example, we are adding to/vault
route. Insrc/api/__init__.py
add:from src.authz.verify import protected
@app.route("/vault", methods=['GET', 'HEAD']) @protected @cross_origin(cors) def get_secret(): args = request.args secret = args.get("secret", None) if not secret: return jsonify(message="Secret name is missing. Please use QP secret=<secret_name>"), 400 return jsonify(message=f"Vault Integration is currently not implemented to get value for {secret}"), 200
-
To handle custom errors and convert them as flask response, we need to add an error handler. In
src/api/__init__.py
add:@app.errorhandler(AppError) def handle_auth_error(ex): response = jsonify(ex.error) response.status_code = ex.status_code return response
-
Run function locally and try accessing http://localhost:7071/vault?secret=demo. It will return an error with error code
AuthorizationHeaderMissing
{ "code": "AuthorizationHeaderMissing", "description": "Authorization header is missing from the request" }
-
Publish this to github to deploy again.