import 'dart:async';
import 'dart:convert';

import 'package:flutter/widgets.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:http/http.dart' as http;

import '../../features/auth/data/auth_providers.dart';
import '../../features/inventory/data/inventory_providers.dart';
import '../../features/meal_plan/data/meal_plan_providers.dart';
import '../../features/pantry/data/pantry_providers.dart';
import '../../features/recipes/data/recipe_providers.dart';
import '../api/api_providers.dart';

/// One event received from the backend's server-sent-events stream.
class RealtimeDbEvent {
  /// The SSE event name (`event:` field); `message` when the server sent none.
  final String type;

  /// The payload's `timestamp` field, or `null` when it is absent or cannot
  /// be parsed by [DateTime.tryParse].
  final DateTime? timestamp;

  const RealtimeDbEvent({
    required this.type,
    this.timestamp,
  });

  /// Builds an event from the raw SSE [type] and the joined `data:` lines.
  ///
  /// [data] must be a JSON object. Throws a [FormatException] when it is not
  /// valid JSON or when the decoded value is not an object.
  factory RealtimeDbEvent.fromSse({
    required String type,
    required String data,
  }) {
    final payload = jsonDecode(data);
    if (payload is! Map) {
      // Report a bad payload as a parse failure rather than a cast error.
      throw FormatException('SSE payload is not a JSON object', data);
    }
    final timestampRaw = payload['timestamp']?.toString();
    return RealtimeDbEvent(
      type: type,
      timestamp: timestampRaw == null ? null : DateTime.tryParse(timestampRaw),
    );
  }
}

/// Minimal server-sent-events client for `<baseUrl>/events/stream`.
///
/// [connect] keeps reconnecting until the server rejects the credentials or
/// [close] is called.
class RealtimeSseClient {
  /// Event name used when an event carries no `event:` field.
  static const _defaultEventType = 'message';

  /// Base URL that `/events/stream` is appended to.
  final String baseUrl;

  /// How long to wait before reconnecting after the stream ends or fails.
  final Duration reconnectDelay;

  final http.Client _client;

  /// Set by [close]; stops the reconnect loop in [connect].
  bool _closed = false;

  RealtimeSseClient({
    required this.baseUrl,
    http.Client? client,
    this.reconnectDelay = const Duration(seconds: 2),
  }) : _client = client ?? http.Client();

  /// Closes the underlying HTTP client and stops any reconnect loop.
  void close() {
    _closed = true;
    _client.close();
  }

  /// Opens the event stream authenticated with [token] and yields each
  /// decoded event.
  ///
  /// The returned stream completes when the server answers 401 or 403, or
  /// after [close] has been called. Any other failure, including the server
  /// ending the response, triggers a reconnect after [reconnectDelay].
  /// Events whose payload cannot be decoded are skipped.
  Stream<RealtimeDbEvent> connect({required String token}) async* {
    while (!_closed) {
      try {
        final request =
            http.Request('GET', Uri.parse('$baseUrl/events/stream'))
              ..headers['Accept'] = 'text/event-stream'
              ..headers['Authorization'] = 'Bearer $token';
        final response = await _client.send(request);
        final status = response.statusCode;

        if (status == 401 || status == 403) {
          // Credentials were rejected; end the stream instead of retrying.
          await _discardBody(response);
          return;
        }

        if (status >= 200 && status < 300) {
          var eventType = _defaultEventType;
          final dataLines = <String>[];
          final lines = response.stream
              .transform(utf8.decoder)
              .transform(const LineSplitter());

          await for (final line in lines) {
            if (line.isEmpty) {
              // A blank line terminates the current event.
              final event = _decodeEvent(eventType, dataLines);
              eventType = _defaultEventType;
              dataLines.clear();
              if (event != null) yield event;
            } else if (line.startsWith(':')) {
              // Comment line (commonly a keep-alive); nothing to do.
            } else if (line.startsWith('event:')) {
              eventType = line.substring(6).trim();
            } else if (line.startsWith('data:')) {
              dataLines.add(line.substring(5).trimLeft());
            }
          }
        } else {
          // Unexpected status: release the response, then retry below.
          await _discardBody(response);
        }
      } catch (_) {
        // Reconnect loop keeps stream alive across transient errors.
      }

      // Cancelling an `async*` stream only takes effect at a `yield`. Once the
      // client is closed every `send` fails before reaching one, so without
      // this check the loop would keep retrying after disposal.
      if (_closed) return;
      await Future<void>.delayed(reconnectDelay);
    }
  }

  /// Decodes the buffered [dataLines] into an event of the given [type].
  ///
  /// Returns `null` when there is no data, the data is only whitespace, or
  /// the payload is malformed.
  static RealtimeDbEvent? _decodeEvent(String type, List<String> dataLines) {
    if (dataLines.isEmpty) return null;
    final data = dataLines.join('\n');
    if (data.trim().isEmpty) return null;
    try {
      return RealtimeDbEvent.fromSse(type: type, data: data);
    } on FormatException {
      // One bad payload should not tear down an otherwise healthy connection.
      return null;
    }
  }

  /// Cancels the body of a response that will not be read.
  ///
  /// Best-effort cleanup: errors raised while cancelling are ignored.
  static Future<void> _discardBody(http.StreamedResponse response) async {
    try {
      await response.stream.listen(null).cancel();
    } catch (_) {
      // Nothing useful can be done if releasing the response fails.
    }
  }
}

/// Monotonic counter that listeners can watch to learn a refresh happened.
class _RealtimeTickNotifier extends Notifier<int> {
  @override
  int build() => 0;

  /// Increments the counter by one.
  void bump() {
    state = state + 1;
  }
}

/// Counter bumped every time [GlobalRealtimeSync] performs a refresh.
final realtimeRefreshTickProvider =
    NotifierProvider<_RealtimeTickNotifier, int>(_RealtimeTickNotifier.new);

/// Stream of backend events for the currently authenticated user.
///
/// Emits nothing while the value resolved from `authStateProvider` is `null`
/// or empty. The SSE client is closed when the provider is disposed.
final realtimeDbEventsProvider =
    StreamProvider<RealtimeDbEvent>((ref) async* {
  final token = await ref.watch(authStateProvider.future);
  if (token == null || token.isEmpty) {
    return;
  }

  final apiClient = ref.watch(apiClientProvider);
  final client = RealtimeSseClient(baseUrl: apiClient.baseUrl);
  ref.onDispose(client.close);

  yield* client.connect(token: token);
});

/// Invisible wrapper that refreshes data providers when the backend reports
/// a `db-change` event.
class GlobalRealtimeSync extends ConsumerStatefulWidget {
  /// The subtree rendered unchanged by this widget.
  final Widget child;

  const GlobalRealtimeSync({super.key, required this.child});

  @override
  ConsumerState<GlobalRealtimeSync> createState() =>
      _GlobalRealtimeSyncState();
}

class _GlobalRealtimeSyncState extends ConsumerState<GlobalRealtimeSync> {
  ProviderSubscription<AsyncValue<RealtimeDbEvent>>? _subscription;

  /// Pending debounced refresh, if any.
  Timer? _coalesceTimer;

  /// Schedules a refresh 500 ms from now, replacing any pending one, so a
  /// burst of events results in a single invalidation pass.
  void _scheduleRefresh() {
    _coalesceTimer?.cancel();
    _coalesceTimer = Timer(const Duration(milliseconds: 500), () {
      if (!mounted) return;
      ref.invalidate(inventoryProvider);
      ref.invalidate(pantryProvider);
      ref.invalidate(recipesProvider);
      ref.invalidate(mealPlanDashboardProvider);
      ref.read(realtimeRefreshTickProvider.notifier).bump();
    });
  }

  @override
  void initState() {
    super.initState();
    _subscription = ref.listenManual(realtimeDbEventsProvider, (_, next) {
      next.whenData((event) {
        // Only `db-change` events trigger a refresh; others are ignored.
        if (event.type != 'db-change') return;
        _scheduleRefresh();
      });
    });
  }

  @override
  void dispose() {
    _coalesceTimer?.cancel();
    _subscription?.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return widget.child;
  }
}