DataSource
The DataSource class provides a modern, declarative API for configuring and using the ORM. It bundles driver configuration, entity registration, and connection management into a single interface.
Prerequisites
What You’ll Learn
- How
DataSourceOptionsshape runtime behavior - How initialization and default connection selection work
- How to choose between generated helpers and manual datasource composition
Step Outcome
By the end of this page, you should be able to:
- Build a
DataSourcefrom generated options or manual options - Initialize and dispose connections safely
- Route queries to default or named connections intentionally
Minimal Lifecycle (Always)
- Construct
DataSourcewith options. - Call
await ds.init()once. - Run query/repository operations.
- Call
await ds.dispose()on shutdown/tests.
Overview
Future<void> dataSourceOverview() async {
final ds = DataSource(
DataSourceOptions(
driver: SqliteDriverAdapter.file('database.sqlite'),
entities: [UserOrmDefinition.definition, PostOrmDefinition.definition],
),
);
await ds.init();
// Use the ORM
final users = await ds.query<$User>().get();
await ds.dispose();
}
DataSourceOptions
| Option | Type | Default | Description |
|---|---|---|---|
driver | DriverAdapter | required | The database driver adapter |
entities | List<ModelDefinition> | [] | Models to register. Provide this or registry. |
registry | ModelRegistry? | null | Generated registry (includes type aliases). If provided and entities is empty, models are taken from the registry. |
name | String | 'default' | Logical name for the connection |
database | String? | null | Database/catalog identifier for observability |
tablePrefix | String | '' | Prefix applied to unqualified table names (queries, joins, pivots, and schema operations). Schema-qualified names are not prefixed. |
defaultSchema | String? | null | Default schema for ad-hoc queries |
codecs | Map<String, ValueCodec> | {} | Custom value codecs to register |
logging | bool | false | Enable query logging |
logger | contextual.Logger? | null | Optional contextual logger for default query logging (used only when logging is true) |
Example Configuration (with generated registry)
DataSourceOptions createOptions(DriverAdapter driver) {
final options = DataSourceOptions(
driver: driver,
entities: [UserOrmDefinition.definition, PostOrmDefinition.definition],
name: 'primary',
database: 'myapp',
tablePrefix: 'app_',
defaultSchema: 'public',
logging: true,
);
return options;
}
Initialization
Always call init() before using the data source:
Future<void> initializeDataSource(DataSource ds) async {
await ds.init();
// init() is idempotent - calling multiple times is safe
}
The init() method:
- Is idempotent—calling it multiple times has no effect
- Automatically registers the DataSource with
ConnectionManager - Automatically sets it as default if it's the first DataSource initialized
If you use static model helpers (Users.query()), make sure a default datasource has been initialized or explicitly set.
Using Static Model Helpers
Once initialized, the first DataSource automatically becomes the default. This enables using generated model helper classes (like Users / Posts) without threading a DataSource everywhere.
Future<void> staticHelpersExample() async {
final ds = DataSource(
DataSourceOptions(
name: 'myapp',
driver: SqliteDriverAdapter.file('database.sqlite'),
entities: [UserOrmDefinition.definition],
),
);
await ds.init(); // Auto-registers and sets as default
// Static helpers now work automatically!
final users = await Users.query().get();
final post = await Posts.find(1);
}
ConnectionManager + Resolver Binding
Static helpers resolve connections in this order:
- A custom resolver set via
Model.bindConnectionResolver(...). - The default
ConnectionManagerregistration (set byinit()orsetAsDefault()).
If you need complete control (multi-tenant, sharded, per-request routing), bind your own resolver:
Model.bindConnectionResolver(
resolveConnection: (_) => myQueryContext,
connectionManager: ConnectionManager.instance,
defaultConnection: 'primary',
);
To clear static bindings (tests or teardown):
Model.unbindConnectionResolver();
ConnectionManager.instance.clearDefault();
DataSource.clearDefault();
Querying Data
Use query<T>() to create a typed query builder:
Future<void> queryingExamples(DataSource ds) async {
// Simple query
final allUsers = await ds.query<$User>().get();
// With filters
final activeUsers = await ds
.query<$User>()
.whereEquals('active', true)
.orderBy('createdAt', descending: true)
.limit(10)
.get();
// With relations
final posts = await ds.query<$Post>().with_(['author', 'comments']).get();
}
Repository Operations
Use repo<T>() for CRUD operations:
Future<void> repositoryExamples(DataSource ds) async {
final userRepo = ds.repo<$User>();
// Insert
await userRepo.insert(
$User(id: 0, email: 'new@example.com', name: 'New User'),
);
// Insert many
await userRepo.insertMany([
$User(id: 0, email: 'user1@example.com'),
$User(id: 0, email: 'user2@example.com'),
]);
// Update
final user = await userRepo.find(1);
if (user != null) {
user.setAttribute('name', 'Updated');
await userRepo.update(user);
}
// Delete
await userRepo.delete({'id': 1});
}
Transactions
Execute multiple operations atomically:
Future<void> transactionExamples(DataSource ds) async {
await ds.transaction(() async {
final user = await ds.repo<$User>().insert(
$User(id: 0, email: 'alice@example.com', name: 'Alice'),
);
await ds.repo<$Post>().insert(
$Post(id: 0, authorId: user.id, title: 'First Post'),
);
// If any operation fails, all changes are rolled back
});
// Transactions can return values
final result = await ds.transaction(() async {
final user = await ds.repo<$User>().insert(
$User(id: 0, email: 'bob@example.com', name: 'Bob'),
);
return user;
});
}
Use transactions for any sequence where partial success would leave inconsistent state.
Ad-hoc Table Queries
Query tables without a model definition:
Future<void> adhocTableExamples(DataSource ds) async {
final logs = await ds
.table('audit_logs')
.whereEquals('action', 'login')
.orderBy('timestamp', descending: true)
.limit(100)
.get();
for (final log in logs) {
print('${log['user_id']} logged in at ${log['timestamp']}');
}
}
Query Logging & Debugging
- Basic
- Options
Future<void> loggingExamples(DataSource ds) async {
// Enable logging
ds.enableQueryLog(includeParameters: true);
// Execute queries...
await ds.query<$User>().get();
// Review the log
for (final entry in ds.queryLog) {
print('SQL: ${entry.sql}');
print('Bindings: ${entry.bindings}');
print('Duration: ${entry.duration}');
}
// Clear when done
ds.clearQueryLog();
ds.disableQueryLog();
}
DataSource createDataSourceWithLogging(DriverAdapter driver) {
final ds = DataSource(
DataSourceOptions(
driver: driver,
entities: [UserOrmDefinition.definition],
logging: true,
),
);
// Or enable at runtime
ds.enableQueryLog(includeParameters: true);
ds.disableQueryLog();
return ds;
}
Access Query Log
void accessQueryLog(DataSource ds) {
for (final entry in ds.queryLog) {
print('SQL: ${entry.sql}');
print('Bindings: ${entry.bindings}');
print('Duration: ${entry.duration}');
}
ds.clearQueryLog();
}
Contextual Logger
DataSource createDataSourceWithLogger(DriverAdapter driver) {
final logger = contextual.Logger(defaultChannelEnabled: false);
return DataSource(
DataSourceOptions(
driver: driver,
entities: [UserOrmDefinition.definition],
logging: true,
logger: logger,
),
);
}
Pretend Mode
Preview SQL without executing:
Future<void> pretendModeExample(DataSource ds) async {
final statements = await ds.pretend(() async {
await ds.repo<$User>().insert($User(id: 0, email: 'test@example.com'));
});
for (final entry in statements) {
print('Would execute: ${entry.sql}');
}
// No actual database changes occurred
}
Execution Hooks
void executionHooksExample(DataSource ds) {
final unregister = ds.beforeExecuting((statement) {
print('[SQL] ${statement.sqlWithBindings}');
});
// Later, unregister
unregister();
}
Multiple DataSources
Create separate data sources for different databases:
- Notes
- Code
Use multiple data sources when you need separate databases (tenants, analytics, read/write split, etc.). Keep them named and dispose them on shutdown.
- Setup
- Default
- Query
Define and initialize two DataSources (each has its own connection).
final mainDs = DataSource(
DataSourceOptions(
name: 'main',
driver: SqliteDriverAdapter.inMemory(),
entities: [UserOrmDefinition.definition],
),
);
final analyticsDs = DataSource(
DataSourceOptions(
name: 'analytics',
driver: SqliteDriverAdapter.inMemory(),
entities: [PostOrmDefinition.definition],
),
);
await mainDs.init();
await analyticsDs.init();
Optionally set a default connection for static helpers:
// Set main as default
mainDs.setAsDefault();
Query through the DataSource you want (or pass connection: to static helpers).
// Query specific databases
final users = await mainDs.query<$User>().get();
final events = await analyticsDs.query<$Post>().get();
// Use static helpers with connection parameter
final analyticsUsers = await Users.query('analytics').get();
Common Production Pattern
- Keep one primary write datasource.
- Add read/analytics datasource only when needed.
- Name connections explicitly (
primary,analytics, tenant IDs). - Avoid implicit switching in shared utility code.
Lifecycle Management
Future<void> lifecycleExample(DataSource ds) async {
// Check initialization state
if (!ds.isInitialized) {
await ds.init();
}
// Cleanup
await ds.dispose();
// Re-initialize if needed
await ds.init();
}
Disposing a default DataSource unregisters it from ConnectionManager and clears static helper bindings, so subsequent static calls require a new default to be set.
Access Underlying Components
void accessUnderlyingComponents(DataSource ds) {
final conn = ds.connection; // ORM connection
final ctx = ds.context; // Query context
final registry = ds.registry; // Model registry
final codecs = ds.codecRegistry; // Codec registry
}
Custom Codecs
class JsonCodec extends ValueCodec<Map<String, dynamic>> {
Object? encode(Map<String, dynamic>? value) =>
value != null ? jsonEncode(value) : null;
Map<String, dynamic>? decode(Object? value) =>
value is String ? jsonDecode(value) : null;
}
DataSource createDataSourceWithCodecs(DriverAdapter driver) {
final ds = DataSource(
DataSourceOptions(
driver: driver,
entities: [UserOrmDefinition.definition],
codecs: {'JsonMap': JsonCodec()},
),
);
return ds;
}