feat: implement real-time database synchronization with SSE and update backend modules
This commit is contained in:
@@ -7,7 +7,8 @@ on:
|
||||
branches: [ main, develop ]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
backend-pr-quick:
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
@@ -28,14 +29,64 @@ jobs:
|
||||
working-directory: ./backend
|
||||
run: npm ci
|
||||
|
||||
- name: Prisma schema validate
|
||||
working-directory: ./backend
|
||||
run: npx prisma validate --schema prisma/schema.prisma
|
||||
|
||||
- name: Generate Prisma Client
|
||||
working-directory: ./backend
|
||||
run: npm run prisma:generate
|
||||
|
||||
- name: Verify generated Prisma client is typed
|
||||
working-directory: ./backend
|
||||
run: |
|
||||
if ! grep -q "export \* from '.prisma/client/default'" node_modules/@prisma/client/index.d.ts; then
|
||||
echo "Prisma client export is unexpected";
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
- name: Build NestJS app
|
||||
working-directory: ./backend
|
||||
run: npm run build
|
||||
|
||||
backend-full:
|
||||
if: github.event_name == 'push'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
node-version: [24.15.0]
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Setup Node.js ${{ matrix.node-version }}
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: ${{ matrix.node-version }}
|
||||
cache: 'npm'
|
||||
|
||||
- name: Install dependencies (backend)
|
||||
working-directory: ./backend
|
||||
run: npm ci
|
||||
|
||||
- name: Prisma schema validate
|
||||
working-directory: ./backend
|
||||
run: npx prisma validate --schema prisma/schema.prisma
|
||||
|
||||
- name: Generate Prisma Client
|
||||
working-directory: ./backend
|
||||
run: npm run prisma:generate
|
||||
|
||||
- name: Verify generated Prisma client is typed
|
||||
working-directory: ./backend
|
||||
run: |
|
||||
if ! grep -q "export \* from '.prisma/client/default'" node_modules/@prisma/client/index.d.ts; then
|
||||
echo "Prisma client export is unexpected";
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
- name: Dependency audit (high+critical)
|
||||
working-directory: ./backend
|
||||
run: npm audit --audit-level=high
|
||||
@@ -47,7 +98,6 @@ jobs:
|
||||
- name: Build NestJS app
|
||||
working-directory: ./backend
|
||||
run: npm run build
|
||||
continue-on-error: true
|
||||
|
||||
flutter-quality:
|
||||
runs-on: ubuntu-latest
|
||||
@@ -65,6 +115,16 @@ jobs:
|
||||
- 'flutter/**'
|
||||
- '.github/workflows/test.yml'
|
||||
|
||||
- name: Set Flutter test mode
|
||||
if: steps.filter.outputs.flutter == 'true'
|
||||
shell: bash
|
||||
run: |
|
||||
if [ "${{ github.event_name }}" = "pull_request" ]; then
|
||||
echo "FLUTTER_TEST_CMD=flutter test --reporter=compact" >> "$GITHUB_ENV"
|
||||
else
|
||||
echo "FLUTTER_TEST_CMD=flutter test" >> "$GITHUB_ENV"
|
||||
fi
|
||||
|
||||
- name: Setup Flutter
|
||||
if: steps.filter.outputs.flutter == 'true'
|
||||
uses: subosito/flutter-action@v2
|
||||
@@ -84,4 +144,4 @@ jobs:
|
||||
- name: Run Flutter tests
|
||||
if: steps.filter.outputs.flutter == 'true'
|
||||
working-directory: ./flutter
|
||||
run: flutter test
|
||||
run: ${{ env.FLUTTER_TEST_CMD }}
|
||||
|
||||
@@ -16,6 +16,7 @@ import { UsersModule } from './users/users.module';
|
||||
import { UserProductsModule } from './user-products/user-products.module';
|
||||
import { CategoriesModule } from './categories/categories.module';
|
||||
import { AiModule } from './ai/ai.module';
|
||||
import { RealtimeModule } from './realtime/realtime.module';
|
||||
import { JwtAuthGuard } from './auth/jwt-auth.guard';
|
||||
import { RolesGuard } from './auth/roles.guard';
|
||||
|
||||
@@ -44,6 +45,7 @@ import { RolesGuard } from './auth/roles.guard';
|
||||
UserProductsModule,
|
||||
CategoriesModule,
|
||||
AiModule,
|
||||
RealtimeModule,
|
||||
],
|
||||
providers: [
|
||||
{
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
import { Prisma, PrismaClient } from '@prisma/client';
|
||||
import { RealtimeEventsService } from '../realtime/realtime-events.service';
|
||||
|
||||
@Injectable()
|
||||
export class PrismaService
|
||||
@@ -7,9 +8,30 @@ export class PrismaService
|
||||
implements OnModuleInit, OnModuleDestroy
|
||||
{
|
||||
private readonly logger = new Logger(PrismaService.name);
|
||||
private readonly writeActions = new Set<string>([
|
||||
'create',
|
||||
'update',
|
||||
'upsert',
|
||||
'delete',
|
||||
'createMany',
|
||||
'updateMany',
|
||||
'deleteMany',
|
||||
]);
|
||||
|
||||
constructor() {
|
||||
constructor(private readonly realtimeEvents: RealtimeEventsService) {
|
||||
super();
|
||||
|
||||
const realtimeMiddleware: Prisma.Middleware = async (params, next) => {
|
||||
const result = await next(params);
|
||||
|
||||
if (params.model && this.writeActions.has(params.action)) {
|
||||
this.realtimeEvents.notifyDatabaseWrite();
|
||||
}
|
||||
|
||||
return result;
|
||||
};
|
||||
|
||||
this.$use(realtimeMiddleware);
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Subject } from 'rxjs';
|
||||
|
||||
export interface DbChangeEvent {
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class RealtimeEventsService {
|
||||
private readonly subject = new Subject<DbChangeEvent>();
|
||||
private flushTimer: NodeJS.Timeout | null = null;
|
||||
private hasPendingChanges = false;
|
||||
|
||||
readonly events$ = this.subject.asObservable();
|
||||
|
||||
notifyDatabaseWrite(): void {
|
||||
this.hasPendingChanges = true;
|
||||
if (this.flushTimer) return;
|
||||
|
||||
// Coalesce burst writes into one SSE event to reduce client/server load.
|
||||
this.flushTimer = setTimeout(() => {
|
||||
this.flushTimer = null;
|
||||
if (!this.hasPendingChanges) return;
|
||||
this.hasPendingChanges = false;
|
||||
this.subject.next({ timestamp: new Date().toISOString() });
|
||||
}, 400);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
import { Controller, MessageEvent, Sse } from '@nestjs/common';
|
||||
import { interval, map, merge, Observable } from 'rxjs';
|
||||
import { RealtimeEventsService } from './realtime-events.service';
|
||||
|
||||
@Controller('events')
|
||||
export class RealtimeController {
|
||||
constructor(private readonly realtimeEvents: RealtimeEventsService) {}
|
||||
|
||||
@Sse('stream')
|
||||
stream(): Observable<MessageEvent> {
|
||||
const changes$ = this.realtimeEvents.events$.pipe(
|
||||
map((event) => ({
|
||||
type: 'db-change',
|
||||
data: event,
|
||||
})),
|
||||
);
|
||||
|
||||
// Keeps connections alive through proxies and gives clients liveness signal.
|
||||
const heartbeat$ = interval(20_000).pipe(
|
||||
map(() => ({
|
||||
type: 'heartbeat',
|
||||
data: { timestamp: new Date().toISOString() },
|
||||
})),
|
||||
);
|
||||
|
||||
return merge(changes$, heartbeat$);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
import { Global, Module } from '@nestjs/common';
|
||||
import { RealtimeController } from './realtime.controller';
|
||||
import { RealtimeEventsService } from './realtime-events.service';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
controllers: [RealtimeController],
|
||||
providers: [RealtimeEventsService],
|
||||
exports: [RealtimeEventsService],
|
||||
})
|
||||
export class RealtimeModule {}
|
||||
@@ -0,0 +1,174 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:flutter/widgets.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
import 'package:http/http.dart' as http;
|
||||
|
||||
import '../../features/auth/data/auth_providers.dart';
|
||||
import '../../features/inventory/data/inventory_providers.dart';
|
||||
import '../../features/meal_plan/data/meal_plan_providers.dart';
|
||||
import '../../features/pantry/data/pantry_providers.dart';
|
||||
import '../../features/recipes/data/recipe_providers.dart';
|
||||
import '../api/api_providers.dart';
|
||||
|
||||
class RealtimeDbEvent {
|
||||
final String type;
|
||||
final DateTime? timestamp;
|
||||
|
||||
const RealtimeDbEvent({
|
||||
required this.type,
|
||||
this.timestamp,
|
||||
});
|
||||
|
||||
factory RealtimeDbEvent.fromSse({
|
||||
required String type,
|
||||
required String data,
|
||||
}) {
|
||||
final payload = jsonDecode(data) as Map<String, dynamic>;
|
||||
final timestampRaw = payload['timestamp']?.toString();
|
||||
return RealtimeDbEvent(
|
||||
type: type,
|
||||
timestamp: timestampRaw == null ? null : DateTime.tryParse(timestampRaw),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
class RealtimeSseClient {
|
||||
final String baseUrl;
|
||||
final http.Client _client;
|
||||
|
||||
RealtimeSseClient({required this.baseUrl, http.Client? client})
|
||||
: _client = client ?? http.Client();
|
||||
|
||||
void close() {
|
||||
_client.close();
|
||||
}
|
||||
|
||||
Stream<RealtimeDbEvent> connect({required String token}) async* {
|
||||
while (true) {
|
||||
try {
|
||||
final request = http.Request('GET', Uri.parse('$baseUrl/events/stream'));
|
||||
request.headers['Accept'] = 'text/event-stream';
|
||||
request.headers['Authorization'] = 'Bearer $token';
|
||||
|
||||
final response = await _client.send(request);
|
||||
if (response.statusCode == 401 || response.statusCode == 403) {
|
||||
return;
|
||||
}
|
||||
if (response.statusCode < 200 || response.statusCode >= 300) {
|
||||
throw StateError('SSE connection failed: HTTP ${response.statusCode}');
|
||||
}
|
||||
|
||||
String currentType = 'message';
|
||||
final dataLines = <String>[];
|
||||
|
||||
await for (final line in response.stream
|
||||
.transform(utf8.decoder)
|
||||
.transform(const LineSplitter())) {
|
||||
if (line.isEmpty) {
|
||||
if (dataLines.isNotEmpty) {
|
||||
final data = dataLines.join('\n');
|
||||
if (data.trim().isNotEmpty) {
|
||||
yield RealtimeDbEvent.fromSse(type: currentType, data: data);
|
||||
}
|
||||
}
|
||||
currentType = 'message';
|
||||
dataLines.clear();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (line.startsWith(':')) {
|
||||
continue;
|
||||
}
|
||||
if (line.startsWith('event:')) {
|
||||
currentType = line.substring(6).trim();
|
||||
continue;
|
||||
}
|
||||
if (line.startsWith('data:')) {
|
||||
dataLines.add(line.substring(5).trimLeft());
|
||||
}
|
||||
}
|
||||
} catch (_) {
|
||||
// Reconnect loop keeps stream alive across transient errors.
|
||||
}
|
||||
|
||||
await Future<void>.delayed(const Duration(seconds: 2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class _RealtimeTickNotifier extends Notifier<int> {
|
||||
@override
|
||||
int build() => 0;
|
||||
|
||||
void bump() {
|
||||
state = state + 1;
|
||||
}
|
||||
}
|
||||
|
||||
final realtimeRefreshTickProvider =
|
||||
NotifierProvider<_RealtimeTickNotifier, int>(_RealtimeTickNotifier.new);
|
||||
|
||||
final realtimeDbEventsProvider = StreamProvider<RealtimeDbEvent>((ref) async* {
|
||||
final token = await ref.watch(authStateProvider.future);
|
||||
if (token == null || token.isEmpty) {
|
||||
return;
|
||||
}
|
||||
|
||||
final apiClient = ref.watch(apiClientProvider);
|
||||
final client = RealtimeSseClient(baseUrl: apiClient.baseUrl);
|
||||
ref.onDispose(client.close);
|
||||
|
||||
yield* client.connect(token: token);
|
||||
});
|
||||
|
||||
class GlobalRealtimeSync extends ConsumerStatefulWidget {
|
||||
final Widget child;
|
||||
|
||||
const GlobalRealtimeSync({super.key, required this.child});
|
||||
|
||||
@override
|
||||
ConsumerState<GlobalRealtimeSync> createState() => _GlobalRealtimeSyncState();
|
||||
}
|
||||
|
||||
class _GlobalRealtimeSyncState extends ConsumerState<GlobalRealtimeSync> {
|
||||
ProviderSubscription<AsyncValue<RealtimeDbEvent>>? _subscription;
|
||||
Timer? _coalesceTimer;
|
||||
|
||||
void _scheduleRefresh() {
|
||||
_coalesceTimer?.cancel();
|
||||
_coalesceTimer = Timer(const Duration(milliseconds: 500), () {
|
||||
if (!mounted) return;
|
||||
|
||||
ref.invalidate(inventoryProvider);
|
||||
ref.invalidate(pantryProvider);
|
||||
ref.invalidate(recipesProvider);
|
||||
ref.invalidate(mealPlanDashboardProvider);
|
||||
ref.read(realtimeRefreshTickProvider.notifier).bump();
|
||||
});
|
||||
}
|
||||
|
||||
@override
|
||||
void initState() {
|
||||
super.initState();
|
||||
_subscription = ref.listenManual(realtimeDbEventsProvider, (_, next) {
|
||||
next.whenData((event) {
|
||||
if (event.type != 'db-change') return;
|
||||
_scheduleRefresh();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_coalesceTimer?.cancel();
|
||||
_subscription?.close();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
@override
|
||||
Widget build(BuildContext context) {
|
||||
return widget.child;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
import 'dart:async';
|
||||
|
||||
import '../../../core/api/api_error_mapper.dart';
|
||||
import '../../../core/l10n/l10n.dart';
|
||||
import '../../../core/realtime/realtime_sync.dart';
|
||||
import 'admin_ai_panel.dart';
|
||||
import 'admin_aliases_panel.dart';
|
||||
import 'admin_inventory_panel.dart';
|
||||
@@ -38,6 +40,32 @@ class AdminDatabasePanel extends ConsumerStatefulWidget {
|
||||
class _AdminDatabasePanelState extends ConsumerState<AdminDatabasePanel> {
|
||||
_DatabaseTab _activeTab = _DatabaseTab.inventory;
|
||||
bool _isRefreshingCategories = false;
|
||||
int _panelRefreshVersion = 0;
|
||||
ProviderSubscription<int>? _realtimeTickSubscription;
|
||||
Timer? _realtimeDebounce;
|
||||
|
||||
@override
|
||||
void initState() {
|
||||
super.initState();
|
||||
_realtimeTickSubscription = ref.listenManual<int>(
|
||||
realtimeRefreshTickProvider,
|
||||
(_, __) {
|
||||
if (!mounted) return;
|
||||
_realtimeDebounce?.cancel();
|
||||
_realtimeDebounce = Timer(const Duration(milliseconds: 600), () {
|
||||
if (!mounted) return;
|
||||
setState(() => _panelRefreshVersion++);
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_realtimeDebounce?.cancel();
|
||||
_realtimeTickSubscription?.close();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
List<_DatabaseTabConfig> get _tabConfigs => [
|
||||
_DatabaseTabConfig(
|
||||
@@ -153,7 +181,12 @@ class _AdminDatabasePanelState extends ConsumerState<AdminDatabasePanel> {
|
||||
children: [
|
||||
header,
|
||||
const SizedBox(height: 12),
|
||||
Expanded(child: currentTab.panel),
|
||||
Expanded(
|
||||
child: KeyedSubtree(
|
||||
key: ValueKey('admin-db-${_activeTab.name}-$_panelRefreshVersion'),
|
||||
child: currentTab.panel,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
);
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:flutter/services.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
import 'dart:async';
|
||||
|
||||
import '../../../core/api/api_error_mapper.dart';
|
||||
import '../../../core/l10n/l10n.dart';
|
||||
import '../../../core/realtime/realtime_sync.dart';
|
||||
import '../data/admin_repository.dart';
|
||||
import '../domain/user_admin.dart';
|
||||
|
||||
@@ -28,6 +30,8 @@ class _AdminUsersPanelState extends ConsumerState<AdminUsersPanel> {
|
||||
bool _filterPremiumOnly = false;
|
||||
bool _filterSharingOffOnly = false;
|
||||
List<UserAdmin> _users = [];
|
||||
ProviderSubscription<int>? _realtimeTickSubscription;
|
||||
Timer? _realtimeDebounce;
|
||||
|
||||
String _sortLabel(_UserSort sort) => switch (sort) {
|
||||
_UserSort.newest => 'Nyast',
|
||||
@@ -71,11 +75,24 @@ class _AdminUsersPanelState extends ConsumerState<AdminUsersPanel> {
|
||||
void initState() {
|
||||
super.initState();
|
||||
_searchCtrl = TextEditingController();
|
||||
_realtimeTickSubscription = ref.listenManual<int>(
|
||||
realtimeRefreshTickProvider,
|
||||
(_, __) {
|
||||
if (!mounted) return;
|
||||
_realtimeDebounce?.cancel();
|
||||
_realtimeDebounce = Timer(const Duration(milliseconds: 600), () {
|
||||
if (!mounted) return;
|
||||
_load();
|
||||
});
|
||||
},
|
||||
);
|
||||
_load();
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_realtimeDebounce?.cancel();
|
||||
_realtimeTickSubscription?.close();
|
||||
_searchCtrl.dispose();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import 'package:flutter/material.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
|
||||
import '../../../core/api/api_error_mapper.dart';
|
||||
import '../../../core/realtime/realtime_sync.dart';
|
||||
import '../../admin/data/admin_repository.dart';
|
||||
import '../../admin/domain/receipt_alias.dart';
|
||||
|
||||
@@ -16,13 +17,27 @@ class _UserAliasesScreenState extends ConsumerState<UserAliasesScreen> {
|
||||
List<ReceiptAlias> _aliases = [];
|
||||
bool _isLoading = true;
|
||||
String? _error;
|
||||
ProviderSubscription<int>? _realtimeTickSubscription;
|
||||
|
||||
@override
|
||||
void initState() {
|
||||
super.initState();
|
||||
_realtimeTickSubscription = ref.listenManual<int>(
|
||||
realtimeRefreshTickProvider,
|
||||
(_, __) {
|
||||
if (!mounted) return;
|
||||
_load();
|
||||
},
|
||||
);
|
||||
_load();
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_realtimeTickSubscription?.close();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
Future<void> _load() async {
|
||||
setState(() {
|
||||
_isLoading = true;
|
||||
|
||||
@@ -3,6 +3,7 @@ import 'package:flutter_localizations/flutter_localizations.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
|
||||
import 'core/l10n/l10n.dart';
|
||||
import 'core/realtime/realtime_sync.dart';
|
||||
import 'core/router/app_router.dart';
|
||||
|
||||
void main() {
|
||||
@@ -18,7 +19,9 @@ class RecipeApp extends ConsumerWidget {
|
||||
return MaterialApp.router(
|
||||
onGenerateTitle: (context) => context.l10n.appTitle,
|
||||
builder: (context, child) {
|
||||
return SelectionArea(child: child ?? const SizedBox.shrink());
|
||||
return GlobalRealtimeSync(
|
||||
child: SelectionArea(child: child ?? const SizedBox.shrink()),
|
||||
);
|
||||
},
|
||||
theme: ThemeData(
|
||||
colorScheme: ColorScheme.fromSeed(seedColor: Colors.green),
|
||||
|
||||
Reference in New Issue
Block a user