RxJS Observable Services
- Published on
- Reading time
- 17 min read
Exploring Reactivity Patterns
Having learned React before Angular, it took time to appreciate RxJS's more direct and connected approach to reactivity. This power and control comes with a steep learning curve, present in the syntax and in thinking about state as an observable stream. Here, I'll run through a few basic patterns for managing state using the tools it provides.
Sections
- Prerequisites and Setup
- Stateful Components
- Sharing State with Services
- DRYing up the code (don't repeat yourself)
Prerequisites and Setup
I'll use common RxJS operators like pipe
, tap
, map
, and shareReplay
with subjects, subscriptions, and Angular's async
pipe. I will explain each as we go, but you can check out the documentation for RxJS operators here for more details. Promises, async/await, and try/catch aren't hard prerequisites for this post, but I'd strongly recommend learning these first as a point of comparison.
Since this is a CRUD app, I used JSON server to simulate a backend. After initializing the Angular project, I installed json-server and added a "server": "json-server --watch db.json --port 8000"
script to run with npm run server
. Lastly, because I'm too lazy to open two terminal tabs, I'll run the frontend and mock-backend simultaneously with concurrently. package.json
should look like this now:
...
"scripts": {
... // the other stuff
"start": "ng serve",
"server": "json-server --watch zeldaDB.json --port 8000",
"both": "concurrently 'npm run start' 'npm run serve'"
}
...
The app is a list of Zelda games that includes a form to add items, edit-in-place to update the title and year, and the ability to delete items. Unique IDs are auto-assigned by json-server when games are added. Here is the simple interface with data I've pre-populated (went a little lazy on the date-picker and lack of an image upload functionality, but the focus is on state management 🙈🤷♂️):
export interface Game {
id: string
title: string
image: string
releaseYear: number
}
Stateful Components
The first pattern I'll mention doesn't take advantage of everything RxJS and Angular offer but is a good starting point for anyone familiar with the typical Promise-based pattern. First I've set up the CRUD functions in a dummy service to interact with the server (dummy as in data-less for now):
export class ZeldaService {
constructor(public http: HttpClient) {}
createData(newGame: Partial<Game>) {
return this.http
.post<Game>('http://localhost:8000/games', newGame)
.pipe(catchError(this.handleError));
}
readData() {
return this.http.get<Game[]>('http://localhost:8000/games').pipe(
map((response: Game[]) =>
response.sort((a, b) => a.releaseYear - b.releaseYear)
),
catchError(this.handleError)
);
}
updateData(gameId: string, updatedField: Partial<Game>) {
return this.http
.patch<Game>(`http://localhost:8000/games/${gameId}`, updatedField)
.pipe(tap(console.log), catchError(this.handleError));
}
deleteData(gameId: string) {
return this.http
.delete<Game>(`http://localhost:8000/games/${gameId}`)
.pipe(tap(console.log), catchError(this.handleError));
}
}
Using Angular's built-in HttpClient
and the relevant methods, I tap
to log the results, map
to typecast and sort the stream from the readData
observable, and catchError
for basic error handling. For now, I'll call these service methods from a single component. The GET request fires to initiate the data on load, and I've defined several interaction handlers. Here's what the relevant bits of the app look like so far:
export class AppComponent {
// ...constructor and declarations
ngOnInit() {
// Read Data (initialize local state)
this.zeldaService.readData().subscribe({
next: (res) => (this.zeldaGames = res),
});
}
handleSubmit(newGame: Partial<Game>) {
this.zeldaService.createData(newGame).subscribe({
next: (addedGame: Game) => {
let newArr = [...this.zeldaGames, addedGame as Game].sort(
(a, b) => a.releaseYear - b.releaseYear
);
// Update local state
this.zeldaGames = newArr as Game[];
this.gameForm.reset();
},
error: (err) => console.error(err),
});
}
handleEditInPlace(game: Game, field: keyof Game, event: FocusEvent) {
this.editing = ''; // Reset edit in place
let editedField = (event.target as HTMLInputElement).value;
if (game[field] !== editedField) {
this.zeldaService
.updateData(game.id, { [field]: editedField })
.subscribe({
next: (updatedGame: Game) => {
// Sync data with database
let newGamesList = [
...this.zeldaGames.filter(
(game: Game) => game.id !== updatedGame.id
),
{
...game,
[field]: editedField,
} as Game,
];
// Update local state
this.zeldaGames = newGamesList.sort(
(a, b) => a.releaseYear - b.releaseYear
);
},
error: (err) => console.error(err),
});
}
}
handleDelete(gameId: string) {
this.zeldaService.deleteData(gameId).subscribe({
next: (_) =>
// Update local state
(this.zeldaGames = this.zeldaGames.filter(
(game) => game.id !== gameId
)),
error: (error) => console.error(error),
});
}
// ...other helpers
}
In this implementation, data is stored, updated, and handled entirely in the component. This approach works fine here, but the reactivity, organization, and readability could be greatly improved - something I've learned the hard way around maintainability when scaling...
The first improvement in that direction is to store the readData
observable in the service. Then, by piping shareReplay
on the GET request, the observable remains hot, and the most recent data in the stream will be shared with new subscribers. Otherwise, each new subscriber would cause a re-fetch because the observable would become cold as soon as the request resolves. Here's the result:
export class ZeldaService {
public games$!: Observable<Game[]>;
constructor(public http: HttpClient) {
// Initialize observable
this.games$ = this.readData();
}
// ... other stuff
readData() {
return (
this.games$ || this.http.get<Game[]>('http://localhost:8000/games').pipe(
map((response: Game[]) =>
response.sort((a, b) => a.releaseYear - b.releaseYear)
),
shareReplay(1),
catchError(this.handleError)
)
);
}
// ... other stuff
}
To summarize, I initialize the observable within the service's constructor and add a little logic to prevent re-initialization. After doing so, I subscribe to the new games$
observable inside the app component instead of calling readData
like before. In the subscription, the data from the observable is set as component-level state, zeldaGames
, which can then be updated when data is returned from the other CRUD functions. However, these updates won't affect the original observable meaning that other subscribers elsewhere in the application won't see these changes which is the primary drawback of this doing everything at the component level.
As an aside, if adding, editing, or deleting isn't necessary, a cleaner approach is to subscribe to the data directly in the template using the async
pipe. Doing so can reduce boilerplate since the async
pipe unpacks the data and handles unsubscribing under the hood. This doesn't really matter here because shareReplay
handles unsubscribing for us by default (when its param refCount = true
), but it will be good to know shortly when I introduce the BehaviorSubject
. Using async
, our template would look like this:
// ... other unchanged stuff
<li *ngFor="let game of zeldaService.games$ | async; trackBy:gameById">
// ... other unchanged stuff
Handling state entirely within a component is reasonable for single-component usage but becomes problematic when multiple pages and components must be synced. If another component subscribes to this method, its version of the data would get out of sync as things change within its sibling(s). Re-fetching fixes this, but it would be a waste because the returns from the post, patch, and delete requests are all we need to update the state in a way that is synced with the database. Every component should ideally subscribe to the same object rather than create their own copies, forcing state to be consistent throughout the application.
Before moving on, imagine I refactored the Zelda cards into a separate component. Then, I would loop over the cards with ngFor
and pass game data into each component as a property. For this parent-child case, prop drilling with @Input()
and @Output()
is great for sharing state because the data is only passed down one level and is specific to the UI. In instances where components or pages are unrelated, prop drilling would get unmanageable. For unrelated pages or sibling components needing the same information, it becomes imperative to share the data with a store.
Sharing State with Services
RxJS has 4 entities called subjects used to share state: the Subject
, BehaviorSubject
, ReplaySubject
, and AsyncSubject
. We'll use the BehaviorSubject
, which works similarly to useState (React) in that it takes an initial value and emits the most recent value to every active or future subscriber. Using it in a service achieves a similar effect of combining useState and useContext, but this approach feels more direct and verbose. Lastly, feel free to go read up on what the other subjects do in the RxJS subject docs.
Now, the changes here are two-fold. There is a lot of code cleanup to be done, however I'll save most of that for later. First, let's get the BehaviorSubject
working:
export class ZeldaService {
public games$ = new BehaviorSubject<Game[]>([]);
constructor(public http: HttpClient) {
this.readData(); // Call to fetch and initialize BehaviorSubject
}
// ... other unchanged stuff
readData() {
return ( // Removed logic preventing re-initialization, no longer necessary
this.http.get<Game[]>('http://localhost:8000/games').pipe(
map((response: Game[]) =>
response.sort((a, b) => a.releaseYear - b.releaseYear)
), // Removed shareReplay
catchError(this.handleError)
)
.subscribe({
next: (games: Game[]) => this.games$.next(games),
});
);
}
// ... other unchanged stuff
}
I updated the service so that games$
is now a BehaviorSubject
initialized by readData
(the function no longer returns anything). Technically, I initialized it as an empty array, but that isn't what I would do in a real application. Normally, I initialize to null
and display a loading UI while null
. Then, I update it with the fetched state or an empty array if the request fails. Adding this makes for a good bonus exercise, but I left them out for simplicity.
The fetch observable in readData
will only be subscribed to once, so shareReplay
can be removed. Next, I deleted the subscription in the component and the zeldaGames
variable that the subscription initialized. Instead, I directly subscribe to games$
in the template using async
. All references to zeldaGames
were refactored to use games$
and be updated with games$.next(newValue)
.
Each function uses the existing state to calculate a new state. This represents an important principle called immutability. Technically, this method doesn't directly mutate state - a best-practice approach for avoiding several organizational and performance-related issues around the consistency of updates across an application. These issues are the exact reason we want a global store for shared state instead of making different component-level updates like in the previous section.
To demonstrate this approach, notice I added a component called card-list
that is a direct copy of the card ul
, including the async
subscription to games$
. It also copies over the new implementations of handleEditInPlace
and handleDelete
. Since they both subscribe to the same state as siblings, card updates in one list are reflected simultaneously in the other!
Previously, when storing zeldaGames
in the components, updates were not automatically sent to other subscribers. Feel free to try making a local copy within a card-list component instead of subscribing to games
with async
to see this in action.
DRYing up the code (don't repeat yourself)
Notice the functions to update and delete state are duplicated in both components. Some people like this, but it isn't as "DRY" so other folks keep this code in the service. I'm in the middle because I like keeping a reference in the component to make everything more verbose. In the next example, the meat of each function is moved to the service, and I can still see at a glance what a component uses:
// ... unchanged stuff
updateData(game: Game, fieldName: string, updatedField: string) {
return this.http
.patch<Game>(`http://localhost:8000/games/${game.id}`, {
[fieldName]: updatedField,
})
.pipe(tap(console.log), catchError(this.handleError))
.subscribe({
next: (updatedGame: Game) => {
// Sync data with database
let newGamesList = [
...this.games$
.getValue()
.filter((game: Game) => game.id !== updatedGame.id),
{
...game,
[fieldName]: updatedField,
} as Game,
];
this.games$.next(
newGamesList.sort((a, b) => a.releaseYear - b.releaseYear)
);
},
error: (err) => console.error(err),
});
}
deleteData(gameId: string) {
return this.http
.delete<Game>(`http://localhost:8000/games/${gameId}`)
.pipe(tap(console.log), catchError(this.handleError))
.subscribe({
next: (_) =>
this.games$.next(
this.games$.getValue().filter((game) => game.id !== gameId)
),
error: (error) => console.error(error),
});
}
The gist of the changes above is that the .subscribe()
s from each component's update and delete functions are moved into the service so they only appear once across the code. The functionality remains unchanged! Unfortunately, in more complex apps this can be problematic. Generally, people get pretty strict on only subscribing to things within components. In fact, people prioritize not subscribing at all and only using the async
pipe if possible to avoid potential memory issues from arising.
The other thing that's obvious is that by subscribing to things in the service, our components can't see the results of these requests. For loading UI or a "something went wrong" UI, these cues would be necessary for the component to be more fine-tuned and reactive. That's the key! In principle, folks only like to subscribe to things that NEED to react.
The question remains then: Can we update our state in the service and still get the results we need in the component? Yes, but we'll need to refactor the subscribe
back into the components and move the service's logic to update state into a different operator. The result is a trade-off toward something slightly less verbose:
// ... unchanged stuff
updateData(game: Game, fieldName: string, updatedField: string) {
return this.http
.patch<Game>(`http://localhost:8000/games/${game.id}`, {
[fieldName]: updatedField,
})
.pipe(
tap((updatedGame: Game) => {
// Sync data with database
let newGamesList = [
...this.games$
.getValue()
.filter((game: Game) => game.id !== updatedGame.id),
{
...game,
[fieldName]: updatedField,
} as Game,
];
this.games$.next(
newGamesList.sort((a, b) => a.releaseYear - b.releaseYear)
);
}),
catchError(this.handleError)
);
}
deleteData(gameId: string) {
return this.http.delete<Game>(`http://localhost:8000/games/${gameId}`).pipe(
tap((_) =>
this.games$.next(
this.games$.getValue().filter((game) => game.id !== gameId)
)
),
catchError(this.handleError)
);
}
Now, the results of the requests can be received by subscribers in our components without duplicating the logic outside of the service. Unfortunately, using tap
like this to treat state updates as a side effect instead of using subscribe
doesn't feel as clear. Another drawback is that without calling subscribe
, these requests won't execute and the data won't get updated by tap
at all. Unlike the previous example, calling the function isn't enough, and subscribing in the component will be explicitly required whether it uses the result or not. Though it's more DRY, it has significant downsides so it's up to everyone and their team to decide what is tolerable. For marginally improved clarity, using map
instead of tap
might feel less like a side effect and more obvious that data is being updated, but perhaps that's just subjective.
Finally, I didn't build an example of this but another common approach is to have an HTTP service for the requests and a store service for storing and managing global state. In that architecture, the first version of the service we had would be a perfect HTTP service. Then, we would build another service to store global state and hold any logic (that can be called by our components). Only UI-necessary code will be placed into the components for better organization. This is a good approach for large-scale apps that require a high degree of test coverage. Our previous stateful examples are more complicated to test because the methods require simulation of the BehaviorSubject and just generally more than you'd want to deal with if you only wanted to test calling an endpoint.
Wrapping up
Angular offers a lot of power in managing state right out of the box! Couple that with RxJS, and most applications can manage things just fine with simple, stateful services. Despite the tradeoffs I weighed, most of these patterns are reasonable compromises for most apps. Generally, the best pattern comes down to whatever feels best. If things grow to be overwhelming then make adjustments and use that as an organic way to learn.
As a final note, there are many different ways of managing state outside of the basics covered here such as, in order of increasing complexity, Angular's new signals, more advanced RxJS operators, or third-party state management solutions. While the focus of this post was what Angular and RxJS can achieve for simple apps, I plan on writing about more of these approaches in the future. Until then, thanks for reading!