Created
April 1, 2025 16:02
-
-
Save logickoder/73fac7ef49fca5538cbfeb8c9cf4eaf4 to your computer and use it in GitHub Desktop.
Simple Flutter WebSocket controller with Riverpod
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async';
import 'dart:convert';

import 'package:collection/collection.dart';
import 'package:flutter/cupertino.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../../../config/app_config.dart';
import '../../../core/logger.dart';
import '../../authentication/domain/usecase.dart';
import '../data/dto.dart';
import '../data/model.dart';
import 'state.dart';

/// Auto-disposing provider family that exposes one [MessageController] per
/// `int` argument.
///
/// The argument is forwarded to [MessageController.build], which uses it as
/// the chat room id when building the WebSocket URL.
final messageController = AsyncNotifierProvider.autoDispose
    .family<MessageController, MessageState, int>(MessageController.new);

/// Owns the WebSocket connection of a single chat room and publishes the
/// received messages and the connection status as a [MessageState].
///
/// The family argument ([arg]) is the room id; it is interpolated into the
/// socket URL and sent with the initial `fetch_messages` request.
class MessageController
    extends AutoDisposeFamilyAsyncNotifier<MessageState, int> {
  /// Number of additional connection attempts made after the first failure.
  static const _maxRetry = 5;

  /// Pause between two consecutive connection attempts.
  static const _retryDelay = Duration(seconds: 1);

  /// False once [disconnect] has run; blocks any further connection attempt
  /// until the next [build].
  var _active = true;

  /// Incremented by every [build] and [disconnect] so that an in-flight
  /// [_connect] can detect that it has been superseded.
  var _session = 0;

  WebSocketChannel? _channel;
  StreamSubscription<dynamic>? _subscription;

  /// The latest data state, or an empty [MessageState] when the provider is
  /// currently loading or in error without a value.
  ///
  /// Uses `valueOrNull` instead of `value!` so that socket events arriving
  /// while the state is `AsyncLoading`/`AsyncError` cannot throw.
  MessageState get _currentState => state.valueOrNull ?? const MessageState();

  /// Starts connecting to the room identified by [arg] and returns an empty
  /// initial [MessageState].
  @override
  FutureOr<MessageState> build(int arg) {
    // The notifier instance can outlive a single build (e.g. on refresh), and
    // the previous build's dispose callback leaves `_active` false. Without
    // this reset a rebuilt provider would never reconnect.
    _active = true;
    _session++;
    unawaited(_connect());
    ref.onDispose(_dispose);
    return const MessageState();
  }

  /// Opens the socket, requests the recent messages and starts listening.
  ///
  /// A failed attempt is retried after [_retryDelay] until [_maxRetry] retries
  /// have been used; the last failure is then published as an `AsyncError`.
  /// [retryCount] is the number of retries already consumed.
  ///
  /// The method stops silently as soon as [disconnect] or a newer [build] has
  /// superseded it, so it never touches `state` after disposal.
  Future<void> _connect({int retryCount = 0}) async {
    final session = _session;
    bool superseded() => !_active || session != _session;

    for (var attempt = retryCount; !superseded(); attempt++) {
      try {
        state = const AsyncLoading();
        final token = await ref.read(authenticationUsecase).getToken();
        if (superseded()) return;

        final uri = Uri.parse('${AppConfig.realTimeBaseUrl}/ws/chats/$arg/');
        logger.i("Connecting to $uri");
        final channel = IOWebSocketChannel.connect(
          uri,
          headers: {"Authorization": "Bearer $token"},
          pingInterval: const Duration(seconds: 30),
          connectTimeout: const Duration(seconds: 10),
        );
        _channel = channel;
        await channel.ready;
        if (superseded()) {
          // The controller was disconnected or rebuilt while the handshake
          // was pending: make sure this socket does not stay open.
          unawaited(channel.sink.close());
          return;
        }

        state = const AsyncData(MessageState(state: ConnectionState.active));
        sendMessage(FetchMessageRequest(command: 'fetch_messages', roomId: arg));
        _subscription = channel.stream.listen(
          _onData,
          onError: _onError,
          onDone: _onDone,
          cancelOnError: false,
        );
        return;
      } catch (error, stackTrace) {
        if (superseded()) return;
        if (attempt >= _maxRetry) {
          logger.e(
            "Failed to connect after $_maxRetry attempts.",
            error: error,
            stackTrace: stackTrace,
          );
          state = AsyncError(error, stackTrace);
          return;
        }
        logger.e(
          "Failed to connect. Retrying in 1 second.",
          error: error,
          stackTrace: stackTrace,
        );
        await Future<void>.delayed(_retryDelay);
      }
    }
  }

  /// JSON-encodes `data.toJson()` and writes it to the socket.
  ///
  /// [data] must expose a `toJson()` method. Nothing is sent when there is no
  /// open channel.
  void sendMessage(data) {
    final message = jsonEncode(data.toJson());
    logger.i("Sending message $message");
    _channel?.sink.add(message);
  }

  /// Closes the socket and prevents any further connection attempt.
  void disconnect() {
    logger.i("Disconnecting from channel $arg");
    _active = false;
    // Invalidate any `_connect` call that is still awaiting.
    _session++;
    _channel?.sink.close();
    _channel = null;
  }

  /// Dispose callback: stops listening before closing the socket.
  ///
  /// Cancelling first guarantees that the `onDone` triggered by closing the
  /// sink cannot write to `state` after the provider has been disposed.
  void _dispose() {
    _subscription?.cancel();
    _subscription = null;
    disconnect();
  }

  /// Decodes a raw socket frame into a JSON object.
  ///
  /// Returns `null` for a `null` frame, for malformed JSON (which is logged)
  /// and for any JSON value that is not an object.
  Map<String, dynamic>? _decode(Object? data) {
    if (data == null) {
      return null;
    }
    try {
      final decoded = jsonDecode(data.toString());
      return decoded is Map<String, dynamic> ? decoded : null;
    } on FormatException catch (error, stackTrace) {
      logger.e(
        "Ignoring malformed message",
        error: error,
        stackTrace: stackTrace,
      );
      return null;
    }
  }

  /// Handles one incoming frame, dispatching on its `command` field.
  void _onData(data) {
    final decodedMessage = _decode(data);
    if (decodedMessage == null) {
      return;
    }
    logger.i("---------- New message: $decodedMessage");
    switch (decodedMessage['command']) {
      case "fetch_messages":
        {
          final payload = GetRecentMessagesResponse.fromJson(decodedMessage);
          // NOTE(review): `!=` is only a content comparison if the messages
          // collection implements value equality — confirm against the model.
          if (payload.messages != _currentState.messages) {
            state = AsyncData(
              _currentState.copyWith(messages: payload.messages),
            );
          }
        }
        break;
      case "new_message":
        {
          final payload = NewMessageResponse.fromJson(decodedMessage);
          // Skip messages that are already in the list (matched by id).
          final alreadyKnown = _currentState.messages.any(
            (e) => e.id == payload.id,
          );
          if (!alreadyKnown) {
            state = AsyncData(
              _currentState.copyWith(
                messages: [
                  ..._currentState.messages,
                  RecentMessage(
                    id: payload.id,
                    message: payload.message,
                    createdAt: payload.createdAt,
                    sender: Sender(
                      id: payload.senderId,
                      username: payload.senderUsername ?? '',
                    ),
                  ),
                ],
              ),
            );
          }
        }
        break;
    }
  }

  /// Called when the socket stream closes: publishes `ConnectionState.done`
  /// and releases the channel.
  void _onDone() {
    // NOTE(review): this replaces the whole state, so previously loaded
    // messages are dropped — confirm that is intended.
    state = const AsyncData(MessageState(state: ConnectionState.done));
    disconnect();
  }

  /// Publishes a stream error as the provider's error state.
  ///
  /// The subscription uses `cancelOnError: false`, so later frames are still
  /// delivered to [_onData] after an error.
  void _onError(error, stackTrace) {
    state = AsyncError(error, stackTrace);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment