Skip to main content

Accept specific tenants only

If your multi-tenant application only should accept a few tenants, you'll have to verify issuers, or the iss field in the JWT.

We'll take the last code snippet from FastAPI configuration and change a few lines of code to make this happen:

import uvicorn
from fastapi import FastAPI, Security
from fastapi.middleware.cors import CORSMiddleware
from pydantic import AnyHttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi_azure_auth import MultiTenantAzureAuthorizationCodeBearer
from fastapi_azure_auth.exceptions import InvalidAuth


class Settings(BaseSettings):
BACKEND_CORS_ORIGINS: list[str | AnyHttpUrl] = ['http://localhost:8000']
OPENAPI_CLIENT_ID: str = ""
APP_CLIENT_ID: str = ""

model_config = SettingsConfigDict(
env_file='.env',
env_file_encoding='utf-8',
case_sensitive=True
)

settings = Settings()

app = FastAPI(
swagger_ui_oauth2_redirect_url='/oauth2-redirect',
swagger_ui_init_oauth={
'usePkceWithAuthorizationCodeGrant': True,
'clientId': settings.OPENAPI_CLIENT_ID,
},
)

if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)


async def check_if_valid_tenant(tid: str) -> str:
tid_to_iss_mapping = {
'9b5ff18e-53c0-45a2-8bc2-9c0c8f60b2c6': 'https://login.microsoftonline.com/9b5ff18e-53c0-45a2-8bc2-9c0c8f60b2c6/v2.0'
}
try:
return tid_to_iss_mapping[tid]
except KeyError:
raise InvalidAuth('Tenant not allowed')

azure_scheme = MultiTenantAzureAuthorizationCodeBearer(
app_client_id=settings.APP_CLIENT_ID,
scopes={
f'api://{settings.APP_CLIENT_ID}/user_impersonation': 'user_impersonation',
},
validate_iss=True,
iss_callable=check_if_valid_tenant
)


@app.on_event('startup')
async def load_config() -> None:
"""
Load OpenID config on startup.
"""
await azure_scheme.openid_config.load_config()


@app.get("/", dependencies=[Security(azure_scheme)])
async def root():
return {"message": "Hello World"}


if __name__ == '__main__':
uvicorn.run('main:app', host='localhost', port=8000, reload=True)

We're first creating an async function, which takes a tid as an argument, and returns the tenant ID's iss if it's a valid tenant. If it's not a valid tenant, it has to raise an InvalidAuth() exception.

More sophisticated callable

If you want to cache these results in memory, you can do so by creating a more sophisticated callable:

class IssuerFetcher:
def __init__(self) -> None:
"""
Example class for multi tenant apps, that caches issuers for an hour
"""
self.tid_to_iss: dict[str, str] = {}
self._config_timestamp: Optional[datetime] = None

async def __call__(self, tid: str) -> str:
"""
Check if memory cache needs to be updated or not, and then returns an issuer for a given tenant
:raises InvalidAuth when it's not a valid tenant
"""
refresh_time = datetime.now() - timedelta(hours=1)
if not self._config_timestamp or self._config_timestamp < refresh_time:
self._config_timestamp = datetime.now()
# logic to find your allowed tenants and it's issuers here
# (This example cache in memory for 1 hour)
self.tid_to_iss = {
'intility_tenant': 'intility_tenant',
}
try:
return self.tid_to_iss[tid]
except Exception as error:
log.exception('`iss` not found for `tid` %s. Error %s', tid, error)
raise InvalidAuth('You must be an Intility customer to access this resource')


issuer_fetcher = IssuerFetcher()

azure_scheme = MultiTenantAzureAuthorizationCodeBearer(
...
validate_iss=True,
iss_callable=issuer_fetcher
)