Preventing subject/observable completion after throwError()?
I have a service with a series of watch$(...) methods that return observables for different data in my system and keeps track of those observables so it can push updates indefinitely.
The service polls a backend source for the data subscribers need, processes the results, and passes them along to those subscribers via the observables returned from watch$(). Many of these return types are composed and transformed objects, so there’s a lot of cross-requesting within the service. So far it works really well.
There are often business or infrastructure errors due to the nature of the data and upstream sources. These always need to be communicated to the user, and the polling doesn’t stop as long as there are subscribers. Sometimes the user can address the underlying issue and sometimes it’s just a dependency returning an error, etc. But whatever the result, something needs to make its way out to the subscribers every time there’s an update.
I first tried to address this by using RxJS error handling, but the problem is that a subject is terminated as soon as you call error() on it, and an observable is terminated once an error reaches the error handler. I should have read the docs more closely on that one, because it wasn’t obvious what was going on until the streams never recovered after a single dependency error.
To work around this I now wrap every response in an API response object that includes a state field, which can be success or any of a number of known error types. However, this means I have overhead at every level of the pipeline, since every step that consumes a service response has to check whether the call succeeded and then recast the typed error if it didn’t. It has become really messy and has doubled my code in the simpler methods compared to what I had with RxJS error handling.
At this point I don’t know of a better way to handle this situation. Any time an object (or one of its dependencies) is refreshed inside the service, the subscribers need to get a result. But if a backend request fails, the error response still gets bounced through every pipeline operator on its way to the subscribers, even though each operator does nothing with it except return it from the success check on its first line.
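For concreteness, here is a minimal sketch of the wrapper approach described above, written as plain functions so it runs without RxJS. All of the names (ApiResponse, ApiErrorState, toDisplayName, mapSuccess, and the specific error states) are made up for illustration and are not the real service's types. In the service, functions like these would sit inside map() operators.

```typescript
// Hypothetical shapes: the names and error states below are illustrative only.
type ApiErrorState = "not-found" | "upstream-unavailable" | "validation-failed";

type ApiResponse<T> =
  | { state: "success"; value: T }
  | { state: ApiErrorState; message: string };

// What each pipeline step ends up doing by hand: check for success on the
// first line, otherwise recast the same error for the next stage's type.
function toDisplayName(
  res: ApiResponse<{ first: string; last: string }>
): ApiResponse<string> {
  if (res.state !== "success") {
    return { state: res.state, message: res.message };
  }
  return { state: "success", value: `${res.value.first} ${res.value.last}` };
}

// One possible way to factor out the repeated check (an assumption, not
// something the service currently has): errors pass through untouched and
// only successful values are projected.
function mapSuccess<T, R>(
  res: ApiResponse<T>,
  project: (value: T) => R
): ApiResponse<R> {
  return res.state === "success"
    ? { state: "success", value: project(res.value) }
    : res;
}

const ok: ApiResponse<number> = { state: "success", value: 2 };
const failed: ApiResponse<number> = { state: "not-found", message: "no such record" };

console.log(mapSuccess(ok, n => n * 10));
console.log(mapSuccess(failed, n => n * 10));
```

Even with a helper like mapSuccess, every stage still has to go through the wrapper, and the UI still receives API errors through next rather than error, which is the overhead I am trying to get rid of.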
It's also a mess because now I have all my API errors handled in next handlers at the UI level, but I still need error handlers for unexpected errors that might come up.
What I really need is something that allows me to use throwError() (or something like it) without everything terminating. If that were a flag I could set on subjects when they are created, or a parameter I could pass to throwError(), it would solve all my problems.
It’s also possible that there’s a better way to do this and I’m just not seeing it.
Has anyone solved this in a good way?
Edit: Here are some simple examples. Note that none of the subject.next(2) values reach the terminal, and complete only fires on the observable when it catches the error.
Uncaught subject error
import { Subject } from "rxjs";

const subject = new Subject<number>();
subject.subscribe({
  next: value => console.log(`next: ${value}`),
  error: error => console.log(`error: ${error}`),
  complete: () => console.log(`complete`),
});
subject.next(1);
subject.error("error");
subject.next(2); // never delivered: the subject has already errored
Logs:
next: 1
error: error
Caught subject error
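import { Subject, of } from "rxjs";
import { catchError } from "rxjs/operators";

const subject = new Subject<number>();
subject
  .pipe(catchError(() => of(-1))) // replaces the errored source with of(-1)
  .subscribe({
    next: value => console.log(`next: ${value}`),
    error: error => console.log(`error: ${error}`),
    complete: () => console.log(`complete`),
  });
subject.next(1);
subject.error(new Error("Error"));
subject.next(2); // never delivered: of(-1) has already completed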
Logs:
next: 1
next: -1
complete
Uncaught observable error
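import { Subject } from "rxjs";
import { tap } from "rxjs/operators";

const subject = new Subject<number>();
subject
  .pipe(
    // Throws on the first value; nothing downstream catches it.
    tap(() => {
      throw "error";
    })
  )
  .subscribe({
    next: value => console.log(`next: ${value}`),
    error: error => console.log(`error: ${error}`),
    complete: () => console.log(`complete`),
  });
subject.next(1);
subject.next(2); // never delivered: the subscription ended on the error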
Logs:
error: error
Caught observable error
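import { Subject, of } from "rxjs";
import { catchError, tap } from "rxjs/operators";

const subject = new Subject<number>();
subject
  .pipe(
    // Throws on the first value; catchError then switches to of(-1).
    tap(() => {
      throw "error";
    }),
    catchError(() => of(-1))
  )
  .subscribe({
    next: value => console.log(`next: ${value}`),
    error: error => console.log(`error: ${error}`),
    complete: () => console.log(`complete`),
  });
subject.next(1);
subject.next(2); // never delivered: of(-1) has already completed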
Logs:
next: -1
complete