175 lines
4.9 KiB
Dart
175 lines
4.9 KiB
Dart
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:flutter/widgets.dart';
|
|
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
|
import 'package:http/http.dart' as http;
|
|
|
|
import '../../features/auth/data/auth_providers.dart';
|
|
import '../../features/inventory/data/inventory_providers.dart';
|
|
import '../../features/meal_plan/data/meal_plan_providers.dart';
|
|
import '../../features/pantry/data/pantry_providers.dart';
|
|
import '../../features/recipes/data/recipe_providers.dart';
|
|
import '../api/api_providers.dart';
|
|
|
|
class RealtimeDbEvent {
|
|
final String type;
|
|
final DateTime? timestamp;
|
|
|
|
const RealtimeDbEvent({
|
|
required this.type,
|
|
this.timestamp,
|
|
});
|
|
|
|
factory RealtimeDbEvent.fromSse({
|
|
required String type,
|
|
required String data,
|
|
}) {
|
|
final payload = jsonDecode(data) as Map<String, dynamic>;
|
|
final timestampRaw = payload['timestamp']?.toString();
|
|
return RealtimeDbEvent(
|
|
type: type,
|
|
timestamp: timestampRaw == null ? null : DateTime.tryParse(timestampRaw),
|
|
);
|
|
}
|
|
}
|
|
|
|
class RealtimeSseClient {
|
|
final String baseUrl;
|
|
final http.Client _client;
|
|
|
|
RealtimeSseClient({required this.baseUrl, http.Client? client})
|
|
: _client = client ?? http.Client();
|
|
|
|
void close() {
|
|
_client.close();
|
|
}
|
|
|
|
Stream<RealtimeDbEvent> connect({required String token}) async* {
|
|
while (true) {
|
|
try {
|
|
final request = http.Request('GET', Uri.parse('$baseUrl/events/stream'));
|
|
request.headers['Accept'] = 'text/event-stream';
|
|
request.headers['Authorization'] = 'Bearer $token';
|
|
|
|
final response = await _client.send(request);
|
|
if (response.statusCode == 401 || response.statusCode == 403) {
|
|
return;
|
|
}
|
|
if (response.statusCode < 200 || response.statusCode >= 300) {
|
|
throw StateError('SSE connection failed: HTTP ${response.statusCode}');
|
|
}
|
|
|
|
String currentType = 'message';
|
|
final dataLines = <String>[];
|
|
|
|
await for (final line in response.stream
|
|
.transform(utf8.decoder)
|
|
.transform(const LineSplitter())) {
|
|
if (line.isEmpty) {
|
|
if (dataLines.isNotEmpty) {
|
|
final data = dataLines.join('\n');
|
|
if (data.trim().isNotEmpty) {
|
|
yield RealtimeDbEvent.fromSse(type: currentType, data: data);
|
|
}
|
|
}
|
|
currentType = 'message';
|
|
dataLines.clear();
|
|
continue;
|
|
}
|
|
|
|
if (line.startsWith(':')) {
|
|
continue;
|
|
}
|
|
if (line.startsWith('event:')) {
|
|
currentType = line.substring(6).trim();
|
|
continue;
|
|
}
|
|
if (line.startsWith('data:')) {
|
|
dataLines.add(line.substring(5).trimLeft());
|
|
}
|
|
}
|
|
} catch (_) {
|
|
// Reconnect loop keeps stream alive across transient errors.
|
|
}
|
|
|
|
await Future<void>.delayed(const Duration(seconds: 2));
|
|
}
|
|
}
|
|
}
|
|
|
|
class _RealtimeTickNotifier extends Notifier<int> {
|
|
@override
|
|
int build() => 0;
|
|
|
|
void bump() {
|
|
state = state + 1;
|
|
}
|
|
}
|
|
|
|
final realtimeRefreshTickProvider =
|
|
NotifierProvider<_RealtimeTickNotifier, int>(_RealtimeTickNotifier.new);
|
|
|
|
final realtimeDbEventsProvider = StreamProvider<RealtimeDbEvent>((ref) async* {
|
|
final token = await ref.watch(authStateProvider.future);
|
|
if (token == null || token.isEmpty) {
|
|
return;
|
|
}
|
|
|
|
final apiClient = ref.watch(apiClientProvider);
|
|
final client = RealtimeSseClient(baseUrl: apiClient.baseUrl);
|
|
ref.onDispose(client.close);
|
|
|
|
yield* client.connect(token: token);
|
|
});
|
|
|
|
class GlobalRealtimeSync extends ConsumerStatefulWidget {
|
|
final Widget child;
|
|
|
|
const GlobalRealtimeSync({super.key, required this.child});
|
|
|
|
@override
|
|
ConsumerState<GlobalRealtimeSync> createState() => _GlobalRealtimeSyncState();
|
|
}
|
|
|
|
class _GlobalRealtimeSyncState extends ConsumerState<GlobalRealtimeSync> {
|
|
ProviderSubscription<AsyncValue<RealtimeDbEvent>>? _subscription;
|
|
Timer? _coalesceTimer;
|
|
|
|
void _scheduleRefresh() {
|
|
_coalesceTimer?.cancel();
|
|
_coalesceTimer = Timer(const Duration(milliseconds: 500), () {
|
|
if (!mounted) return;
|
|
|
|
ref.invalidate(inventoryProvider);
|
|
ref.invalidate(pantryProvider);
|
|
ref.invalidate(recipesProvider);
|
|
ref.invalidate(mealPlanDashboardProvider);
|
|
ref.read(realtimeRefreshTickProvider.notifier).bump();
|
|
});
|
|
}
|
|
|
|
@override
|
|
void initState() {
|
|
super.initState();
|
|
_subscription = ref.listenManual(realtimeDbEventsProvider, (_, next) {
|
|
next.whenData((event) {
|
|
if (event.type != 'db-change') return;
|
|
_scheduleRefresh();
|
|
});
|
|
});
|
|
}
|
|
|
|
@override
|
|
void dispose() {
|
|
_coalesceTimer?.cancel();
|
|
_subscription?.close();
|
|
super.dispose();
|
|
}
|
|
|
|
@override
|
|
Widget build(BuildContext context) {
|
|
return widget.child;
|
|
}
|
|
}
|