All-In on RxJS? Think Again.
28 November
RxJS is declarative and clear, as long as there is at least one subscriber and when the source replays… and starts with something? Wait, is our startWith before the publishReplay or after?
It’s a good thing we have the map function - no, not that map.
Here’s the thing.
Computers run the code we write with “painful accuracy” [Andy Harris, approximately].
When there’s a bug, it’s because our own brains effed up.
And when we dig a little deeper, it’s because the code - which we wrote - couldn’t be reasoned about easily.
Maybe it didn’t abstract enough so it overwhelmed our working memory,
or maybe it used language that implied something nuanced that wouldn’t have been implied in conversation.
But frankly our job isn’t to manage complexities, it’s to simplify or remove them.
A Little Background
Angular used to be slow. It still is, but it used to, too.
- Mitch Hedberg
RxJS is in Angular because Angular was slow and the team couldn’t make it any faster without surfacing a new mental model for devs.
There’s nothing wrong with that and it’s quite common.
Concurrent programming introduces new primitives, assembly languages introduce new instruction types, etc. all in the name of speed.
RxJS is there to reduce the computation required to update the view.
More specifically it reduces computation when no view updates are needed by signaling more precisely which parts of the view need to be updated and when.
(Well, more precisely than Angular’s usual way, but it’s not beating out the rest of the pack).
“But it’s declarative”
Let’s compare a simple component in a TODO app which uses RxJS in a fully “declarative” style to the usual imperative style. Ignoring the template (which is the exact same for each):
(Skip straight to the tl;dr if you don’t want to see the rationale)
Imperative with RxJS
|
“Declarative” RxJS
|
class TodoListComponent {
@onChange('onCurrentUser')
currentUser$ = this.userStore.currentUser$
@onChange('onTodos')
todos: Todo[]
allDone: boolean
error: Error
userName: string
constructor(private userStore: UserStore) {}
onCurrentUser(user) {
this.userName = user?.name
if (user) {
this.todos = undefined
switchFetch('todos', 'GET', todosPath(user.id))
.then(todos => this.todos = todos)
.catch(error => this.error = error)
} else {
this.todos = []
}
}
onTodos(todos) {
this.allDone = todos && !todos.some(todo => !todo.done)
}
}
|
class TodoListComponent {
currentUser$ = this.userStore.currentUser$
todos$: Observable<Todo[]>
allDone$: Observable<boolean>
error$ = new BehaviorSubject<Error>(undefined)
userName$: Observable<string>
constructor(
private userStore: UserStore,
private log: LogService
) {
this.userName$ = this.currentUser$.pipe(pluck('name'))
this.todos$ = this.currentUser$.pipe(
switchMap(currentUser => {
try {
return currentUser
? fromFetch('GET', todosPath(currentUser.id))
.pipe(
startWith(undefined),
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
: of([])
} catch(error) {
log.error(error)
return EMPTY
}
}),
share(),
)
this.allDone$ = this.todos$.pipe(
map(todos => todos && !todos.some(todo => !todo.done)),
)
}
}
|
Choose your adventure:
- If you’re thinking “What’s with the funky try/catch in the declarative code?”: In the RxJS world, errors can flow through a pipe AND native exceptions can still be thrown. If a native JavaScript exception is thrown within that switchMap without the try/catch, the stream errors out and never recovers.
- If you’re thinking “Okay, but they’re practically the same and the one on the right is better because it’s declarative”: you might be right. So far the code looks mostly similar apart from the declarative code having two separate error paths and convoluted control flow.
- Lastly, if you’re thinking “Just use a store with a BehaviorSubject for the todos and populate the store using the imperative style”: then you might be ahead of the game. Maybe skip to the tl;dr, but there are still some good insights in exploring the purely declarative implementations below.
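The imperative examples lean on a switchFetch helper that is never shown. As a rough sketch only, assuming it means "latest call per key wins", it could look like the following. The Fetcher type, the inFlight map and the extra fetcher parameter are assumptions made for illustration, not the author's implementation.

```typescript
// Hypothetical sketch: the article does not show how switchFetch is implemented.
// `Fetcher` is an assumed stand-in for whatever HTTP client the app uses.
type Fetcher<T> = (method: string, url: string, signal: AbortSignal) => Promise<T>;

// One in-flight request per key; a newer call for the same key replaces it.
const inFlight = new Map<string, AbortController>();

function switchFetch<T>(
  key: string,
  method: string,
  url: string,
  fetcher: Fetcher<T>,
): Promise<T> {
  // Cancel the previous request for this key, much as switchMap
  // unsubscribes from the previous inner observable.
  inFlight.get(key)?.abort();

  const controller = new AbortController();
  inFlight.set(key, controller);

  return new Promise<T>((resolve, reject) => {
    const isLatest = () => inFlight.get(key) === controller;
    fetcher(method, url, controller.signal).then(
      value => {
        if (!isLatest()) return; // superseded: drop the stale result
        inFlight.delete(key);
        resolve(value);
      },
      error => {
        if (!isLatest()) return; // superseded: drop the stale error too
        inFlight.delete(key);
        reject(error);
      },
    );
  });
}
```

In this sketch a superseded call's promise never settles, which mirrors switchMap silently dropping the earlier inner result. Rejecting with a cancellation error instead would be an equally reasonable choice.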
Let’s add a feature.
Our PM insists that our client’s customers’ users want to be able to add a TODO task, not just stare at the list all day.
We’ll assume there’s a new component in the template for adding a TODO task which is wired up to call a new method onAddTodo with the new, unsaved task. So all we have to do is (1) save the task and then (2) refresh the list of TODOs to make sure we have the latest from the server.
Imperative with RxJS
|
“Declarative” RxJS
|
class TodoListComponent {
@onChange('onCurrentUser')
currentUser$ = this.userStore.currentUser$
@onChange('onTodos')
todos: Todo[]
allDone: boolean
error: Error
userName: string
saving: boolean = false // ADDED
constructor(private userStore: UserStore) {}
onCurrentUser(user) {
this.userName = user?.name
if (user) {
this.todos = undefined
this.fetchTodos() // refactored
} else {
this.todos = []
}
}
onTodos(todos) {
this.allDone = todos && !todos.some(todo => !todo.done)
}
// ADDED
onAddTodo(todo) {
this.saving = true
fetch('POST', todosPath(), todo)
.then(() => this.fetchTodos())
.catch(error => this.error = error)
.finally(() => this.saving = false)
}
// refactored
fetchTodos() {
switchFetch('todos', 'GET', todosPath(this.currentUser.id))
.then(todos => this.todos = todos)
.catch(error => this.error = error)
}
}
|
class TodoListComponent {
currentUser$ = this.userStore.currentUser$
todos$: Observable<Todo[]>
allDone$: Observable<boolean>
error$ = new BehaviorSubject<Error>(undefined)
userName$: Observable<string>
saving$ = new BehaviorSubject<boolean>(false) // ADDED
addTodo$ = new Subject<Todo>() // ADDED
addedTodo$ = new Subject<Todo>() // ADDED
constructor(
private userStore: UserStore,
private log: LogService
) {
this.userName$ = this.currentUser$.pipe(pluck('name'))
this.todos$ = combineLatest([ // CHANGED
this.currentUser$,
this.addedTodo$.pipe(startWith(undefined)), // ADDED
]).pipe(
switchMap(([currentUser, _]) => { // CHANGED
try {
return currentUser
? fromFetch('GET', todosPath(currentUser.id))
// WRONG
// This pipe should start with `undefined`
// only when currentUser$ emits a new value,
// not when addedTodo$ emits.
.pipe(
startWith(undefined),
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
: of([])
} catch(error) {
log.error(error)
return EMPTY
}
}),
// refactored
share(),
)
this.allDone$ = this.todos$.pipe(
map(todos => todos && !todos.some(todo => !todo.done)),
)
// ADDED
this.addedTodo$ = this.addTodo$.pipe(
tap(() => this.saving$.next(true)),
switchMap(todo => {
try {
return fromFetch('POST', todosPath(), todo).pipe(
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
} catch(error) {
log.error(error)
return EMPTY
}
}),
tap({
next: () => this.saving$.next(false),
error: () => this.saving$.next(false),
complete: () => this.saving$.next(false)
}),
share(),
)
}
// ADDED
onAddTodo(todo) {
this.addTodo$.next(todo)
// If the view is no longer subscribed to anything that
// would cause the pipe to execute the POST request, we
// still want to save the TODO task. Subscribe here to
// make that happen.
this.addedTodo$.pipe(take(1)).subscribe()
}
}
|
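For readers who prefer async/await over promise chains, the imperative onAddTodo can be written with try/catch/finally. This is a minimal sketch under stated assumptions: HttpCall, TodoSaver and the title field are hypothetical names standing in for the article's undefined fetch helper and component. Unlike the chained version, the saving flag here stays set until the refresh has finished.

```typescript
// Hypothetical shapes: the article leaves its HTTP helper undefined,
// so `HttpCall` is an assumed stand-in, and `title` is invented for illustration.
interface Todo { id?: number; title: string; done: boolean }
type HttpCall = (method: string, path: string, body?: unknown) => Promise<unknown>;

class TodoSaver {
  saving = false;
  error: unknown = undefined;
  todos: Todo[] = [];
  private readonly call: HttpCall;
  private readonly todosPath: string;

  constructor(call: HttpCall, todosPath: string) {
    this.call = call;
    this.todosPath = todosPath;
  }

  // Same steps as the promise chain: save, refresh, record errors, reset the flag.
  async onAddTodo(todo: Todo): Promise<void> {
    this.saving = true;
    try {
      await this.call('POST', this.todosPath, todo);
      this.todos = (await this.call('GET', this.todosPath)) as Todo[];
    } catch (error) {
      this.error = error;
    } finally {
      this.saving = false;
    }
  }
}
```

The control flow reads top to bottom, and a breakpoint on any line shows the state at that step.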
One More Feature?
The next thing to do is to remove TODOs, too. One more round of estimating, tasking out, and design-test-implement later:
Imperative with RxJS
|
“Declarative” RxJS
|
class TodoListComponent {
@onChange('onCurrentUser')
currentUser$ = this.userStore.currentUser$
@onChange('onTodos')
todos: Todo[]
allDone: boolean
error: Error
userName: string
saving: boolean = false
constructor(private userStore: UserStore) {}
onCurrentUser(user) {
this.userName = user?.name
if (user) {
this.todos = undefined
this.fetchTodos()
} else {
this.todos = []
}
}
onTodos(todos) {
this.allDone = todos && !todos.some(todo => !todo.done)
}
onAddTodo(todo) {
this.saving = true
fetch('POST', todosPath(), todo)
.then(() => this.fetchTodos())
.catch(error => this.error = error)
.finally(() => this.saving = false)
}
// ADDED
onRemoveTodo(todo) {
this.saving = true
fetch('DELETE', todoPath(todo.id))
.then(() => this.fetchTodos())
.catch(error => this.error = error)
.finally(() => this.saving = false)
}
fetchTodos() {
switchFetch('todos', 'GET', todosPath(this.currentUser.id))
.then(todos => this.todos = todos)
.catch(error => this.error = error)
}
}
|
class TodoListComponent {
currentUser$ = this.userStore.currentUser$
todos$: Observable<Todo[]>
allDone$: Observable<boolean>
error$ = new BehaviorSubject<Error>(undefined)
userName$: Observable<string>
saving$ = new BehaviorSubject<boolean>(false)
addTodo$ = new Subject<Todo>()
addedTodo$ = new Subject<Todo>()
removeTodo$ = new Subject<Todo>() // ADDED
removedTodo$ = new Subject<Todo>() // ADDED
constructor(
private userStore: UserStore,
private log: LogService
) {
this.userName$ = this.currentUser$.pipe(pluck('name'))
this.todos$ = combineLatest([
this.currentUser$,
this.addedTodo$.pipe(startWith(undefined)),
this.removedTodo$.pipe(startWith(undefined)), // ADDED
]).pipe(
switchMap(([currentUser, _, __]) => { // CHANGED
try {
return currentUser
? fromFetch('GET', todosPath(currentUser.id))
// STILL WRONG
// This pipe should start with `undefined`
// only when currentUser$ emits a new value,
// not when addedTodo$ or removedTodo$ emits.
.pipe(
startWith(undefined),
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
: of([])
} catch(error) {
log.error(error)
return EMPTY
}
}),
share(),
)
this.allDone$ = this.todos$.pipe(
map(todos => todos && !todos.some(todo => !todo.done)),
)
this.addedTodo$ = this.addTodo$.pipe(
tap(() => this.saving$.next(true)),
switchMap(todo => {
try {
return fromFetch('POST', todosPath(), todo).pipe(
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
} catch(error) {
log.error(error)
return EMPTY
}
}),
tapEverything(() => this.saving$.next(false)), // refactored
share(),
)
// ADDED
this.removedTodo$ = this.removeTodo$.pipe(
tap(() => this.saving$.next(true)),
switchMap(todo => {
try {
return fromFetch('DELETE', todoPath(todo.id)).pipe(
catchError(error => {
this.error$.next(error)
return EMPTY
})
)
} catch(error) {
log.error(error)
return EMPTY
}
}),
tapEverything(() => this.saving$.next(false)),
share(),
)
}
onAddTodo(todo) {
this.addTodo$.next(todo)
// If the view is no longer subscribed to anything that
// would cause the pipe to execute the POST request, we
// still want to save the TODO task. Subscribe here to
// make that happen.
this.addedTodo$.pipe(take(1)).subscribe()
}
// ADDED
onRemoveTodo(todo) {
this.removeTodo$.next(todo)
// If the view is no longer subscribed to anything that
// would cause the pipe to execute the DELETE request, we
// still want to delete the TODO task. Subscribe here to
// make that happen.
this.removedTodo$.pipe(take(1)).subscribe()
}
}
|
Lastly, requirements change - agility is good. Everything before now was purely additive. Let’s see what happens when a requirement comes in which necessitates a change in an existing feature.
New requirement: After removing a TODO task, just delete it from our local array of TODOs instead of refreshing the whole list from the server.
Imperative with RxJS
|
“Declarative” RxJS
|
...
onRemoveTodo(todo) {
this.saving = true
fetch('DELETE', todoPath(todo.id))
.then(() =>
this.todos = this.todos.filter(t => t.id !== todo.id) // CHANGED
)
.catch(error => this.error = error)
.finally(() => this.saving = false)
}
...
|
// Hell no.
// Seriously, scroll up and try to code this in
// your head. The results below can wait.
// Compare your solution to the one, concise, clear
// line to the left. Spoiler: it's far from concise
// or clear. Also, it will have been harder to
// implement and it will have broken unrelated
// functionality along the way.
// This is already a showstopper. I cannot recommend
// forgoing the ability to quickly adapt to changing
// business needs.
|
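A minimal sketch of the local removal on the imperative side, assuming each TODO carries a unique id (the article only uses todo.id and todo.done). The withoutTodo helper name is hypothetical. It returns a new array rather than mutating the old one, so anything that compares array references will notice the change.

```typescript
// Hypothetical Todo shape: only `id` and `done` appear in the article.
interface Todo { id: number; done: boolean }

// Returns a new array without the removed todo; the input is left untouched.
function withoutTodo(todos: readonly Todo[], removed: Todo): Todo[] {
  return todos.filter(todo => todo.id !== removed.id);
}

const first: Todo = { id: 1, done: true };
const second: Todo = { id: 2, done: false };
const original: Todo[] = [first, second];

const remaining = withoutTodo(original, second);
console.log(remaining.map(todo => todo.id)); // only id 1 is left
console.log(original.length);                // still 2: the original is not mutated
```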
So how do these stack up against each other?
Imperative with RxJS
|
“Declarative” RxJS
|
Pros
- Human friendly.
- Devtools and debugger friendly.
- Testing is a breeze.
- Developer’s intention is clear.
- No “streams-without-subscribers” timing issues, where observables lack subscribers at the expected times (e.g. ngIf hides the element that uses the async pipe).
- No “leaked-hidden-subscriptions” timing issues, e.g. via shareReplay, a misplaced takeUntil, et al.
- Easy to read.
- Easy to test.
- Easy to organize / refactor
- Code ended cleaner than when it started
- Can use async/await if you want
- Adding functionality was achieved by adding, not changing or removing.
- Adding functionality was achieved again by adding, not changing or removing. Removing this feature will be just as clean as adding it was.
- Reactivity is preserved and utilizes RxJS
- Junior programmers can read and write this code without sacrificing quality.
|
Pros
- Relationships between observables are (hopefully) perfectly solved for the derived stream - “Declarative style”.
- (Hopefully) perfectly represents the relationships between observables.
- As long as the code has zero side-effects (e.g. no calls to RxJS tap and all other callbacks don’t modify external state), there can be no timing issues from other areas of the code modifying shared state. (Except for those arising from imperfectly defined streams.)
- In some scenarios it’s more precise by doing less work because streams can be interrupted part-way-through on unsubscribe (which could lead to unexpected behavior so that’s not always a “pro”).
|
Cons
- Could have “shared-state” timing issues: when multiple functions could set currentUser while another function is expecting currentUser to stay the same before and after an asynchronous call.
- switchFetch (or some other cancellable fetch) is needed for feature parity with RxJS switchMap.
|
Cons
- Harder to read
- Harder to test
- Relationships between observables MUST be perfectly solved for the derived stream - which is complicated when considering the presence or lack of subscribers and how that affects things like withLatestFrom.
- Less human friendly.
- Devtools and debugger unfriendly.
- Mostly useless stack traces. If support is a priority and time-to-resolution is important, you want useful stack traces.
- Developer’s intention isn’t clear.
- Everything is an Observable<something> or function argument. If it’s not in a pipe, it’s probably wrong.
- The pipes are “like callback hell” [- anonymous programmer].
- The words pipe, switchMap, of, next, share, and map are odd. They’re not part of the domain language for application programming or the domain language for the customers/users, so they aren’t of value to the business.
- In the examples on this page, we could not conditionally use startWith(undefined) when fetching TODOs after currentUser emits but not when addedTodo emits (without adding even more complication). So the implementations are no longer functionally equivalent and tech debt was already present in a simple “todo” app.
- The composition and pipes defining this.todos$ had to be changed for every feature.
- The .pipe(take(1)).subscribe() calls do not communicate the developer’s intention.
- Its “bulk” is excessive, which becomes much more apparent as features are added. (Not counting the 8 lines of comments added with the new features.)
- The churn on those lines of code is already adding up. Merge conflicts are more likely.
- Two error paths (native JS exceptions and RxJS errors) require extra code to ensure reliability.
- When an infinite stream breaks, the app degrades poorly, requiring a whole-page refresh after the error.
- The infinite streams prevent typical error propagation and must be handled atypically (e.g. the LogService above).
- Junior programmers cannot read and write this code without sacrificing quality.
|
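The “shared-state” con on the imperative side is usually handled with a small guard: snapshot the state before the asynchronous gap and compare it afterwards. The sketch below is one way to do that, not the author's code. TodoState, User and LoadTodos are hypothetical names introduced for illustration.

```typescript
// Hypothetical shapes for illustration; none of these names come from the article.
interface User { id: number }
type LoadTodos = (userId: number) => Promise<string[]>;

class TodoState {
  currentUser: User | undefined;
  todos: string[] = [];
  private readonly load: LoadTodos;

  constructor(load: LoadTodos) {
    this.load = load;
  }

  async refresh(): Promise<void> {
    const requestedFor = this.currentUser; // snapshot before the async gap
    if (!requestedFor) {
      this.todos = [];
      return;
    }
    const loaded = await this.load(requestedFor.id);
    // Another handler may have switched users while we were waiting;
    // if so, this result is stale and must not overwrite newer state.
    if (this.currentUser !== requestedFor) return;
    this.todos = loaded;
  }
}
```

The guard costs one comparison per asynchronous call, and it keeps the check next to the assignment it protects.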
Summary (tl;dr)
It’s very apparent that the purely-declarative RxJS approach within an Angular component class is not conducive to productivity or even correctness despite the allure and promise of both. I’d trust a junior developer more to successfully fix the imperative code shown than the declarative code. Where declarative programming helps and is human-friendly, use it - some services, stores, and caches come to mind. But for most JavaScript… don’t.