
All-In on RxJS? Think Again. 

RxJS is declarative and clear, as long as there is at least one subscriber and when the source replays… and starts with something?  Wait, is our startWith before the publishReplay or after? It’s a good thing we have the map function - no, not that map. 

Here’s the thing. Computers run the code we write with “painful accuracy” [Andy Harris, approximately]. When there’s a bug, it’s because our own brains effed up. And when we dig a little deeper, it’s because the code - which we wrote - couldn’t be reasoned about easily. Maybe it didn’t abstract enough so it overwhelmed our working memory, or maybe it used language that implied something nuanced that wouldn’t have been implied in conversation. 

But frankly our job isn’t to manage complexities, it’s to simplify or remove them. 

A Little Background

Angular used to be slow.  It still is, but it used to, too. 
- Mitch Hedberg

RxJS is in Angular because Angular was slow and they couldn’t make it any faster without surfacing a new mental model for devs. There’s nothing wrong with that and it’s quite common. Concurrent programming introduces new primitives, assembly languages introduce new instruction types, etc., all in the name of speed. 

RxJS is there to reduce the computation required to update the view. More specifically it reduces computation when no view updates are needed by signaling more precisely which parts of the view need to be updated and when. (Well, more precisely than Angular’s usual way, but it’s not beating out the rest of the pack). 

“But it’s declarative”

Let’s compare a simple component in a TODO app which uses RxJS in a fully “declarative” style to the usual imperative style.  Ignoring the template (which is the exact same for each):

(Skip straight to the tl;dr if you don’t want to see the rationale)


Imperative with RxJS first, then “Declarative” RxJS:

class TodoListComponent {
  @onChange('onCurrentUser')
  currentUser$ = this.userStore.currentUser$
  @onChange('onTodos')
  todos: Todo[]
  allDone: boolean
  error: Error
  userName: string

  constructor(private userStore: UserStore) {}

  onCurrentUser(user) {
    this.userName = user?.name
    if (user) {
      this.todos = undefined
      switchFetch('todos', 'GET', todosPath(user.id))
        .then(todos => this.todos = todos)
        .catch(error => this.error = error)
    } else {
      this.todos = []
    }
  }

  onTodos(todos) {
    this.allDone = todos && !todos.some(todo => !todo.done)
  }
}

“Declarative” RxJS

class TodoListComponent {
  currentUser$ = this.userStore.currentUser$
  todos$: Observable<Todo[]>
  allDone$: Observable<boolean>
  error$ = new BehaviorSubject<Error>(undefined)
  userName$: Observable<string>

  constructor(
    private userStore: UserStore,
    private log: LogService
  ) {
    this.userName$ = this.currentUser$.pipe(pluck('name'))

    this.todos$ = this.currentUser$.pipe(
      switchMap(currentUser => {
        try {
          return currentUser
            ? fromFetch('GET', todosPath(currentUser.id))
                .pipe(
                  startWith(undefined),
                  catchError(error => {
                    this.error$.next(error)
                    return EMPTY
                  })
                )
            : of([])
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      share(),
    )

    this.allDone$ = this.todos$.pipe(
      map(todos => todos && !todos.some(todo => !todo.done)),
    )
  }
}


Choose your adventure:

  1. If you’re thinking “What’s with the funky try/catch in the declarative code?”: In the RxJS world, errors can flow through a pipe AND native exceptions can still be thrown. If a native JavaScript exception is thrown inside that switchMap callback without the try/catch, it surfaces as an error on the outer stream, which the inner catchError never sees, so the stream would break and never recover. 

  2. If you’re thinking “Okay, but they’re practically the same and the declarative one is better because it’s declarative”: you might be right. So far the two look mostly similar, apart from the declarative code having two separate error paths and convoluted control flow. 

  3. Lastly, if you’re thinking “Just use a store with a BehaviorSubject for the todos and populate the store using the imperative style”: then you might be ahead of the game. Maybe skip to the tl;dr, but there are still some good insights in exploring the purely declarative implementations below. 
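
The imperative listing above leans on a switchFetch helper that is never shown. As a rough sketch only, here is one way such a helper could behave, assuming it takes a key plus an injected request function (both the signature and the `Requester` type are hypothetical, not taken from any library): a newer call with the same key supersedes an older one, so a stale response can never overwrite a fresh one.

```typescript
// Assumption: switchFetch is never defined in the article, so this signature
// (including the injected `request` function) is hypothetical.
type Requester<T> = (method: string, url: string) => Promise<T>

// Most recent call number per key, e.g. 'todos'.
const latestCall = new Map<string, number>()

function switchFetch<T>(
  key: string,
  method: string,
  url: string,
  request: Requester<T>,
): Promise<T> {
  const callId = (latestCall.get(key) ?? 0) + 1
  latestCall.set(key, callId)

  return new Promise<T>((resolve, reject) => {
    request(method, url).then(
      // Settle only if no newer call with the same key has started since.
      value => { if (latestCall.get(key) === callId) resolve(value) },
      error => { if (latestCall.get(key) === callId) reject(error) },
    )
  })
}
```

In this sketch a superseded call simply never settles, which roughly mirrors how switchMap drops the result of an abandoned inner stream. It only ignores the stale response; actually cancelling the network request would need something like an AbortController on top.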

Let’s add a feature. 

Our PM insists that our client’s customers’ users want to be able to add a TODO task, not just stare at the list all day. 

We’ll assume there’s a new component in the template for adding a TODO task which is wired up to call a new method onAddTodo with the new, unsaved task.  So all we have to do is (1) refresh the list of TODOs after (2) saving to make sure we have the latest from the server. 


Imperative with RxJS first, then “Declarative” RxJS:

class TodoListComponent {
  @onChange('onCurrentUser')
  currentUser$ = this.userStore.currentUser$
  @onChange('onTodos')
  todos: Todo[]
  allDone: boolean
  error: Error
  userName: string
  saving: boolean = false // ADDED

  constructor(private userStore: UserStore) {}

  onCurrentUser(user) {
    this.userName = user?.name
    if (user) {
      this.todos = undefined
      this.fetchTodos() // refactored
    } else {
      this.todos = []
    }
  }

  onTodos(todos) {
    this.allDone = todos && !todos.some(todo => !todo.done)
  }

  // ADDED
  onAddTodo(todo) {
    this.saving = true
    fetch('POST', todosPath(), todo)
      .then(() => this.fetchTodos())
      .catch(error => this.error = error)
      .finally(() => this.saving = false)
  }

  // refactored
  fetchTodos() {
    switchFetch('todos', 'GET', todosPath(this.currentUser.id))
      .then(todos => this.todos = todos)
      .catch(error => this.error = error)
  }
}

“Declarative” RxJS

class TodoListComponent {
  currentUser$ = this.userStore.currentUser$
  todos$: Observable<Todo[]>
  allDone$: Observable<boolean>
  error$ = new BehaviorSubject<Error>(undefined)
  userName$: Observable<string>
  saving$ = new BehaviorSubject<boolean>(false) // ADDED
  addTodo$ = new Subject<Todo>() // ADDED
  addedTodo$ = new Subject<Todo>() // ADDED

  constructor(
    private userStore: UserStore,
    private log: LogService
  ) {
    this.userName$ = this.currentUser$.pipe(pluck('name'))

    this.todos$ = combineLatest([ // CHANGED
      this.currentUser$,
      this.addedTodo$.pipe(startWith(undefined)), // ADDED
    ]).pipe(
      switchMap(([currentUser, _]) => { // CHANGED
        try {
          return currentUser
            ? fromFetch('GET', todosPath(currentUser.id))
                // WRONG
                // This pipe should start with `undefined`
                // only when currentUser$ emits a new value,
                // not when addedTodo$ emits.
                .pipe(
                  startWith(undefined),
                  catchError(error => {
                    this.error$.next(error)
                    return EMPTY
                  })
                )
            : of([])
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      // refactored
      share(),
    )

    this.allDone$ = this.todos$.pipe(
      map(todos => todos && !todos.some(todo => !todo.done)),
    )

    // ADDED
    this.addedTodo$ = this.addTodo$.pipe(
      tap(() => this.saving$.next(true)),
      switchMap(todo => {
        try {
          return fromFetch('POST', todosPath(), todo).pipe(
            catchError(error => {
              this.error$.next(error)
              return EMPTY
            })
          )
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      tap({
        next: () => this.saving$.next(false),
        error: () => this.saving$.next(false),
        complete: () => this.saving$.next(false)
      }),
      share(),
    )
  }

  // ADDED
  onAddTodo(todo) {
    this.addTodo$.next(todo)
    // If the view is no longer subscribed to anything that
    // would cause the pipe to execute the POST request, we
    // still want to save the TODO task.  Subscribe here to
    // make that happen.
    this.addedTodo$.pipe(take(1)).subscribe()
  }
}


One More Feature? 

The next thing to do is to remove TODOs, too.  One more round of estimating, tasking out, and design-test-implement later:


Imperative with RxJS first, then “Declarative” RxJS:

class TodoListComponent {
  @onChange('onCurrentUser')
  currentUser$ = this.userStore.currentUser$
  @onChange('onTodos')
  todos: Todo[]
  allDone: boolean
  error: Error
  userName: string
  saving: boolean = false

  constructor(private userStore: UserStore) {}

  onCurrentUser(user) {
    this.userName = user?.name
    if (user) {
      this.todos = undefined
      this.fetchTodos()
    } else {
      this.todos = []
    }
  }

  onTodos(todos) {
    this.allDone = todos && !todos.some(todo => !todo.done)
  }

  onAddTodo(todo) {
    this.saving = true
    fetch('POST', todosPath(), todo)
      .then(() => this.fetchTodos())
      .catch(error => this.error = error)
      .finally(() => this.saving = false)
  }

  // ADDED
  onRemoveTodo(todo) {
    this.saving = true
    fetch('DELETE', todoPath(todo.id))
      .then(() => this.fetchTodos())
      .catch(error => this.error = error)
      .finally(() => this.saving = false)
  }

  fetchTodos() {
    switchFetch('todos', 'GET', todosPath(this.currentUser.id))
      .then(todos => this.todos = todos)
      .catch(error => this.error = error)
  }
}

“Declarative” RxJS

class TodoListComponent {
  currentUser$ = this.userStore.currentUser$
  todos$: Observable<Todo[]>
  allDone$: Observable<boolean>
  error$ = new BehaviorSubject<Error>(undefined)
  userName$: Observable<string>
  saving$ = new BehaviorSubject<boolean>(false)
  addTodo$ = new Subject<Todo>()
  addedTodo$ = new Subject<Todo>()
  removeTodo$ = new Subject<Todo>() // ADDED
  removedTodo$ = new Subject<Todo>() // ADDED

  constructor(
    private userStore: UserStore,
    private log: LogService
  ) {
    this.userName$ = this.currentUser$.pipe(pluck('name'))

    this.todos$ = combineLatest([
      this.currentUser$,
      this.addedTodo$.pipe(startWith(undefined)),
      this.removedTodo$.pipe(startWith(undefined)), // ADDED
    ]).pipe(
      switchMap(([currentUser, _, __]) => { // CHANGED
        try {
          return currentUser
            ? fromFetch('GET', todosPath(currentUser.id))
                // STILL WRONG
                // This pipe should start with `undefined`
                // only when currentUser$ emits a new value,
                // not when addedTodo$ or removedTodo$ emits.
                .pipe(
                  startWith(undefined),
                  catchError(error => {
                    this.error$.next(error)
                    return EMPTY
                  })
                )
            : of([])
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      share(),
    )

    this.allDone$ = this.todos$.pipe(
      map(todos => todos && !todos.some(todo => !todo.done)),
    )

    this.addedTodo$ = this.addTodo$.pipe(
      tap(() => this.saving$.next(true)),
      switchMap(todo => {
        try {
          return fromFetch('POST', todosPath(), todo).pipe(
            catchError(error => {
              this.error$.next(error)
              return EMPTY
            })
          )
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      tapEverything(() => this.saving$.next(false)), // refactored
      share(),
    )

    // ADDED
    this.removedTodo$ = this.removeTodo$.pipe(
      tap(() => this.saving$.next(true)),
      switchMap(todo => {
        try {
          return fromFetch('DELETE', todoPath(todo.id)).pipe(
            catchError(error => {
              this.error$.next(error)
              return EMPTY
            })
          )
        } catch(error) {
          log.error(error)
          return EMPTY
        }
      }),
      tapEverything(() => this.saving$.next(false)),
      share(),
    )
  }

  onAddTodo(todo) {
    this.addTodo$.next(todo)
    // If the view is no longer subscribed to anything that
    // would cause the pipe to execute the POST request, we
    // still want to save the TODO task.  Subscribe here to
    // make that happen.
    this.addedTodo$.pipe(take(1)).subscribe()
  }

  // ADDED
  onRemoveTodo(todo) {
    this.removeTodo$.next(todo)
    // If the view is no longer subscribed to anything that
    // would cause the pipe to execute the DELETE request, we
    // still want to delete the TODO task.  Subscribe here to
    // make that happen.
    this.removedTodo$.pipe(take(1)).subscribe()
  }
}


Lastly, requirements change - agility is good.  Everything before now was purely additive.  Let’s see what happens when a requirement comes in which necessitates a change in an existing feature. 

New requirement: After removing a TODO task, just delete it from our local array of TODOs instead of refreshing the whole list from the server. 


Imperative with RxJS first, then “Declarative” RxJS:

  ...

  onRemoveTodo(todo) {
    this.saving = true
    fetch('DELETE', todoPath(todo.id))
      .then(() =>
        this.todos = this.todos.filter(t => t.id !== todo.id) // CHANGED
      )
      .catch(error => this.error = error)
      .finally(() => this.saving = false)
  }
  ...

“Declarative” RxJS

// Hell no.

// Seriously, scroll up and try to code this in
// your head.  The results below can wait.

// Compare your solution to the one concise, clear
// line in the imperative version.  Spoiler: it's far
// from concise or clear.  Also, it will have been
// harder to implement and it will have broken
// unrelated functionality along the way.

// This is already a showstopper.  I cannot recommend
// forgoing the ability to quickly adapt to changing
// business needs.


So how do these stack up against each other? 

Imperative with RxJS

“Declarative” RxJS

Pros (Imperative with RxJS)

  • Human friendly. 
  • Devtools and debugger friendly. 
  • Testing is a breeze. 
  • Developer’s intention is clear. 
  • No “streams-without-subscribers” timing issues, where observables lack subscribers at the expected times (e.g. ngIf hides the element that uses the async pipe). 
  • No “leaked-hidden-subscriptions” timing issues (e.g. via shareReplay, a misplaced takeUntil, et al.). 
  • Easy to read. 
  • Easy to test. 
  • Easy to organize / refactor
  • Code ended cleaner than when it started
  • Can use async/await if you want
  • Each new feature was achieved by adding code, not by changing or removing it.  Removing a feature will be just as clean as adding it was. 
  • Reactivity is preserved and utilizes RxJS
  • Junior programmers can read and write this code without sacrificing quality. 
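
To illustrate the “can use async/await” point above, here is a minimal sketch of the add-a-TODO flow written with async/await instead of a promise chain. The `TodoApi` interface, the `TodoListModel` class, and their methods are hypothetical stand-ins for the article’s fetch calls, not the author’s actual code.

```typescript
// Hypothetical types and API; none of these names come from a real library.
interface Todo { id: number; title: string; done: boolean }
type NewTodo = Omit<Todo, 'id'>

interface TodoApi {
  list(userId: number): Promise<Todo[]>
  create(todo: NewTodo): Promise<void>
}

class TodoListModel {
  todos: Todo[] = []
  error: unknown = undefined
  saving = false
  private api: TodoApi
  private userId: number

  constructor(api: TodoApi, userId: number) {
    this.api = api
    this.userId = userId
  }

  async onAddTodo(todo: NewTodo): Promise<void> {
    this.saving = true
    try {
      await this.api.create(todo)                    // save first
      this.todos = await this.api.list(this.userId)  // then refresh from the server
    } catch (error) {
      this.error = error                             // single error path
    } finally {
      this.saving = false                            // runs on success and on failure
    }
  }
}
```

The control flow reads top to bottom, and the try/catch/finally covers both a rejected request and an exception thrown along the way.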

Pros (“Declarative” RxJS)

  • Relationships between observables are (hopefully) perfectly represented and solved for the derived stream - “Declarative style”. 
  • As long as the code has zero side-effects (e.g. no calls to RxJS tap and all other callbacks don’t modify external state), there can be no timing issues from other areas of the code modifying shared state.  (Except for those arising from imperfectly defined streams.) 
  • In some scenarios it’s more precise by doing less work because streams can be interrupted part-way-through on unsubscribe (which could lead to unexpected behavior so that’s not always a “pro”). 

Cons (Imperative with RxJS)

  • Could have “shared-state” timing issues: when multiple functions could set currentUser while another function is expecting currentUser to stay the same before and after an asynchronous call. 
  • switchFetch (or some other cancellable fetch) is needed for feature parity with RxJS switchMap. 
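
The “shared-state” timing issue in the first con above can be made concrete. The sketch below, with entirely hypothetical names (`TodoState`, `LoadTodos`), shows one common guard: snapshot currentUser before the asynchronous call and discard the result if it changed in the meantime.

```typescript
// Hypothetical names throughout; `loadTodos` stands in for any async call.
interface User { id: number; name: string }
type LoadTodos = (userId: number) => Promise<string[]>

class TodoState {
  currentUser: User | undefined = undefined
  todos: string[] = []
  private loadTodos: LoadTodos

  constructor(loadTodos: LoadTodos) {
    this.loadTodos = loadTodos
  }

  async refresh(): Promise<void> {
    const user = this.currentUser // snapshot before the async gap
    if (!user) {
      this.todos = []
      return
    }
    const todos = await this.loadTodos(user.id)
    // Another function may have switched users while we were waiting;
    // if so, this response is stale and must not be applied.
    if (this.currentUser !== user) return
    this.todos = todos
  }
}
```

Without the comparison after the await, a slow response for the previous user could overwrite the list that belongs to the current one.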

Cons (“Declarative” RxJS)

  • Harder to read
  • Harder to test
  • Relationships between observables MUST be perfectly solved for the derived stream - which is complicated when considering the presence or lack of subscribers and how that affects things like withLatestFrom. 
  • Less human friendly. 
  • Devtools and debugger unfriendly. 
  • Mostly useless stack traces.  If support is a priority and time-to-resolution is important, you want useful stack traces. 
  • Developer’s intention isn’t clear. 
  • Everything is an Observable<something> or function argument.  If it’s not in a pipe, it’s probably wrong. 
  • The pipes are “like callback hell” [- anonymous programmer]. 
  • The words pipe, switchMap, of, next, share, and map are odd.  They’re not part of the domain language for application programming or the domain language for the customers/users, so they aren’t of value to the business. 
  • In the examples on this page, we could not conditionally use startWith(undefined) when fetching TODOs after currentUser emits but not when addedTodo emits (without adding even more complication).  So the implementations are no longer functionally equivalent and tech debt was already present in a simple “todo” app. 
  • The composition and pipes defining this.todos$ had to be changed for every feature. 
  • The .pipe(take(1)).subscribe() calls do not communicate the developer’s intention. 
  • Its “bulk” is excessive, which becomes much more apparent as features are added. (Not counting the 8 lines of comments added with the new features.) 
  • The churn on those lines of code is already adding up.  Merge conflicts are more likely. 
  • Two error paths (native JS exceptions and RxJS errors) require extra code to ensure reliability. 
  • When an infinite stream breaks, the app degrades poorly: recovering requires a whole-page refresh. 
  • The infinite streams prevent typical error propagation and must be handled atypically (e.g. the LogService above). 
  • Junior programmers cannot read and write this code without sacrificing quality. 



Summary (tl;dr)

It’s very apparent that the purely-declarative RxJS approach within an Angular component class is not conducive to productivity or even correctness, despite the allure and promise of both.  I’d trust a junior developer more to successfully fix the imperative code shown than the declarative code.  Where declarative programming helps and is human-friendly, use it - some services, stores, and caches come to mind.  But for most JavaScript… don’t.