As we now have a function integrated with flask framework and exposing two APIs i.e. /health and /vault/<secret_name>. In this step, we will see who to have protected api.

If we just need to protect all APIs then we can simply use Azure Function Identity feature, however in most cases, not all api’s needs Authz as we may have health api which needs to be open. Also, in many other cases we want to verify token for custom roles based on access to certain specific api. In order to achieve this, we need to add Authz work flow in our function code itself.

AD App

  1. Let us now create an Azure Active directory App. In Azure Portal > Azure Active Directory > App registrations > New Registration
  2. App details:
    1. Name: functiondemo-ad-server-app
    2. Supported Account Types: Accounts in this organizational directory only (Default Directory only - Single tenant)
    3. Click on Register.
  3. Set Application ID URI:
    1. Copy Application (client) ID
    2. Navigate to Expose an API tab
    3. Set Application ID URI. It should be api://<<Application (client) ID>>
  4. Configure Custom Roles:
    1. Click on App Roles tab.
    2. Click on Create app role
      1. Display Name: GetSecret
      2. Allowed member types: Both (Users/Groups + Applications)
      3. Value: GetSecret
      4. Description: GetSecret
  5. Let us know onboard this app to itself for testing.
    1. Click on API Permission > Add a Permission > My APIs > Select Ad App > Select Permission GetSecret
    2. Click Add Permission
    3. At this point status will be Not granted for Default Directory. Since we are admin of this account, we can verify this on our own, if not, we need to ask admin to approve it.
    4. Select row with GetSecret and click on Grant admin consent for Default Directory

Copy required details from overview tab. These are required for writing code.

  1. Application (client) ID (eg, xxxxxx-xxxxx-xxxxx-xxxxxx-xxxxxxxx)
  2. Directory (tenant) ID (eg, xxxxx-353f-xxxxx-xxxx-xxxxx)

Integrating Code for AD Authentication Work Flow

  1. Create two folder in src i.e. authz and config

  2. Inside src/authz create a python file verify.py

  3. Inside src/configs create two python file error_codes.py and adapp.py

  4. src/configs/error_codes.py contains custom error message that we want to return.

    from box import Box
    
    
    def auth_error():
        auth_error_codes = {
            "authorization_header_missing": {
                "message": {
                    "code": "AuthorizationHeaderMissing",
                    "description": "Authorization header is missing from the request"
                },
                "status_code": 401
            },
            "invalid_auth_header_format_bearer": {
                "message": {
                    "code": "InvalidAuthorizationHeader",
                    "description": "Authorization header must start with Bearer"
                },
                "status_code": 401
            },
            "invalid_auth_header_format_token": {
                "message": {
                    "code": "AuthorizationTokenNotFound",
                    "description": "Authorization does not contain Token"
                },
                "status_code": 401
    
            },
            "invalid_auth_header_format": {
                "message": {
                    "code": "InvalidAuthorizationHeaderFormat",
                    "description": "Authorization header must be Bearer token"
                },
                "status_code": 401
    
            },
            "invalid_auth_header": {
                "message": {
                    "code": "InvalidAuthorizationHeader",
                    "description": "Unable to parse authentication token"
                },
                "status_code": 401
    
            },
            "token_expired": {
                "message": {
                    "code": "AuthorizationTokenExpired",
                    "description": "Authorization bearer token is expired."
                },
                "status_code": 401
    
            },
            "invalid_claims": {
                "message": {
                    "code": "InvalidAuthorizationClaims",
                    "description": "Failed to validate authorization claims."
                },
                "status_code": 401
    
            },
            "invalid_header": {
                "message": {
                    "code": "InvalidAuthorizationHeader",
                    "description": "Unable to find appropriate key"
                },
                "status_code": 401
    
            },
        }
    
        return auth_error_codes
    
    
    def bad_gateway():
        bad_gateway_codes = {
            "vault_error": {
                "message": {
                    "code": "AzureKeyVaultFatalError",
                    "description": "Error getting response from Azure Key Vault."
                },
                "status_code": 502
    
            }
        }
    
        return bad_gateway_codes
    
    
    def not_found():
        not_found_codes = {
            "secret_not_found": {
                "message": {
                    "code": "SecretNotFound",
                    "description": "Requested secret not found in Key Vault."
                },
                "status_code": 404
    
            }
        }
    
        return not_found_codes
    
    
    def function_errors():
        function_error_code = {
            "health_check_failed": {
                "message": {
                    "code": "ServiceHealthCheckDown",
                    "description": "Function App health Check failed.",
                },
                "status_code": 500
            },
            "unsupported_content_type": {
                "message": {
                    "code": "InvalidContentType",
                    "description": "Content-Type is not supported"
                },
                "status_code": 500
    
            }
        }
    
        return function_error_code
    
    
    errors = Box({**auth_error(), **bad_gateway(), **not_found(), **function_errors()})
    
  5. src/configs/adapp.py add constant related to Ad App such as tenantId and AppId.

    AD_SERVER_APP_ID = "xxxxx-47cd-xxxxxxx-a16e-xxxxxx"
    API_AUDIENCE = f"api://{AD_SERVER_APP_ID}"
    TENANT_ID = "xxxxxx-353f-xxxxxx-be76-xxxxxxx"
    JWT_URL = f"https://login.microsoftonline.com/{TENANT_ID}/discovery/v2.0/keys"
    ISSUER = f"https://sts.windows.net/{TENANT_ID}/"
    PATH_SCOPE = {
                "/vault": "GetSecret"
            }
    
  6. src/authz/verify.py contains logic to verify JWT token against claims. It expose a decorator @protected which when added to a flask-route will make it protected.

    from functools import wraps
    
    import jwt
    from flask import request, _request_ctx_stack
    from jwt import PyJWKClient
    
    from src.configs.error_codes import errors
    from src.configs import adapp
    import logging
    
    logger = logging.getLogger(__name__)
    
    # Error handler
    class AppError(Exception):
        # def __init__(self, error, status_code):
        def __init__(self, error):
            self.error = error.message
            self.status_code = error.status_code
    
    
    def verify_path_scope(roles: dict):
        path = request.path
        scope = adapp.PATH_SCOPE[path]
    
        if scope in roles:
            return True
        return False
    
    
    def get_token_auth_header():
        """Obtains the Access Token from the Authorization Header
        """
        auth = request.headers.get("Authorization", None)
        if not auth:
            logger.exception("Authorization header in missing from requests")
            raise AppError(errors.authorization_header_missing)
    
        parts = auth.split()
    
        if parts[0].lower() != "bearer":
            logger.exception("Authorization header does not starts with Bearer")
            raise AppError(errors.invalid_auth_header_format_bearer)
        elif len(parts) == 1:
            logger.exception("Authorization header missing token")
            raise AppError(errors.invalid_auth_header_format_token)
        elif len(parts) > 2:
            logger.exception("Authorization header is malformed")
            raise AppError(errors.invalid_auth_header_format)
        token = parts[1]
        return token
    
    
    def protected(f):
        """Determines if the Access Token is valid
        """
    
        @wraps(f)
        def decorated(*args, **kwargs):
            try:
                token = get_token_auth_header()
                jwk_client = PyJWKClient(adapp.JWT_URL)
                unverified_header = jwt.get_unverified_header(token)
    
                # Set default Signing Key from Token
                signing_key = jwk_client.get_signing_key_from_jwt(token)
    
                # Verify kid with unverified_header and get signing Key. Azure AD has multiple Keys.
                for key in jwk_client.get_signing_keys():
                    if key.key_id == unverified_header["kid"]:
                        signing_key = jwk_client.get_signing_key(key.key_id)
                        break
    
            except Exception as error:
                logger.exception(f"Error verifying token: {str(error)}")
                if isinstance(error, AppError):
                    raise error
                else:
                    raise AppError(errors.invalid_auth_header)
            if signing_key:
                try:
                    payload = jwt.decode(
                        token,
                        signing_key.key,
                        algorithms=["RS256"],
                        audience=adapp.API_AUDIENCE,
                        issuer=adapp.ISSUER,
                        verify=True
                    )
                except jwt.ExpiredSignatureError:
                    logger.exception(f"Access token has expired.")
                    raise AppError(errors.token_expired)
                except (jwt.InvalidIssuerError,
                        jwt.InvalidAudienceError,
                        jwt.InvalidIssuedAtError,
                        jwt.InvalidTokenError) as error:
                    logger.exception(f"Access token has invalid claims: {str(error)}")
                    errors.invalid_claims.message['debug'] = str(error)
                    raise AppError(errors.invalid_claims)
                except Exception as error:
                    logger.exception(f"Authorization header has issue: {str(error)}")
                    if isinstance(error, AppError):
                        raise error
                    else:
                        raise AppError(errors.invalid_auth_header)
                if 'roles' not in payload:
                    logger.exception(f"Access token missing roles.")
                    raise AppError(errors.missing_role)
    
                if not verify_path_scope(payload['roles']):
                    logger.exception(f"Access token missing required roles to access {request.path}")
                    errors.missing_path_role.message['debug'] = f"Missing required role for accessing path {request.path}"
                    raise AppError(errors.missing_path_role)
    
                _request_ctx_stack.top.current_user = payload
                # print(_request_ctx_stack.top.current_user)
                return f(*args, **kwargs)
            raise AppError(errors.invalid_header)
    
        return decorated
    
  7. Add @protected decorator to flask-route which needs to be protected. For this example, we are adding to /vault route. In src/api/__init__.py add:

    from src.authz.verify import protected
    
    @app.route("/vault", methods=['GET', 'HEAD'])
    @protected
    @cross_origin(cors)
    def get_secret():
        args = request.args
        secret = args.get("secret", None)
        if not secret:
            return jsonify(message="Secret name is missing. Please use QP secret=<secret_name>"), 400
        return jsonify(message=f"Vault Integration is currently not implemented to get value for {secret}"), 200
    
  8. To handle custom errors and convert them as flask response, we need to add an error handler. In src/api/__init__.py add:

    @app.errorhandler(AppError)
    def handle_auth_error(ex):
        response = jsonify(ex.error)
        response.status_code = ex.status_code
        return response
    
  9. Run function locally and try accessing http://localhost:7071/vault?secret=demo. It will return an error with error code AuthorizationHeaderMissing

    {
      "code": "AuthorizationHeaderMissing",
      "description": "Authorization header is missing from the request"
    }
    
  10. Publish this to github to deploy again.