LogoSkills

clickhouse Reference

Detailed API reference for the serverpod-clickhouse package.

ClickHouse API Reference#

Detailed API reference for the serverpod-clickhouse package.

ClickHouseService#

Initialization#

// Initialize in Serverpod (using passwords.yaml)
await ClickHouseService.initialize(pod);

// Manual initialization (for testing)
await ClickHouseService.initializeWithConfig(
  ClickHouseConfig.cloud(
    host: 'xxx.clickhouse.cloud',
    database: 'analytics',
    username: 'default',
    password: 'xxx',
  ),
);

Instance Access#

final service = ClickHouseService.instance;

// Available components
service.client    // ClickHouseClient - HTTP client
service.tracker   // EventTracker - Event tracking
service.analytics // AnalyticsQueryBuilder - Analytics queries
service.schema    // SchemaManager - Schema management
service.sync      // SyncUtility - Sync utilities

AnalyticsQueryBuilder#

Basic Metrics#

dau() - Daily Active Users

Future<ClickHouseResult> dau({
  DateTime? date,  // Reference date (default: today)
  int days = 30,   // Query period
})

// Usage
final result = await analytics.dau(days: 30);
// Result: [{date: '2025-01-01', dau: 150}, ...]

wau() - Weekly Active Users

Future<ClickHouseResult> wau({int weeks = 12})

// Result: [{week: '2025-01-06', wau: 850}, ...]

mau() - Monthly Active Users

Future<ClickHouseResult> mau({int months = 12})

// Result: [{month: '2025-01-01', mau: 3200}, ...]

Funnel Analysis#

funnel() - Step-by-Step Conversion Analysis

Future<FunnelResult> funnel({
  required List<String> steps,  // Event step list
  int days = 7,                 // Analysis period
  Duration? windowDuration,     // Conversion time window (default: 1 day)
})

// Usage
final result = await analytics.funnel(
  steps: ['sign_up_started', 'email_entered', 'sign_up_completed'],
  days: 7,
);

// Result
print(result.overallConversionRate);  // Overall conversion rate
print(result.stepResults[0].users);    // Step 1 user count
print(result.stepResults[0].dropoffRate); // Drop-off rate

Retention Analysis#

cohortRetention() - Cohort Retention

Future<ClickHouseResult> cohortRetention({
  required String cohortEvent,   // Cohort defining event
  String returnEvent = 'app_opened',  // Return event
  int weeks = 8,                 // Analysis period (weeks)
})

// Result: [{cohort_week, week_number, users}, ...]

nDayRetention() - N-Day Retention

Future<ClickHouseResult> nDayRetention({
  required String cohortEvent,
  String returnEvent = 'app_opened',
  List<int> days = const [1, 7, 30],  // D1, D7, D30
  int lookbackDays = 60,
})

// Result: [{cohort_date, cohort_size, day_1_retained, day_7_retained, day_30_retained}, ...]

Revenue Analysis#

dailyRevenue() - Daily Revenue

Future<ClickHouseResult> dailyRevenue({
  String revenueTable = 'orders',
  int days = 30,
})

// Result: [{date, revenue, order_count, unique_customers}, ...]

arpu() - Average Revenue Per User

Future<ClickHouseResult> arpu({
  String revenueTable = 'orders',
  int months = 6,
})

// Result: [{month, arpu, total_revenue, paying_users}, ...]

Path Analysis#

Future<ClickHouseResult> navigationPaths({
  int days = 7,
  int minCount = 10,      // Minimum transition count
  String? flowName,       // Specific flow filter
})

// Result: [{from_screen, to_screen, transitions, unique_users}, ...]

dropOffPoints() - Drop-off Points

Future<ClickHouseResult> dropOffPoints({
  String? flowName,
  int days = 7,
  int limit = 20,
})

// Result: [{flow_name, abandoned_at, step_index, abandon_count}, ...]

userJourney() - User Journey

Future<ClickHouseResult> userJourney({
  required String userId,
  int days = 7,
  String? sessionId,
})

// Result: [{session_id, timestamp, event_name, from_screen, to_screen}, ...]

Custom Queries#

Future<ClickHouseResult> custom(
  String sql, {
  Map<String, dynamic>? params,
})

// Usage
final result = await analytics.custom('''
  SELECT
    toDate(created_at) as date,
    count() as count
  FROM kobic_analytics.payment_book_order
  WHERE created_at >= now() - INTERVAL {days} DAY
  GROUP BY date
''', params: {'days': 30});

EventTracker#

Event Sending#

Future<void> track(
  String eventName, {
  String? userId,
  String? sessionId,
  Map<String, dynamic>? properties,
  Map<String, dynamic>? context,
})

// Usage
await tracker.track(
  'button_click',
  userId: 'user_123',
  sessionId: 'session_456',
  properties: {
    'button': 'purchase',
    'book_id': '789',
    'price': 15000,
  },
  context: {
    'device': 'mobile',
    'os': 'iOS',
    'app_version': '1.0.0',
  },
);

Batch Sending#

// Immediately send buffered events
await tracker.flush();

// On server shutdown (send all events then cleanup)
await tracker.shutdown();

Configuration#

EventTrackerConfig(
  batchSize: 100,           // Batch size
  flushInterval: Duration(seconds: 30),  // Auto-flush interval
  maxRetries: 3,            // Retry count
)

ClickHouseClient#

Direct Queries#

Future<ClickHouseResult> query(
  String sql, {
  Map<String, dynamic>? params,
})

// Usage
final result = await client.query(
  'SELECT * FROM events WHERE user_id = {user_id}',
  params: {'user_id': 'user_123'},
);

Connection Test#

Future<bool> ping()

// Usage
final connected = await client.ping();

Batch Insert#

Future<void> insert(
  String table,
  List<Map<String, dynamic>> rows,
)

// Usage
await client.insert('events', [
  {'event_name': 'click', 'user_id': 'u1', 'timestamp': DateTime.now()},
  {'event_name': 'view', 'user_id': 'u2', 'timestamp': DateTime.now()},
]);

ClickHouseResult#

Properties#

class ClickHouseResult {
  final List<Map<String, dynamic>> rows;  // Result rows
  final int rowsRead;      // Rows read
  final int bytesRead;     // Bytes read
  final Duration elapsed;  // Query duration
}

Usage#

final result = await analytics.dau(days: 7);

// Access results
for (final row in result.rows) {
  print('${row['date']}: ${row['dau']} users');
}

// Metadata
print('Query took: ${result.elapsed.inMilliseconds}ms');
print('Rows read: ${result.rowsRead}');

Parameter Binding#

Supported Types#

TypeExample
String{user_id} → 'user_123'
int{days} → 30
List{ids} → ('a', 'b', 'c')
DateTime{date} → '2025-01-14'

Example#

await client.query('''
  SELECT * FROM events
  WHERE user_id IN ({user_ids})
    AND timestamp >= {start_date}
    AND event_name = {event}
''', params: {
  'user_ids': ['user_1', 'user_2', 'user_3'],
  'start_date': DateTime(2025, 1, 1),
  'event': 'purchase',
});

Error Handling#

try {
  final result = await analytics.dau(days: 30);
} on ClickHouseException catch (e) {
  print('ClickHouse error: ${e.message}');
  print('Query: ${e.query}');
} catch (e) {
  print('Unexpected error: $e');
}

Performance Optimization#

Batch Queries (Large Datasets)#

// WHERE + IN pattern (99% memory reduction compared to JOIN)
final results = await analytics.batchQuery(
  ids: largeUserList,
  batchSize: 1000,
  sql: 'SELECT * FROM events WHERE user_id IN ({ids})',
);

// Merge results
final merged = analytics.mergeResults(results);

argMax Usage (Avoiding FINAL)#

// Query latest state in ReplacingMergeTree
final latest = await analytics.latestUserState(
  stateTable: 'user_states',
  userIds: ['u1', 'u2'],
  selectColumns: ['status', 'last_login'],
  versionColumn: 'updated_at',
);