import 'dart:async';
import 'dart:convert';
import 'dart:io';

/// Connection states reported by [WebsocketClientWrap].
enum WebSocketStatus {
  /// A connection attempt is in progress.
  connecting,

  /// The socket is open and [WebsocketClientWrap.send] may be called.
  connected,

  /// No socket is open (initial state, after `disconnect`, or after the
  /// remote side closed the stream).
  disconnected,

  /// A connection attempt or the socket stream reported an error.
  error,
}

/// Callback invoked for every incoming message.
///
/// Receives the JSON-decoded value when the frame is a valid JSON string,
/// otherwise the raw frame as delivered by the socket.
typedef WebSocketMessageHandler = void Function(dynamic message);

/// Callback invoked every time the client's status is updated.
typedef WebSocketStatusHandler = void Function(WebSocketStatus status);

/// A small wrapper around `dart:io`'s [WebSocket] that tracks a
/// [WebSocketStatus], fans incoming messages out to registered listeners and
/// schedules a reconnect after [reconnectInterval] when a connection attempt
/// fails, the stream errors, or the stream ends unexpectedly.
class WebsocketClientWrap {
  /// The `ws://` or `wss://` URL passed to [WebSocket.connect].
  final String url;

  /// Delay between a failure and the next automatic connection attempt.
  final Duration reconnectInterval;

  /// Optional extra headers passed to [WebSocket.connect].
  final Map<String, dynamic>? headers;

  WebSocket? _webSocket;
  StreamSubscription<dynamic>? _subscription;
  Timer? _reconnectTimer;
  bool _disposed = false;

  // Bumped by every connect() attempt and every disconnect(); an attempt whose
  // captured value no longer matches has been superseded and must not touch
  // the client's state.
  int _connectGeneration = 0;

  WebSocketStatus _status = WebSocketStatus.disconnected;

  /// The most recently reported status.
  WebSocketStatus get status => _status;

  final List<WebSocketMessageHandler> _messageHandlers = [];
  final List<WebSocketStatusHandler> _statusHandlers = [];

  WebsocketClientWrap({
    required this.url,
    this.reconnectInterval = const Duration(seconds: 5),
    this.headers,
  });

  /// Opens the WebSocket connection.
  ///
  /// Does nothing when the status is already [WebSocketStatus.connected] or
  /// [WebSocketStatus.connecting]. On failure the status becomes
  /// [WebSocketStatus.error] and a reconnect is scheduled; the failure itself
  /// is not rethrown.
  ///
  /// Throws a [StateError] if [dispose] has been called.
  Future<void> connect() async {
    if (_disposed) {
      throw StateError('WebSocket has been disposed');
    }

    if (_status == WebSocketStatus.connected ||
        _status == WebSocketStatus.connecting) {
      return;
    }

    final generation = ++_connectGeneration;

    // A socket left over from an earlier error state would otherwise be
    // overwritten below without ever being closed.
    _releaseConnection();

    _updateStatus(WebSocketStatus.connecting);

    try {
      final socket = await WebSocket.connect(url, headers: headers);

      if (_disposed || generation != _connectGeneration) {
        // disconnect()/dispose() ran while the handshake was in flight:
        // drop the freshly opened socket instead of resurrecting the client.
        _closeQuietly(socket);
        return;
      }

      _webSocket = socket;
      _updateStatus(WebSocketStatus.connected);

      _subscription = socket.listen(
        _handleMessage,
        // Ignore callbacks from a socket that is no longer the current one.
        onError: (Object error) {
          if (identical(_webSocket, socket)) _handleError(error);
        },
        onDone: () {
          if (identical(_webSocket, socket)) _handleDone();
        },
      );

      // Connected: a pending reconnect is no longer needed.
      _reconnectTimer?.cancel();
      _reconnectTimer = null;
    } catch (e) {
      if (_disposed || generation != _connectGeneration) return;
      // _handleError also schedules the reconnect.
      _handleError(e);
    }
  }

  /// Closes the connection and cancels any pending reconnect.
  ///
  /// Also invalidates a connection attempt that is still in flight. Status
  /// listeners are notified with [WebSocketStatus.disconnected] once the
  /// socket has been closed.
  Future<void> disconnect() async {
    _connectGeneration++;

    _reconnectTimer?.cancel();
    _reconnectTimer = null;

    // Detach the references before awaiting so that nothing else can use or
    // overwrite them while the close is in progress.
    final subscription = _subscription;
    final socket = _webSocket;
    _subscription = null;
    _webSocket = null;

    await subscription?.cancel();
    await socket?.close();

    _updateStatus(WebSocketStatus.disconnected);
  }

  /// Sends [message] over the socket.
  ///
  /// Strings are sent as-is, `Map` and `List` values are JSON-encoded, and
  /// anything else is sent as its `toString()` representation.
  ///
  /// Throws a [StateError] when the client is not connected.
  void send(dynamic message) {
    final socket = _webSocket;
    if (_status != WebSocketStatus.connected || socket == null) {
      throw StateError('WebSocket is not connected');
    }

    if (message is String) {
      socket.add(message);
    } else if (message is Map || message is List) {
      socket.add(jsonEncode(message));
    } else {
      socket.add(message.toString());
    }
  }

  /// Registers [handler] to be called for every incoming message.
  void addMessageListener(WebSocketMessageHandler handler) {
    _messageHandlers.add(handler);
  }

  /// Unregisters a message listener previously added with
  /// [addMessageListener].
  void removeMessageListener(WebSocketMessageHandler handler) {
    _messageHandlers.remove(handler);
  }

  /// Registers [handler] to be called on every status update.
  void addStatusListener(WebSocketStatusHandler handler) {
    _statusHandlers.add(handler);
  }

  /// Unregisters a status listener previously added with [addStatusListener].
  void removeStatusListener(WebSocketStatusHandler handler) {
    _statusHandlers.remove(handler);
  }

  /// Disconnects and drops all listeners.
  ///
  /// After this call [connect] throws a [StateError] and no reconnect is
  /// scheduled any more.
  Future<void> dispose() async {
    _disposed = true;
    await disconnect();
    _messageHandlers.clear();
    _statusHandlers.clear();
  }

  /// Decodes an incoming frame and delivers it to every message listener.
  void _handleMessage(dynamic message) {
    var payload = message;
    if (message is String) {
      try {
        payload = jsonDecode(message);
      } on FormatException {
        // Not JSON: deliver the raw text unchanged.
      }
    }

    // Dispatch outside the try block so that an exception thrown by a
    // listener is not mistaken for a decoding failure, and iterate a snapshot
    // so listeners may add/remove listeners while being notified.
    for (final handler in List.of(_messageHandlers)) {
      handler(payload);
    }
  }

  /// Reports [WebSocketStatus.error] and schedules a reconnect.
  ///
  /// The error object itself is not forwarded to listeners.
  void _handleError(Object error) {
    _updateStatus(WebSocketStatus.error);
    _scheduleReconnect();
  }

  /// Handles the end of the socket stream when it was not requested through
  /// [disconnect] or [dispose].
  void _handleDone() {
    if (_status == WebSocketStatus.disconnected || _disposed) return;

    _releaseConnection();
    _updateStatus(WebSocketStatus.disconnected);
    _scheduleReconnect();
  }

  /// Arms the reconnect timer unless the client is disposed or a reconnect is
  /// already pending.
  void _scheduleReconnect() {
    if (_disposed || _reconnectTimer != null) return;

    _reconnectTimer = Timer(reconnectInterval, () {
      _reconnectTimer = null;
      unawaited(connect());
    });
  }

  /// Stores [status] and notifies every status listener.
  void _updateStatus(WebSocketStatus status) {
    _status = status;
    // Snapshot: a listener may unregister itself from inside the callback.
    for (final handler in List.of(_statusHandlers)) {
      handler(status);
    }
  }

  /// Detaches the current socket and subscription, if any, and shuts them
  /// down in the background.
  void _releaseConnection() {
    final subscription = _subscription;
    final socket = _webSocket;
    _subscription = null;
    _webSocket = null;

    if (subscription != null) {
      unawaited(subscription.cancel().catchError((Object _) {}));
    }
    if (socket != null) {
      _closeQuietly(socket);
    }
  }

  /// Closes [socket] without waiting; a failure while closing a socket that
  /// is being discarded is deliberately ignored.
  void _closeQuietly(WebSocket socket) {
    unawaited(socket.close().catchError((Object _) {}));
  }
}