import logging
import os
from mantelo import HttpException, KeycloakAdmin
from pydantic import BaseModel, Field, HttpUrl, computed_field
from pydantic_settings import BaseSettings, SettingsConfigDict
# Module-level logger, named after this module, used by all setup steps below.
logger = logging.getLogger(__name__)
[docs]
class User(BaseModel):
    """Information describing a single Keycloak user account."""

    # Login name of the account.
    username: str
    # E-mail address registered for the account.
    email: str
    # Initial password; the setup script registers it as a temporary credential.
    temp_password: str
    # Given name and family name of the user.
    first_name: str
    last_name: str
[docs]
class Config(BaseSettings):
    """Settings for the Keycloak setup script.

    Values are read from environment variables prefixed with
    ``lomas_kc_setup_`` (case-insensitive) and from the
    ``.env.lomas_kc_setup`` file. Nested values use ``__`` as delimiter and
    unknown keys are ignored.
    """

    model_config = SettingsConfigDict(
        extra="ignore",
        env_prefix="lomas_kc_setup_",
        env_file=".env.lomas_kc_setup",
        env_nested_delimiter="__",
        case_sensitive=False,
    )

    # Keycloak server and the admin account used to open the admin session.
    keycloak_url: HttpUrl
    keycloak_authentication_realm: str
    keycloak_admin_client_id: str
    keycloak_admin_user: str
    keycloak_admin_pwd: str

    # Name of the application realm that the script creates.
    lomas_realm: str = "lomas"

    # Gateway (OAuth proxy) client.
    lomas_gateway_url: HttpUrl
    lomas_gateway_client_id: str = "lomas_oauth_proxy"
    lomas_gateway_client_secret: str

    # Confidential clients with service accounts.
    lomas_admin_client_id: str = "lomas_admin"
    lomas_admin_client_secret: str
    lomas_api_client_id: str = "lomas_api"
    lomas_api_client_secret: str

    # We make this a dict to be able to split it into multiple env variables.
    lomas_admin_users: dict[int, User]

    # When True, an already existing application realm is deleted and recreated.
    overwrite_realm: bool = Field(default=True)

    # Explicit @property keeps static type checkers aware that this is a bool
    # attribute and not a method; pydantic would otherwise add it implicitly.
    @computed_field  # type: ignore[prop-decorator]
    @property
    def keycloak_use_tls(self) -> bool:
        """Whether the Keycloak URL uses the ``https`` scheme."""
        return self.keycloak_url.scheme == "https"
[docs]
def get_admin_session(config: Config) -> KeycloakAdmin:
    """Open a Keycloak admin session with the configured admin credentials.

    The session authenticates against ``config.keycloak_authentication_realm``
    and initially targets that same realm.

    Args:
        config (Config): The config to create the connection.

    Returns:
        KeycloakAdmin: KeycloakAdmin session.
    """
    # ``keycloak_url`` is a pydantic URL object, not a ``str``. Pass a plain
    # string without the trailing slash so URL joining cannot yield "//".
    server_url = str(config.keycloak_url).rstrip("/")
    return KeycloakAdmin.from_username_password(
        server_url=server_url,
        realm_name=config.keycloak_authentication_realm,
        client_id=config.keycloak_admin_client_id,
        username=config.keycloak_admin_user,
        password=config.keycloak_admin_pwd,
        authentication_realm_name=config.keycloak_authentication_realm,
    )
[docs]
def create_realm(config: Config, kc_admin: KeycloakAdmin) -> None:
    """Create the application realm.

    If a realm with the same name already exists it is deleted and recreated
    when ``config.overwrite_realm`` is true; otherwise it is left untouched.

    Args:
        config (Config): Config for creating the realm.
        kc_admin (KeycloakAdmin): A KeycloakAdmin session.

    Raises:
        HttpException: If Keycloak rejects a request for any reason other
            than the realm already existing.
    """
    realm_payload = {"realm": config.lomas_realm, "enabled": True}
    try:
        kc_admin.realms.post(realm_payload)
    except HttpException as e:
        # Only the "realm already exists" conflict is expected here. Anything
        # else (auth failure, server error, ...) must not be hidden.
        body = e.json if isinstance(e.json, dict) else {}
        already_exists = e.status_code == 409 and "Conflict detected" in str(body.get("errorMessage", ""))
        if not already_exists:
            raise
        logger.info("Application realm already exists.")
        if config.overwrite_realm:
            kc_admin.realms(config.lomas_realm).delete()
            kc_admin.realms.post(realm_payload)
            logger.info(f"Replaced existing realm: {config.lomas_realm}")
    else:
        logger.info(f"Created application realm: {config.lomas_realm}")
[docs]
def create_lomas_clients(config: Config, kc_admin: KeycloakAdmin) -> None:
    """Create the Keycloak clients used by the lomas application.

    Three clients are created, in this order:
        - the admin client (``config.lomas_admin_client_id``)
        - the API client (``config.lomas_api_client_id``)
        - the gateway client (``config.lomas_gateway_client_id``)

    Args:
        config (Config): Config for creating the clients.
        kc_admin (KeycloakAdmin): A KeycloakAdmin session.
    """
    # Client roles handed to the admin client, keyed by the client owning them.
    admin_client_roles = {"realm-management": ["manage-users", "manage-clients"]}

    create_confidential_client(
        kc_admin,
        client_id=config.lomas_admin_client_id,
        client_secret=config.lomas_admin_client_secret,
        roles=admin_client_roles,
    )
    create_confidential_client(
        kc_admin,
        client_id=config.lomas_api_client_id,
        client_secret=config.lomas_api_client_secret,
    )
    create_gateway_client(
        kc_admin,
        client_id=config.lomas_gateway_client_id,
        client_secret=config.lomas_gateway_client_secret,
        gateway_hostname=config.lomas_gateway_url,
    )
[docs]
def create_lomas_admin_users(config: Config, kc_admin: KeycloakAdmin) -> None:
    """Create the admin role, the admin group and the configured admin users.

    Steps, all performed in the ``config.lomas_realm`` realm:
        1. Create the ``authp/admin`` realm role.
        2. Create the ``lomas-admin`` group and map the role onto it.
        3. Create every user of ``config.lomas_admin_users`` as a member of
           that group, with a temporary password and the required actions
           ``UPDATE_PASSWORD`` and ``CONFIGURE_TOTP``.

    Objects that already exist (HTTP 409) are left as they are.

    Args:
        config (Config): Config holding the realm name and the users.
        kc_admin (KeycloakAdmin): A KeycloakAdmin session.

    Raises:
        HttpException: If Keycloak rejects a request for any reason other
            than the object already existing.
        LookupError: If the admin realm role cannot be found after creation.
    """
    realm_role_name = "authp/admin"
    group_name = "lomas-admin"
    # Address the application realm explicitly for every call so the function
    # does not depend on the realm currently selected on ``kc_admin``.
    realm = kc_admin.realms(config.lomas_realm)

    try:
        realm.roles.post({"name": realm_role_name, "description": "admin role", "attributes": {}})
    except HttpException as e:
        if e.status_code != 409:
            raise
        logger.info("Realm role authp/admin already exists")

    try:
        realm.groups.post({"name": group_name})
    except HttpException as e:
        if e.status_code != 409:
            raise
        logger.info("Lomas Admins group already exists")

    # The role mapping needs the full role representation returned by Keycloak.
    admin_role = next((r for r in realm.roles.get() if r["name"] == realm_role_name), None)
    if admin_role is None:
        raise LookupError(f"Realm role {realm_role_name} not found in realm {config.lomas_realm}")

    for group in realm.groups.get():
        if group.get("name") != group_name or "id" not in group:
            continue
        try:
            realm.groups(group["id"]).role_mappings.realm.post(data=[admin_role])
        except HttpException as e:
            if e.status_code != 409:
                raise
            logger.info("Lomas Admins role-mappings already exists")

    for user in config.lomas_admin_users.values():
        try:
            realm.users.post(
                {
                    "username": user.username,
                    "enabled": True,
                    "emailVerified": True,
                    "firstName": user.first_name,
                    "lastName": user.last_name,
                    "email": user.email,
                    "requiredActions": ["UPDATE_PASSWORD", "CONFIGURE_TOTP"],
                    "groups": [group_name],
                    "credentials": [{"type": "password", "value": user.temp_password, "temporary": True}],
                }
            )
        except HttpException as e:
            if e.status_code != 409:
                raise
            logger.info(f"User {user.username} already exists")
[docs]
def create_confidential_client(
kc_admin: KeycloakAdmin, client_id: str, client_secret: str, roles: dict[str, list[str]] = {}
) -> None:
"""Creates a confidential client with an associated service account.
Allows only for the client credentials flow and assigns the roles
listed in the provided dictionary.
Only creates the account if it does not already exist.
Args:
kc_admin (KeycloakAdmin): A KeycloakAdmin session.
client_id (str): The client id to use.
client_secret (str): The client secret to use.
roles (Dict[str, List[str]]): A dictionary mapping of (realm, list of roles) pairs
to assign to the associated service account.
"""
# Idempotent creation
match kc_admin.clients.get(clientId=client_id):
case [{"id": client_id, **_rest}]:
getattr(kc_admin.clients, client_id).delete()
logger.info(f"Deleting existing client {client_id}")
# Create client
kc_admin.clients.post(
{
"clientId": client_id,
"secret": client_secret,
"name": "test_client",
"clientAuthenticatorType": "client-secret",
"standardFlowEnabled": False,
"directAccessGrantsEnabled": False,
"serviceAccountsEnabled": True,
"publicClient": False,
"protocol": "openid-connect",
"defaultClientScopes": [],
"optionalClientScopes": [],
}
)
# Fetch service account uid
lomas_admin = kc_admin.clients.get(clientId="lomas_admin")
if len(lomas_admin) == 0:
return
lomas_admin_service_account_uid = kc_admin.clients(lomas_admin[0]["id"]).service_account_user.get()["id"]
for client, roles_list in roles.items():
# Fetch realm management and manage-clients role uids
client_uid = kc_admin.clients.get(clientId=client)[0]["id"]
logger.debug(f"Setting up client {client_uid}")
# Create role config for user and client combination
roles_to_add = []
for role in roles_list:
role_uid = kc_admin.clients(client_uid).roles(role).get()["id"]
roles_to_add.append({"id": role_uid, "name": role})
logger.debug(f"Adding role: {role} (realm {client})")
kc_admin.users(lomas_admin_service_account_uid).role_mappings.clients(client_uid).post(roles_to_add)
logger.info("Created new confidential client.")
[docs]
def create_gateway_client(
kc_admin: KeycloakAdmin, client_id: str, client_secret: str, gateway_hostname: HttpUrl
) -> None:
"""Create a confidential client for the gateway.
This client will handle auth of the admin users to the various dashboards.
Args:
kc_admin (KeycloakAdmin): The KeycloakAdmin instance.
client_id (str): The client id.
client_secret (str): The client secret.
gateway_hostname (HttpUrl): The hostname (url) of the gateway.
"""
# Idempotent creation
match kc_admin.clients.get(clientId=client_id):
case [{"id": client_id, **_rest}]:
getattr(kc_admin.clients, client_id).delete()
kc_admin.clients.post(
{
"clientId": client_id,
"secret": client_secret,
"name": client_id,
"rootUrl": str(gateway_hostname).rstrip("/"),
"clientAuthenticatorType": "client-secret",
"redirectUris": ["/oauth2/callback"],
"webOrigins": ["/*"],
"standardFlowEnabled": True,
"implicitFlowEnabled": False,
"directAccessGrantsEnabled": False,
"serviceAccountsEnabled": False,
"publicClient": False,
"frontchannelLogout": True,
"protocol": "openid-connect",
"attributes": {
"realm_client": "false",
"oidc.ciba.grant.enabled": "false",
"backchannel.logout.session.required": "true",
"frontchannel.logout.session.required": "true",
"display.on.consent.screen": "false",
"oauth2.device.authorization.grant.enabled": "false",
"backchannel.logout.revoke.offline.tokens": "false",
},
"fullScopeAllowed": True,
"protocolMappers": [
{
"name": "aud-mapper",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": False,
"config": {
"included.client.audience": client_id,
"id.token.claim": "true",
"lightweight.claim": "false",
"access.token.claim": "true",
"introspection.token.claim": "true",
},
}
],
}
)
logger.info(f"Created client for lomas gateway: {client_id}.")
[docs]
def misc_realm_cleanup(realm: str, kc_admin: KeycloakAdmin) -> None:
    """Delete every key provider of the realm except ``rsa-generated``.

    As a side effect, ``kc_admin`` is switched to ``realm``.

    Args:
        realm (str): Name of the realm to clean up.
        kc_admin (KeycloakAdmin): A KeycloakAdmin session.
    """
    kc_admin.realm_name = realm
    for kp in kc_admin.components.get(type="org.keycloak.keys.KeyProvider"):
        if kp["name"] == "rsa-generated":
            continue
        kc_admin.components(kp["id"]).delete()
        # Use the module logger: the root logger stays at its default level,
        # so a root-level debug message would never be shown.
        logger.debug(f"Removed bad provider: {kp['name']}")
[docs]
def kc_setup() -> None:
    """Run the complete lomas Keycloak setup.

    Loads the configuration, opens an admin session, then creates the realm,
    the clients and the admin users before cleaning up key providers.
    """
    logging.basicConfig()
    logger.setLevel(logging.DEBUG)

    settings = Config()
    plain_http = not settings.keycloak_use_tls
    if plain_http:
        # Allow the OAuth library to talk to a Keycloak served over http.
        os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    admin = get_admin_session(settings)

    # Step 1: the realm is created while the session still targets the
    # authentication realm; afterwards the session is pointed at the new realm.
    create_realm(settings, admin)
    admin.realm_name = settings.lomas_realm

    # Steps 2 and 3: clients first, then the admin users.
    for setup_step in (create_lomas_clients, create_lomas_admin_users):
        setup_step(settings, admin)

    # Step 4: misc cleanup.
    misc_realm_cleanup(settings.lomas_realm, admin)
# Run the setup only when the module is executed as a script.
if __name__ == "__main__":
    kc_setup()