DataSource

The DataSource class provides a modern, declarative API for configuring and using the ORM. It bundles driver configuration, entity registration, and connection management into a single interface.

What You’ll Learn

  • How DataSourceOptions shape runtime behavior
  • How initialization and default connection selection work
  • How to choose between generated helpers and manual datasource composition

Step Outcome

By the end of this page, you should be able to:

  • Build a DataSource from generated options or manual options
  • Initialize and dispose connections safely
  • Route queries to default or named connections intentionally

Minimal Lifecycle (Always)

  1. Construct DataSource with options.
  2. Call await ds.init() once.
  3. Run query/repository operations.
  4. Call await ds.dispose() on shutdown and in test teardown.
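
The four steps above can be wrapped so that disposal still happens when a query throws. The following is a minimal sketch, not part of the ORM: runWithLifecycle is a hypothetical helper that receives the init and dispose steps as callbacks, so it assumes nothing about DataSource beyond the method names shown on this page.

```dart
/// Hypothetical helper (not part of the ORM): runs [body] between an
/// init step and a dispose step, disposing even when [body] throws.
Future<T> runWithLifecycle<T>({
  required Future<void> Function() init,
  required Future<void> Function() dispose,
  required Future<T> Function() body,
}) async {
  await init(); // step 2: initialize once
  try {
    return await body(); // step 3: query/repository work
  } finally {
    await dispose(); // step 4: always release the connection
  }
}
```

With a constructed DataSource ds, this could be called as runWithLifecycle(init: ds.init, dispose: ds.dispose, body: () => ds.query<$User>().get()), assuming both methods can be called without arguments as they are elsewhere on this page.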

Overview

Future<void> dataSourceOverview() async {
  final ds = DataSource(
    DataSourceOptions(
      driver: SqliteDriverAdapter.file('database.sqlite'),
      entities: [UserOrmDefinition.definition, PostOrmDefinition.definition],
    ),
  );

  await ds.init();

  // Use the ORM
  final users = await ds.query<$User>().get();

  await ds.dispose();
}

DataSourceOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| driver | DriverAdapter | required | The database driver adapter |
| entities | List<ModelDefinition> | [] | Models to register. Provide this or registry. |
| registry | ModelRegistry? | null | Generated registry (includes type aliases). If provided and entities is empty, models are taken from the registry. |
| name | String | 'default' | Logical name for the connection |
| database | String? | null | Database/catalog identifier for observability |
| tablePrefix | String | '' | Prefix applied to unqualified table names (queries, joins, pivots, and schema operations). Schema-qualified names are not prefixed. |
| defaultSchema | String? | null | Default schema for ad-hoc queries |
| codecs | Map<String, ValueCodec> | {} | Custom value codecs to register |
| logging | bool | false | Enable query logging |
| logger | contextual.Logger? | null | Optional contextual logger for default query logging (used only when logging is true) |
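
The tablePrefix rule is easier to see with concrete names. The function below is an illustrative sketch of the documented behavior only, not the ORM's implementation; applyTablePrefix is a hypothetical name, and treating a dot as the schema qualifier is an assumption.

```dart
/// Illustrative only: mirrors the documented tablePrefix rule, not the
/// ORM's actual implementation.
String applyTablePrefix(String table, String prefix) {
  // Assumption: a dot marks a schema-qualified name such as "public.users".
  final isSchemaQualified = table.contains('.');
  if (prefix.isEmpty || isSchemaQualified) return table;
  return '$prefix$table';
}
```

Under these assumptions, a prefix of 'app_' turns users into app_users and leaves public.users unchanged.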

Example Configuration (with explicit entities)

DataSourceOptions createOptions(DriverAdapter driver) {
  final options = DataSourceOptions(
    driver: driver,
    entities: [UserOrmDefinition.definition, PostOrmDefinition.definition],
    name: 'primary',
    database: 'myapp',
    tablePrefix: 'app_',
    defaultSchema: 'public',
    logging: true,
  );
  return options;
}

Initialization

Always call init() before using the data source:

Future<void> initializeDataSource(DataSource ds) async {
  await ds.init();
  // init() is idempotent - calling multiple times is safe
}

The init() method:

  • Is idempotent: calls after the first have no further effect
  • Automatically registers the DataSource with ConnectionManager
  • Automatically sets it as default if it's the first DataSource initialized

If you use static model helpers (Users.query()), make sure a default datasource has been initialized or explicitly set.
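
The init() behavior listed above can be pictured with a small toy model. This is a sketch for intuition only: ToyManager and ToySource are hypothetical stand-ins, not the ORM's ConnectionManager or DataSource, and the real classes may track state differently.

```dart
/// Toy model (hypothetical, not the ORM's classes) of the documented init()
/// behavior: repeat calls are no-ops and the first initialized source
/// becomes the default.
class ToyManager {
  final Map<String, ToySource> registered = {};
  String? defaultName;
}

class ToySource {
  ToySource(this.name, this.manager);

  final String name;
  final ToyManager manager;
  bool isInitialized = false;

  Future<void> init() async {
    if (isInitialized) return; // idempotent: later calls do nothing
    isInitialized = true;
    manager.registered[name] = this; // auto-register with the manager
    manager.defaultName ??= name; // only the first source claims the default
  }
}
```

In this model, initializing a source named 'primary' and then one named 'analytics' leaves 'primary' as the default, no matter how often either init() is repeated.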

Using Static Model Helpers

Once initialized, the first DataSource automatically becomes the default. This enables using generated model helper classes (like Users / Posts) without threading a DataSource everywhere.

Future<void> staticHelpersExample() async {
  final ds = DataSource(
    DataSourceOptions(
      name: 'myapp',
      driver: SqliteDriverAdapter.file('database.sqlite'),
      // Register every model that the static helpers below will touch.
      entities: [UserOrmDefinition.definition, PostOrmDefinition.definition],
    ),
  );

  await ds.init(); // Auto-registers and sets as default

  // Static helpers now work automatically!
  final users = await Users.query().get();
  final post = await Posts.find(1);
}

ConnectionManager + Resolver Binding

Static helpers resolve connections in this order:

  1. A custom resolver set via Model.bindConnectionResolver(...).
  2. The default ConnectionManager registration (set by init() or setAsDefault()).
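
The lookup order above amounts to a two-step fallback. The sketch below illustrates that order with hypothetical names; it does not reproduce the ORM's internals, and throwing a StateError when nothing is available is an assumption about the failure mode.

```dart
/// Illustrative sketch of the documented lookup order; the names here are
/// hypothetical and do not correspond to ORM classes.
T resolveConnection<T extends Object>({
  T Function()? customResolver,
  T? defaultConnection,
}) {
  // 1. A bound custom resolver takes precedence.
  if (customResolver != null) return customResolver();
  // 2. Otherwise fall back to the registered default.
  if (defaultConnection != null) return defaultConnection;
  // Assumption: with neither available, the lookup fails loudly.
  throw StateError('No connection resolver bound and no default set');
}
```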

If you need complete control (multi-tenant, sharded, per-request routing), bind your own resolver:

Model.bindConnectionResolver(
  resolveConnection: (_) => myQueryContext,
  connectionManager: ConnectionManager.instance,
  defaultConnection: 'primary',
);

To clear static bindings (tests or teardown):

Model.unbindConnectionResolver();
ConnectionManager.instance.clearDefault();
DataSource.clearDefault();

Querying Data

Use query<T>() to create a typed query builder:

Future<void> queryingExamples(DataSource ds) async {
  // Simple query
  final allUsers = await ds.query<$User>().get();

  // With filters
  final activeUsers = await ds
      .query<$User>()
      .whereEquals('active', true)
      .orderBy('createdAt', descending: true)
      .limit(10)
      .get();

  // With relations
  final posts = await ds.query<$Post>().with_(['author', 'comments']).get();
}

Repository Operations

Use repo<T>() for CRUD operations:

Future<void> repositoryExamples(DataSource ds) async {
  final userRepo = ds.repo<$User>();

  // Insert
  await userRepo.insert(
    $User(id: 0, email: 'new@example.com', name: 'New User'),
  );

  // Insert many
  await userRepo.insertMany([
    $User(id: 0, email: 'user1@example.com'),
    $User(id: 0, email: 'user2@example.com'),
  ]);

  // Update
  final user = await userRepo.find(1);
  if (user != null) {
    user.setAttribute('name', 'Updated');
    await userRepo.update(user);
  }

  // Delete
  await userRepo.delete({'id': 1});
}

Transactions

Execute multiple operations atomically:

Future<void> transactionExamples(DataSource ds) async {
  await ds.transaction(() async {
    final user = await ds.repo<$User>().insert(
      $User(id: 0, email: 'alice@example.com', name: 'Alice'),
    );

    await ds.repo<$Post>().insert(
      $Post(id: 0, authorId: user.id, title: 'First Post'),
    );

    // If any operation fails, all changes are rolled back
  });

  // Transactions can return values
  final result = await ds.transaction(() async {
    final user = await ds.repo<$User>().insert(
      $User(id: 0, email: 'bob@example.com', name: 'Bob'),
    );
    return user;
  });
}

Use transactions for any sequence where partial success would leave inconsistent state.

Ad-hoc Table Queries

Query tables without a model definition:

Future<void> adhocTableExamples(DataSource ds) async {
  final logs = await ds
      .table('audit_logs')
      .whereEquals('action', 'login')
      .orderBy('timestamp', descending: true)
      .limit(100)
      .get();

  for (final log in logs) {
    print('${log['user_id']} logged in at ${log['timestamp']}');
  }
}

Query Logging & Debugging

Future<void> loggingExamples(DataSource ds) async {
  // Enable logging
  ds.enableQueryLog(includeParameters: true);

  // Execute queries...
  await ds.query<$User>().get();

  // Review the log
  for (final entry in ds.queryLog) {
    print('SQL: ${entry.sql}');
    print('Bindings: ${entry.bindings}');
    print('Duration: ${entry.duration}');
  }

  // Clear when done
  ds.clearQueryLog();
  ds.disableQueryLog();
}

Access Query Log

void accessQueryLog(DataSource ds) {
  for (final entry in ds.queryLog) {
    print('SQL: ${entry.sql}');
    print('Bindings: ${entry.bindings}');
    print('Duration: ${entry.duration}');
  }

  ds.clearQueryLog();
}

Contextual Logger

DataSource createDataSourceWithLogger(DriverAdapter driver) {
  final logger = contextual.Logger(defaultChannelEnabled: false);
  return DataSource(
    DataSourceOptions(
      driver: driver,
      entities: [UserOrmDefinition.definition],
      logging: true,
      logger: logger,
    ),
  );
}

Pretend Mode

Preview SQL without executing:

Future<void> pretendModeExample(DataSource ds) async {
  final statements = await ds.pretend(() async {
    await ds.repo<$User>().insert($User(id: 0, email: 'test@example.com'));
  });

  for (final entry in statements) {
    print('Would execute: ${entry.sql}');
  }
  // No actual database changes occurred
}

Execution Hooks

void executionHooksExample(DataSource ds) {
  final unregister = ds.beforeExecuting((statement) {
    print('[SQL] ${statement.sqlWithBindings}');
  });

  // Later, unregister
  unregister();
}

Multiple DataSources

Use multiple data sources when you need separate databases (tenants, analytics, read/write split, etc.). Give each one an explicit name and dispose all of them on shutdown.

Common Production Pattern

  • Keep one primary write datasource.
  • Add read/analytics datasource only when needed.
  • Name connections explicitly (primary, analytics, tenant IDs).
  • Avoid implicit switching in shared utility code.
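
One way to keep connection choice explicit is a small lookup that only hands out sources by name and never falls back silently. The class below is a hypothetical helper written for this page, not an ORM API; it is generic so it could hold DataSource instances or anything else.

```dart
/// Hypothetical helper (not part of the ORM): holds sources by explicit
/// name and refuses to guess when a name is unknown.
class NamedSources<T extends Object> {
  final Map<String, T> _byName = {};

  /// Registers [source] under [name]; duplicate names are rejected.
  void register(String name, T source) {
    if (_byName.containsKey(name)) {
      throw ArgumentError('A source named "$name" is already registered');
    }
    _byName[name] = source;
  }

  /// Returns the source registered under [name], or throws instead of
  /// silently falling back to another connection.
  T require(String name) {
    final source = _byName[name];
    if (source == null) {
      throw StateError('No source named "$name"');
    }
    return source;
  }
}
```

Shared utility code would then take a connection name (or the resolved source) as a parameter, for example sources.require('analytics'), rather than relying on whichever default happens to be set.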

Lifecycle Management

Future<void> lifecycleExample(DataSource ds) async {
  // Check initialization state
  if (!ds.isInitialized) {
    await ds.init();
  }

  // Cleanup
  await ds.dispose();

  // Re-initialize if needed
  await ds.init();
}

Disposing a default DataSource unregisters it from ConnectionManager and clears static helper bindings, so subsequent static calls require a new default to be set.

Access Underlying Components

void accessUnderlyingComponents(DataSource ds) {
  final conn = ds.connection; // ORM connection
  final ctx = ds.context; // Query context
  final registry = ds.registry; // Model registry
  final codecs = ds.codecRegistry; // Codec registry
}

Custom Codecs

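import 'dart:convert'; // jsonEncode / jsonDecode

class JsonCodec extends ValueCodec<Map<String, dynamic>> {
  // Store the map as a JSON string; null stays null.
  @override
  Object? encode(Map<String, dynamic>? value) =>
      value != null ? jsonEncode(value) : null;

  // Only string values are decoded; anything else becomes null.
  @override
  Map<String, dynamic>? decode(Object? value) =>
      value is String ? jsonDecode(value) as Map<String, dynamic> : null;
}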

DataSource createDataSourceWithCodecs(DriverAdapter driver) {
  final ds = DataSource(
    DataSourceOptions(
      driver: driver,
      entities: [UserOrmDefinition.definition],
      codecs: {'JsonMap': JsonCodec()},
    ),
  );
  return ds;
}
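
To see what the codec above does to a value, its encode and decode logic can be tried outside the ORM. The two functions below are stand-alone copies for illustration (encodeJsonMap and decodeJsonMap are hypothetical names); they use only dart:convert and assume the stored JSON is an object rather than a list.

```dart
import 'dart:convert';

/// Stand-alone copy of the encode step: a map becomes a JSON string.
Object? encodeJsonMap(Map<String, dynamic>? value) =>
    value != null ? jsonEncode(value) : null;

/// Stand-alone copy of the decode step: only strings are parsed, any other
/// stored value yields null. Assumes the JSON text is an object.
Map<String, dynamic>? decodeJsonMap(Object? value) =>
    value is String ? jsonDecode(value) as Map<String, dynamic> : null;
```

Encoding {'theme': 'dark', 'tabs': 2} yields the string {"theme":"dark","tabs":2}, and decoding that string returns an equivalent map. A JSON array in the column would fail the cast, so a production codec may want to handle that case explicitly.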
