Skip to content

Instantly share code, notes, and snippets.

@logickoder
Created April 1, 2025 16:02
Show Gist options
  • Save logickoder/73fac7ef49fca5538cbfeb8c9cf4eaf4 to your computer and use it in GitHub Desktop.
Save logickoder/73fac7ef49fca5538cbfeb8c9cf4eaf4 to your computer and use it in GitHub Desktop.
Simple Flutter WebSocket controller with Riverpod
import 'dart:async';
import 'dart:convert';
import 'package:collection/collection.dart';
import 'package:flutter/cupertino.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../../../config/app_config.dart';
import '../../../core/logger.dart';
import '../../authentication/domain/usecase.dart';
import '../data/dto.dart';
import '../data/model.dart';
import 'state.dart';
/// Auto-disposing family provider that exposes one [MessageController], and
/// the [MessageState] it manages, per `int` argument.
///
/// The controller uses the argument as the chat id in its WebSocket URL and
/// as the `roomId` of its initial `fetch_messages` request.
final messageController = AsyncNotifierProvider.autoDispose
.family<MessageController, MessageState, int>(MessageController.new);
/// Controller that owns the WebSocket connection of a single chat room and
/// mirrors the messages received over it into a [MessageState].
///
/// The family argument ([arg]) is interpolated into the socket URL and sent
/// as the `roomId` of the initial `fetch_messages` request. Connecting is
/// retried up to [_maxRetry] times before the state becomes an [AsyncError].
class MessageController
    extends AutoDisposeFamilyAsyncNotifier<MessageState, int> {
  /// Number of retries allowed after the first failed connection attempt.
  static const _maxRetry = 5;

  /// Pause between two consecutive connection attempts.
  static const _retryDelay = Duration(seconds: 1);

  /// Whether a connection is still wanted; cleared by [disconnect].
  var _active = true;

  /// Bumped on every [build] and on every dispose so that a connection
  /// attempt started earlier can tell that it has been superseded.
  var _session = 0;

  WebSocketChannel? _channel;
  StreamSubscription<dynamic>? _subscription;

  /// The latest known state, or an empty [MessageState] while the provider
  /// holds no value (for example right after an error).
  MessageState get _currentState => state.valueOrNull ?? const MessageState();

  /// Whether the connection attempt identified by [session] may still touch
  /// `state` and `_channel`.
  bool _isCurrent(int session) => _active && session == _session;

  @override
  FutureOr<MessageState> build(int arg) {
    // `build` may run again on this same instance (e.g. after the provider is
    // invalidated), so the flag cleared by the previous dispose is reset here.
    _active = true;
    _session++;
    ref.onDispose(_handleDispose);
    // Connection failures are handled inside `_connect`; nothing to await.
    unawaited(_connect());
    return const MessageState();
  }

  /// Opens the socket, subscribes to it and requests the room's messages.
  ///
  /// [retryCount] is the number of retries already consumed. The method gives
  /// up silently as soon as the controller is disconnected, disposed or
  /// rebuilt, and publishes an [AsyncError] once the retries are exhausted.
  Future<void> _connect({int retryCount = 0}) async {
    final session = _session;
    var attempt = retryCount;

    while (_isCurrent(session)) {
      WebSocketChannel? channel;
      try {
        state = const AsyncLoading();
        final token = await ref.read(authenticationUsecase).getToken();
        // The controller may have gone away while the token was being read.
        if (!_isCurrent(session)) {
          return;
        }

        final uri = Uri.parse('${AppConfig.realTimeBaseUrl}/ws/chats/$arg/');
        logger.i("Connecting to $uri");
        channel = IOWebSocketChannel.connect(
          uri,
          headers: {"Authorization": "Bearer $token"},
          pingInterval: const Duration(seconds: 30),
          connectTimeout: const Duration(seconds: 10),
        );
        _channel = channel;
        await channel.ready;
        // Same check after the handshake: do not keep a socket nobody owns.
        if (!_isCurrent(session)) {
          _discard(channel);
          return;
        }

        // NOTE(review): this replaces the whole state, so messages held
        // before a reconnect are dropped until `fetch_messages` answers.
        state = const AsyncData(MessageState(state: ConnectionState.active));
        _subscription = channel.stream.listen(
          _onData,
          onError: _onError,
          onDone: _onDone,
          cancelOnError: false,
        );
        sendMessage(
          FetchMessageRequest(command: 'fetch_messages', roomId: arg),
        );
        return;
      } catch (error, stackTrace) {
        // Release whatever the failed attempt opened before retrying.
        if (channel != null) {
          _discard(channel);
        }
        if (!_isCurrent(session)) {
          return;
        }
        if (attempt >= _maxRetry) {
          logger.e(
            "Failed to connect after $_maxRetry attempts.",
            error: error,
            stackTrace: stackTrace,
          );
          state = AsyncError(error, stackTrace);
          return;
        }
        logger.e(
          "Failed to connect. Retrying in 1 second.",
          error: error,
          stackTrace: stackTrace,
        );
        attempt++;
        await Future<void>.delayed(_retryDelay);
      }
    }
  }

  /// Encodes `data.toJson()` as JSON and writes it to the socket.
  ///
  /// [data] must expose a `toJson()` method. Nothing is sent while no channel
  /// is open.
  void sendMessage(dynamic data) {
    final message = jsonEncode(data.toJson());
    logger.i("Sending message $message");
    _channel?.sink.add(message);
  }

  /// Closes the socket and prevents any further connection attempt.
  void disconnect() {
    logger.i("Disconnecting from channel $arg");
    _active = false;
    final channel = _channel;
    _channel = null;
    if (channel != null) {
      unawaited(channel.sink.close());
    }
  }

  /// Tears the connection down when the provider is disposed or rebuilt.
  void _handleDispose() {
    // Invalidate in-flight `_connect` calls and silence the stream callbacks
    // first, so none of them writes to `state` afterwards.
    _session++;
    _cancelSubscription();
    disconnect();
  }

  /// Closes [channel], also detaching it from this controller when it is the
  /// channel currently in use.
  void _discard(WebSocketChannel channel) {
    if (identical(_channel, channel)) {
      _cancelSubscription();
      _channel = null;
    }
    unawaited(channel.sink.close());
  }

  void _cancelSubscription() {
    unawaited(_subscription?.cancel());
    _subscription = null;
  }

  /// Handles one frame received from the socket.
  void _onData(dynamic data) {
    if (data == null) {
      return;
    }
    final decodedMessage = _decodeFrame(data);
    if (decodedMessage == null) {
      return;
    }
    logger.i("---------- New message: $decodedMessage");
    switch (decodedMessage['command']) {
      case "fetch_messages":
        _replaceMessages(GetRecentMessagesResponse.fromJson(decodedMessage));
        break;
      case "new_message":
        _appendMessage(NewMessageResponse.fromJson(decodedMessage));
        break;
    }
  }

  /// Decodes [data] as a JSON object; returns `null` for malformed JSON and
  /// for JSON that is not an object.
  Map<String, dynamic>? _decodeFrame(Object data) {
    try {
      final decoded = jsonDecode(data.toString());
      return decoded is Map<String, dynamic> ? decoded : null;
    } on FormatException catch (error, stackTrace) {
      logger.e(
        "Ignoring malformed message on channel $arg",
        error: error,
        stackTrace: stackTrace,
      );
      return null;
    }
  }

  /// Replaces the message list with the one carried by [payload].
  void _replaceMessages(GetRecentMessagesResponse payload) {
    final current = _currentState;
    // Compare contents rather than list identity. This only skips updates if
    // the message type implements value equality — TODO confirm.
    final unchanged = const DeepCollectionEquality().equals(
      payload.messages,
      current.messages,
    );
    if (!unchanged) {
      state = AsyncData(current.copyWith(messages: payload.messages));
    }
  }

  /// Appends the message carried by [payload] unless a message with the same
  /// id is already in the list.
  void _appendMessage(NewMessageResponse payload) {
    final current = _currentState;
    if (current.messages.any((e) => e.id == payload.id)) {
      return;
    }
    state = AsyncData(
      current.copyWith(
        messages: [
          ...current.messages,
          RecentMessage(
            id: payload.id,
            message: payload.message,
            createdAt: payload.createdAt,
            sender: Sender(
              id: payload.senderId,
              username: payload.senderUsername ?? '',
            ),
          ),
        ],
      ),
    );
  }

  /// Called when the socket stream closes.
  void _onDone() {
    _subscription = null;
    // NOTE(review): this publishes a fresh state, so the messages received so
    // far are no longer exposed once the socket closes — confirm intended.
    state = const AsyncData(MessageState(state: ConnectionState.done));
    disconnect();
  }

  /// Called when the socket stream reports an error.
  void _onError(Object error, StackTrace stackTrace) {
    logger.e(
      "Error on channel $arg",
      error: error,
      stackTrace: stackTrace,
    );
    state = AsyncError(error, stackTrace);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment