Skip to content

Commit 22b787b

Browse files
Merge pull request #240 from pneumaticapp/frontend/websocket/47593__Catch_errors_for_websockets
47593 frontend [ websocket ] Catch errors for websockets
2 parents 8d8769d + d971733 commit 22b787b

5 files changed

Lines changed: 40 additions & 21 deletions

File tree

frontend/src/public/layout/MainLayout/MainLayout.tsx

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ import { isEnvAnalytics, isEnvPush } from '../../constants/enviroment';
2727
import { IUnsavedUser, TUserListItem } from '../../types/user';
2828
import { checkIsTemplateOwner, loadGroups } from '../../redux/actions';
2929
import { getIsAdmin } from '../../redux/selectors/user';
30-
import { closeAllConnections, hasActiveConnections } from '../../redux/utils/webSocketConnections';
31-
import { promiseDelay } from '../../utils/timeouts';
32-
3330
import styles from './MainLayout.css';
3431

3532
export interface IMainLayoutComponentStoreProps {
@@ -101,13 +98,7 @@ export function MainLayout({
10198

10299
React.useEffect(() => {
103100
if (billingPlan) {
104-
if (hasActiveConnections()) {
105-
closeAllConnections()
106-
.then(() => promiseDelay(500))
107-
.then(() => watchUserWSEventsAction());
108-
} else {
109-
watchUserWSEventsAction();
110-
}
101+
watchUserWSEventsAction();
111102

112103
loadNotificationsList();
113104
loadTasksCount();

frontend/src/public/redux/auth/saga.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { all, call, fork, put, takeEvery, takeLatest, select, takeLeading } from 'redux-saga/effects';
1+
import type { Task } from 'redux-saga';
2+
import { all, call, cancel, fork, put, takeEvery, takeLatest, select, takeLeading } from 'redux-saga/effects';
23
import { auth } from '../../api/auth';
34
import {
45
accountEditFailed,
@@ -66,6 +67,7 @@ import { getUTMParams, IUserUtm } from '../../views/user/utils/utmParams';
6667
import { clearAppFilters, setGeneralLoaderVisibility } from '../general/actions';
6768
import { getQueryStringParams, history } from '../../utils/history';
6869
import { watchWsEvents } from '../realtime/watchWsEvents';
70+
import { closeAllConnections } from '../utils/webSocketConnections';
6971
import { TUploadedFile, uploadUserAvatar } from '../../utils/uploadFiles';
7072
import { changePhotoProfile } from '../../api/changePhotoProfile';
7173
import { ELoggedState, IAuthUser } from '../../types/redux';
@@ -268,6 +270,7 @@ export function* registerWithInvite({ payload }: TRegisterUserInvited) {
268270
}
269271

270272
function* handleLogoutUnsubscribing({ shouldExpireToken }: { shouldExpireToken: boolean }) {
273+
yield call(stopWatchWsEvents);
271274
yield put(clearAppFilters());
272275
resetFirebaseDeviceToken();
273276
resetSuperuserToken();
@@ -688,12 +691,24 @@ export function* watchSetUserToken() {
688691
yield takeEvery(EAuthActions.SetToken, handleSetUserToken);
689692
}
690693

694+
let watchWsEventsTask: Task | undefined;
695+
696+
function* stopWatchWsEvents() {
697+
if (watchWsEventsTask) {
698+
yield cancel(watchWsEventsTask);
699+
watchWsEventsTask = undefined;
700+
}
701+
702+
yield call(closeAllConnections);
703+
}
704+
691705
function* handleWatchUserWSEvents() {
692-
yield fork(watchWsEvents);
706+
yield call(stopWatchWsEvents);
707+
watchWsEventsTask = yield fork(watchWsEvents);
693708
}
694709

695710
export function* watchUserWSEvents() {
696-
yield takeEvery(EAuthActions.WatchUserWSEvents, handleWatchUserWSEvents);
711+
yield takeLeading(EAuthActions.WatchUserWSEvents, handleWatchUserWSEvents);
697712
}
698713

699714
export function* watchUploadUserPhoto() {

frontend/src/public/redux/realtime/utils/routeRealtimeEvent.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { call, put, select } from 'redux-saga/effects';
22

33
import type { IRealtimeWsEnvelope } from '../types';
4+
import type { IStoreTask, IStoreWorkflows } from '../../../types/redux';
45
import { ERealtimeEnvelopeType } from '../types';
56

67
import { handleAddTask, handleRemoveTask } from '../../tasks/saga';
@@ -17,7 +18,6 @@ import { getUserTimezone } from '../../selectors/user';
1718
import { updateWorkflowLogItem } from '../../workflows/slice';
1819
import { isWorkflowEndedEventType } from './isWorkflowEndedEventType';
1920
import { mapBackendNewEventToRedux } from '../../../utils/mappers';
20-
import type { IStoreTask, IStoreWorkflows } from '../../../types/redux';
2121
import { getWorkflowsStore } from '../../selectors/workflows';
2222
import { getTaskStore } from '../../selectors/task';
2323

@@ -86,16 +86,14 @@ export function* routeRealtimeEvent(envelope: IRealtimeWsEnvelope) {
8686
if (item) {
8787
yield call(prependNotificationItem, item);
8888
} else {
89-
logger.error(`unhandled WebSocket event: ${envelope.type}, id=${envelope.id}`);
89+
logger.error(`unhandled WebSocket event: ${envelope.type}`);
9090
}
9191

9292
break;
9393
}
9494
default: {
9595
const unexpectedEnvelope = envelope as IRealtimeWsEnvelope;
96-
logger.error(
97-
`unhandled WebSocket event: ${unexpectedEnvelope.type}, id=${unexpectedEnvelope.id}`,
98-
);
96+
logger.error(`unhandled WebSocket event: ${String(unexpectedEnvelope.type)}`);
9997
break;
10098
}
10199
}

frontend/src/public/redux/realtime/watchWsEvents.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { getBrowserConfigEnv } from '../../utils/getConfig';
88
import { mergePaths } from '../../utils/urls';
99
import type { IRealtimeWsEnvelope } from './types';
1010
import { routeRealtimeEvent } from './utils/routeRealtimeEvent';
11+
import { logger } from '../../utils/logger';
1112

1213

1314

@@ -22,6 +23,10 @@ export function* watchWsEvents() {
2223

2324
while (true) {
2425
const envelope: IRealtimeWsEnvelope = yield take(channel);
25-
yield call(routeRealtimeEvent, envelope);
26+
try {
27+
yield call(routeRealtimeEvent, envelope);
28+
} catch (error) {
29+
logger.error(`failed to route WebSocket event`, error);
30+
}
2631
}
2732
}

frontend/src/public/redux/utils/createWebSocketChannel.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { eventChannel } from 'redux-saga';
22
import { mapToCamelCase } from '../../utils/mappers';
3+
import { logger } from '../../utils/logger';
34
import { addConnection } from './webSocketConnections';
45

56
export interface WebSocketWithRemoveFlag extends WebSocket {
@@ -54,8 +55,16 @@ export function createWebSocketChannel(url: string) {
5455
return;
5556
}
5657

57-
const data = mapToCamelCase(JSON.parse(message.data));
58-
emit(data);
58+
try {
59+
const data = mapToCamelCase(JSON.parse(message.data));
60+
emit(data);
61+
} catch (error) {
62+
logger.error('Invalid WebSocket message', message.data, error);
63+
}
64+
};
65+
66+
ws.onerror = (event) => {
67+
logger.error('WebSocket connection error', event);
5968
};
6069

6170
ws.onclose = () => {
@@ -72,6 +81,7 @@ export function createWebSocketChannel(url: string) {
7281
clearInterval(heartbeatInterval);
7382
heartbeatInterval = null;
7483
}
84+
ws.shouldRemove = true;
7585
ws.close();
7686
};
7787
});

0 commit comments

Comments
 (0)