Installation
Install the SDK using pip or your preferred package manager:
pip install trusec
Requirements
- Python 3.8 or later
- httpx for HTTP requests (installed automatically)
Quick Start
Copy
from trusec import TruSec
import os
# Initialize the client
trusec = TruSec(
api_key=os.environ.get("TRUSEC_SECRET_KEY")
)
# Use the SDK
user = trusec.users.get(id="user_123")
print(user.email)
Configuration
Copy
from trusec import TruSec
trusec = TruSec(
# Required
api_key=os.environ.get("TRUSEC_SECRET_KEY"),
# Optional
base_url="https://api.trusec.io", # Custom API URL
timeout=30.0, # Request timeout (seconds)
retries=3, # Retry attempts
debug=False, # Enable debug logging
)
Async Support
The SDK supports both sync and async usage:
from trusec import AsyncTruSec
import asyncio
async def main():
trusec = AsyncTruSec(
api_key=os.environ.get("TRUSEC_SECRET_KEY")
)
user = await trusec.users.get(id="user_123")
print(user.email)
await trusec.close()
asyncio.run(main())
Copy
async with AsyncTruSec(api_key="...") as trusec:
user = await trusec.users.get(id="user_123")
Usage Examples
Authentication
Copy
# Create a session
session = trusec.sessions.create(
email="user@example.com",
password="secure-password"
)
print(f"Session token: {session.token}")
# Verify a session
verification = trusec.sessions.verify(token=session.token)
if verification.valid:
print(f"User: {verification.user}")
# Revoke a session
trusec.sessions.revoke(token=session.token)
User Management
Copy
# List users
users = trusec.users.list(limit=20, role="role_admin")
for user in users.data:
print(user.email)
# Pagination
while users.pagination.has_more:
users = trusec.users.list(cursor=users.pagination.next_cursor)
for user in users.data:
print(user.email)
# Create a user
new_user = trusec.users.create(
email="new@example.com",
name="New User",
roles=["role_user"]
)
# Update a user
trusec.users.update(
id=new_user.id,
name="Updated Name"
)
# Delete a user
trusec.users.delete(id=new_user.id)
Policies
Copy
# Create a policy
policy = trusec.policies.create(
name="read-own-profile",
effect="allow",
principals=["user:${self}"],
actions=["read"],
resources=["users/${self}"]
)
# Check permissions
check = trusec.permissions.check(
principal="user:user_123",
action="read",
resource="users/user_123"
)
print(f"Allowed: {check.allowed}")
# Simulate policy evaluation
simulation = trusec.policies.simulate(
principal="user:user_123",
action="delete",
resource="documents/confidential/report.pdf",
context={
"ip": "192.168.1.100"
}
)
print(f"Would be allowed: {simulation.allowed}")
print(f"Matched policies: {simulation.matched_policies}")
Audit Logs
Copy
from datetime import datetime, timedelta
# Query logs
logs = trusec.logs.list(
events=["user.login", "user.logout"],
from_date=(datetime.now() - timedelta(days=7)).isoformat(),
limit=100
)
for log in logs.data:
print(f"{log.timestamp}: {log.event} by {log.actor.email}")
# Search logs
results = trusec.logs.search(
query="password reset failed",
from_date="2024-01-01T00:00:00Z"
)
for log in results.data:
print(f"Found: {log.event}")
# Export logs
export = trusec.logs.export(
from_date="2024-01-01T00:00:00Z",
to_date="2024-01-31T23:59:59Z",
format="csv"
)
print(f"Export ID: {export.export_id}")
Error Handling
The SDK raises typed exceptions for different failure scenarios:
from trusec import TruSec
from trusec.exceptions import (
TruSecError,
AuthenticationError,
RateLimitError,
NotFoundError,
ValidationError
)
try:
user = trusec.users.get(id="invalid_id")
except AuthenticationError:
print("Invalid API key")
except RateLimitError as e:
print(f"Rate limited. Retry after: {e.retry_after}")
except NotFoundError:
print("User not found")
except ValidationError as e:
print(f"Validation error: {e.details}")
except TruSecError as e:
print(f"API error: {e.code} - {e.message}")
Type Hints
The SDK includes comprehensive type hints for better IDE support:
from trusec import TruSec
from trusec.types import User, Session, Policy
trusec = TruSec(api_key="...")
# Types are inferred automatically
user: User = trusec.users.get(id="user_123")
print(user.email) # IDE provides autocomplete
# Custom metadata types
from typing import TypedDict
class CustomMetadata(TypedDict):
department: str
employee_id: int
user = trusec.users.get(id="user_123")
metadata: CustomMetadata = user.metadata
print(metadata["department"])
Testing
Use the mock client for unit tests:
from trusec.testing import MockTruSec
from unittest.mock import MagicMock
def test_authenticate_user():
# Setup mock
mock_trusec = MockTruSec()
mock_trusec.sessions.verify.return_value = MagicMock(
valid=True,
user=MagicMock(id="user_123", email="user@example.com")
)
# Run test
my_service = MyService(mock_trusec)
result = my_service.authenticate_user("token")
assert result.authenticated is True
mock_trusec.sessions.verify.assert_called_once_with(token="token")
Copy
import pytest
from trusec.testing import mock_trusec
@pytest.fixture
def trusec_client():
return mock_trusec()
def test_user_creation(trusec_client):
trusec_client.users.create.return_value = MagicMock(
id="user_new",
email="new@example.com"
)
user = trusec_client.users.create(
email="new@example.com",
name="New User"
)
assert user.id == "user_new"
Framework Integrations
FastAPI
Copy
from fastapi import FastAPI, Depends, HTTPException, Header
from trusec import AsyncTruSec
app = FastAPI()
trusec = AsyncTruSec(api_key=os.environ["TRUSEC_SECRET_KEY"])
async def get_current_user(authorization: str = Header(...)):
token = authorization.replace("Bearer ", "")
session = await trusec.sessions.verify(token=token)
if not session.valid:
raise HTTPException(status_code=401, detail="Invalid token")
return session.user
@app.get("/profile")
async def get_profile(user = Depends(get_current_user)):
return {"user": user}
Django
Copy
# middleware.py
from django.http import JsonResponse
from trusec import TruSec
trusec = TruSec(api_key=settings.TRUSEC_SECRET_KEY)
class TruSecAuthMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if token:
session = trusec.sessions.verify(token=token)
if session.valid:
request.trusec_user = session.user
return self.get_response(request)
# views.py
from django.http import JsonResponse
def profile_view(request):
if not hasattr(request, "trusec_user"):
return JsonResponse({"error": "Unauthorized"}, status=401)
return JsonResponse({"user": request.trusec_user.__dict__})
Flask
Copy
from flask import Flask, request, g, jsonify
from functools import wraps
from trusec import TruSec
app = Flask(__name__)
trusec = TruSec(api_key=os.environ["TRUSEC_SECRET_KEY"])
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
return jsonify({"error": "Missing token"}), 401
session = trusec.sessions.verify(token=token)
if not session.valid:
return jsonify({"error": "Invalid token"}), 401
g.user = session.user
return f(*args, **kwargs)
return decorated
@app.route("/profile")
@require_auth
def profile():
return jsonify({"user": g.user.__dict__})
Logging
Enable debug logging to see request/response details:
import logging
# Enable TruSec debug logging
logging.getLogger("trusec").setLevel(logging.DEBUG)
# Or configure when initializing
trusec = TruSec(
api_key="...",
debug=True # Enables debug logging
)