appwrite-python
Appwrite Python SDK
Installation
pip install appwrite
Setting Up the Client
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tables_db import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider
import os
client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project(os.environ['APPWRITE_PROJECT_ID'])
.set_key(os.environ['APPWRITE_API_KEY']))
Code Examples
User Management
users = Users(client)
# Create user
user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name')
# List users
result = users.list([Query.limit(25)])
# Get user
fetched = users.get('[USER_ID]')
# Delete user
users.delete('[USER_ID]')
Database Operations
Note: Use
TablesDB(not the deprecatedDatabasesclass) for all new code. Only useDatabasesif the existing codebase already relies on it or the user explicitly requests it.Tip: Prefer keyword arguments (e.g.,
database_id='...') over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.
tables_db = TablesDB(client)
# Create database
db = tables_db.create(ID.unique(), 'My Database')
# Create row
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
})
# Query rows
results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [
Query.equal('title', 'Hello World'),
Query.limit(10)
])
# Get row
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
# Update row
tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', {
'title': 'Updated'
})
# Delete row
tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
String Column Types
Note: The legacy
stringtype is deprecated. Use explicit column types for all new columns.
| Type | Max characters | Indexing | Storage |
|---|---|---|---|
varchar |
16,383 | Full index (if size ≤ 768) | Inline in row |
text |
16,383 | Prefix only | Off-page |
mediumtext |
4,194,303 | Prefix only | Off-page |
longtext |
1,073,741,823 | Prefix only | Off-page |
varcharis stored inline and counts towards the 64 KB row size limit. Prefer for short, indexed fields like names, slugs, or identifiers.text,mediumtext, andlongtextare stored off-page (only a 20-byte pointer lives in the row), so they don't consume the row size budget.sizeis not required for these types.
# Create table with explicit string column types
tables_db.create_table(
database_id='[DATABASE_ID]',
table_id=ID.unique(),
name='articles',
columns=[
{'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable
{'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only
{'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars
{'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars
]
)
Query Methods
# Filtering
Query.equal('field', 'value') # == (or pass list for IN)
Query.not_equal('field', 'value') # !=
Query.less_than('field', 100) # <
Query.less_than_equal('field', 100) # <=
Query.greater_than('field', 100) # >
Query.greater_than_equal('field', 100) # >=
Query.between('field', 1, 100) # 1 <= field <= 100
Query.is_null('field') # is null
Query.is_not_null('field') # is not null
Query.starts_with('field', 'prefix') # starts with
Query.ends_with('field', 'suffix') # ends with
Query.contains('field', 'sub') # contains (string or array)
Query.search('field', 'keywords') # full-text search (requires index)
# Sorting
Query.order_asc('field')
Query.order_desc('field')
# Pagination
Query.limit(25) # max rows (default 25, max 100)
Query.offset(0) # skip N rows
Query.cursor_after('[ROW_ID]') # cursor pagination (preferred)
Query.cursor_before('[ROW_ID]')
# Selection & Logic
Query.select(['field1', 'field2']) # return only specified fields
Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR
Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
File Storage
from appwrite.input_file import InputFile
storage = Storage(client)
# Upload file
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'))
# List files
files = storage.list_files('[BUCKET_ID]')
# Delete file
storage.delete_file('[BUCKET_ID]', '[FILE_ID]')
InputFile Factory Methods
from appwrite.input_file import InputFile
InputFile.from_path('/path/to/file.png') # from filesystem path
InputFile.from_bytes(byte_data, 'file.png') # from bytes
InputFile.from_string('Hello world', 'hello.txt') # from string content
Teams
from appwrite.services.teams import Teams
teams = Teams(client)
# Create team
team = teams.create(ID.unique(), 'Engineering')
# List teams
team_list = teams.list()
# Create membership (invite user by email)
membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com')
# List memberships
members = teams.list_memberships('[TEAM_ID]')
# Update membership roles
teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin'])
# Delete team
teams.delete('[TEAM_ID]')
Role-based access: Use
Role.team('[TEAM_ID]')for all team members orRole.team('[TEAM_ID]', 'editor')for a specific team role when setting permissions.
Serverless Functions
functions = Functions(client)
# Execute function
execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}')
# List executions
executions = functions.list_executions('[FUNCTION_ID]')
Writing a Function Handler (Python runtime)
# src/main.py — Appwrite Function entry point
def main(context):
# context.req — request object
# .body — raw request body (string)
# .body_json — parsed JSON body (dict, or None if not JSON)
# .headers — request headers (dict)
# .method — HTTP method (GET, POST, etc.)
# .path — URL path
# .query — parsed query parameters (dict)
# .query_string — raw query string
context.log('Processing: ' + context.req.method + ' ' + context.req.path)
if context.req.method == 'GET':
return context.res.json({'message': 'Hello from Appwrite Function!'})
data = context.req.body_json or {}
if 'name' not in data:
context.error('Missing name field')
return context.res.json({'error': 'Name is required'}, 400)
# Response methods
return context.res.json({'success': True}) # JSON response
# return context.res.text('Hello') # plain text
# return context.res.empty() # 204 No Content
# return context.res.redirect('https://example.com') # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'}) # custom response
Server-Side Rendering (SSR) Authentication
SSR apps (Flask, Django, FastAPI, etc.) use the server SDK to handle auth. You need two clients:
- Admin client — uses an API key, creates sessions, bypasses rate limits (reusable singleton)
- Session client — uses a session cookie, acts on behalf of a user (create per-request, never share)
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirect
# Admin client (reusable)
admin_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_key(os.environ['APPWRITE_API_KEY']))
# Session client (create per-request)
session_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]'))
session = request.cookies.get('a_session_[PROJECT_ID]')
if session:
session_client.set_session(session)
Email/Password Login
@app.post('/login')
def login():
account = Account(admin_client)
session = account.create_email_password_session(
request.json['email'], request.json['password']
)
# Cookie name must be a_session_<PROJECT_ID>
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return resp
Authenticated Requests
@app.get('/user')
def get_user():
session = request.cookies.get('a_session_[PROJECT_ID]')
if not session:
return jsonify({'error': 'Unauthorized'}), 401
session_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_session(session))
account = Account(session_client)
return jsonify(account.get())
OAuth2 SSR Flow
# Step 1: Redirect to OAuth provider
@app.get('/oauth')
def oauth():
account = Account(admin_client)
redirect_url = account.create_o_auth2_token(
OAuthProvider.Github,
'https://example.com/oauth/success',
'https://example.com/oauth/failure',
)
return redirect(redirect_url)
# Step 2: Handle callback — exchange token for session
@app.get('/oauth/success')
def oauth_success():
account = Account(admin_client)
session = account.create_session(request.args['userId'], request.args['secret'])
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return resp
Cookie security: Always use
httponly,secure, andsamesite='Strict'to prevent XSS. The cookie name must bea_session_<PROJECT_ID>.
Forwarding user agent: Call
session_client.set_forwarded_user_agent(request.headers.get('user-agent'))to record the end-user's browser info for debugging and security.
Error Handling
from appwrite.exception import AppwriteException
try:
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
except AppwriteException as e:
print(e.message) # human-readable error message
print(e.code) # HTTP status code (int)
print(e.type) # Appwrite error type string (e.g. 'document_not_found')
print(e.response) # full response body (dict)
Common error codes:
| Code | Meaning |
|---|---|
401 |
Unauthorized — missing or invalid session/API key |
403 |
Forbidden — insufficient permissions for this action |
404 |
Not found — resource does not exist |
409 |
Conflict — duplicate ID or unique constraint violation |
429 |
Rate limited — too many requests, retry after backoff |
Permissions & Roles (Critical)
Appwrite uses permission strings to control access to resources. Each permission pairs an action (read, update, delete, create, or write which grants create + update + delete) with a role target. By default, no user has access unless permissions are explicitly set at the document/file level or inherited from the collection/bucket settings. Permissions are arrays of strings built with the Permission and Role helpers.
from appwrite.permission import Permission
from appwrite.role import Role
Database Row with Permissions
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
}, [
Permission.read(Role.user('[USER_ID]')), # specific user can read
Permission.update(Role.user('[USER_ID]')), # specific user can update
Permission.read(Role.team('[TEAM_ID]')), # all team members can read
Permission.read(Role.any()), # anyone (including guests) can read
])
File Upload with Permissions
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [
Permission.read(Role.any()),
Permission.update(Role.user('[USER_ID]')),
Permission.delete(Role.user('[USER_ID]')),
])
When to set permissions: Set document/file-level permissions when you need per-resource access control. If all documents in a collection share the same rules, configure permissions at the collection/bucket level and leave document permissions empty.
Common mistakes:
- Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
Role.any()withwrite/update/delete— allows any user, including unauthenticated guests, to modify or remove the resourcePermission.read(Role.any())on sensitive data — makes the resource publicly readable