Realtime Client Database: gRPC Bi-Directional Streams and ZIO Hub
Realtime push-based databases such as Google Firebase conveniently keep clients synchronized with the server. Data updates stream to clients as they happen, and if a client disconnects, pending updates are processed as soon as it reconnects.
gRPC bi-directional streaming and ZIO Hub can implement this functionality, replicating an expensive paid Firebase service while allowing greater extensibility.
Evolving User Demands for Data Streaming
In typical web-based client-server communication, clients initiate all requests to the server. This minimizes
load on the server, as very little processing needs to be performed between requests. As server technology and hardware
performance have improved, more robust paradigms have evolved to cater to rising user expectations. To reduce
notification latency, it became imperative to let the server initiate communication. The server's immediate reaction to
external changes can then be sent to clients without waiting for a manual user action or for a polling delay to elapse.
Bi-Directional gRPC Streaming using HTTP/2
Two technologies have become standards for bi-directional web communication: WebSockets and HTTP/2 streams.
Today’s Complementary Web Standards
WebSockets
WebSockets were created first of the two, as a protocol bootstrapped from HTTP/1.1. By allowing clients to issue an Upgrade request on an established HTTP/1.1 connection, capable servers can switch the connection to the WebSocket protocol, which supports bi-directional streaming. Since this is an upgrade mechanism, backwards compatibility and fallback support for older clients are less complicated to implement. The WebSocket protocol has a common JavaScript API across all browsers, making it today’s preferred bi-directional protocol for browser use-cases. Although the WebSocket specification itself has changed little, it has seen improvements over time by being built on top of HTTP. HTTP/2 significantly improved connection performance through multiplexing, and where both client and server support WebSockets over HTTP/2 (RFC 8441), clients that use several WebSocket connections simultaneously benefit because those connections are multiplexed over a single HTTP/2 connection.
HTTP/2 Streaming
For non-browser communication, such as mobile apps or server-to-server traffic, WebSockets are an unnecessary layer: a WebSocket is an upgraded HTTP connection, implemented as an additional layer in the networking protocol. HTTP/2 supports bi-directional streams directly, so when the WebSocket API isn’t beneficial, it is optimal to use the HTTP protocol capabilities directly. HTTP/2 streaming is the preferred bi-directional mechanism for all use-cases without a web browser client or JavaScript dependency.
gRPC: A Remote Procedure Call (RPC) framework
gRPC is a high-performance networking framework built on top of HTTP/2, supporting multiple programming languages for both client and server implementations. It encodes network traffic using Protocol Buffers, and the same proto files and syntax are used to define the server API exposed to clients.
The gRPC server used for the realtime database will be called SyncService. It exposes a bidirectional stream on the Bidirectional endpoint, receiving SyncRequest objects and emitting SyncResponse objects on their respective streams.
An Update endpoint is also defined. This is the typical client-initiated handler, which receives an UpdateRequest object from the client and returns an UpdateResponse object to the client.
service SyncService {
  rpc Bidirectional (stream SyncRequest) returns (stream SyncResponse);
  rpc Update (UpdateRequest) returns (UpdateResponse);
}
This maps to the Scala interface:
/**
 * Requests subscribe to or unsubscribe from `Data`.
 * Updates to subscribed elements are streamed in realtime.
 */
def bidirectionalStream(request: Stream[StatusException, SyncRequest]): Stream[StatusException, SyncResponse]

/**
 * Creation / update of `Data`. The response indicates success, or failure due to a write conflict.
 * Conflicts are detected based on the ETag in the request.
 */
def update(request: UpdateRequest): IO[StatusException, UpdateResponse]
Subscription Request and Response Objects
A basic subscription mechanism will allow clients to subscribe to and unsubscribe from object updates based on the object id. The protobuf definition for requests is:
message SyncRequest {
  repeated Subscribe subscribes = 1;
  repeated Unsubscribe unsubscribes = 2;
}
The definition for responses is the Data object itself:
message SyncResponse {
  Data data = 1;
}
Within the server implementation, we will define a Subscription Manager which remembers the subscriptions of a specific client. ZIO has a primitive called Hub which allows objects such as these Subscription Managers to subscribe to a single, central message queue. Unlike a plain queue, where each message goes to one consumer, a Hub broadcasts every published message to all of its subscribers. The Hub will receive notifications of all Data updates, and each client's Subscription Manager will subscribe to it and filter for events with the ids its client has requested. Because this Hub will queue the stream of database changes, we will name it journal, since database journals behave similarly.
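The subscribe-and-filter idea can be sketched in a few lines. This is a minimal illustration rather than the article's implementation: it assumes ZIO 2.x with the zio-streams module on the classpath, and `JournalSketch`, `clientStream`, `demo`, and the simplified `Data` case class are hypothetical names standing in for the generated protobuf types.

```scala
import zio._
import zio.stream._

object JournalSketch {
  // Hypothetical stand-in for the generated protobuf `Data` class.
  final case class Data(id: Int, field1: String)

  // Each client gets its own subscription to the shared journal and keeps
  // only the records whose id is in that client's subscription set.
  def clientStream(
    journal: Hub[Data],
    subscribedIds: Ref[Set[Int]]
  ): ZIO[Scope, Nothing, UStream[Data]] =
    ZStream
      .fromHubScoped(journal)
      .map(_.filterZIO(data => subscribedIds.get.map(_.contains(data.id))))

  // Publishes two updates and collects what a client subscribed to id 1 sees.
  val demo: UIO[Chunk[Data]] =
    ZIO.scoped {
      for {
        journal  <- Hub.unbounded[Data]
        ids      <- Ref.make(Set(1))
        stream   <- clientStream(journal, ids) // subscribe before publishing
        _        <- journal.publish(Data(2, "not subscribed"))
        _        <- journal.publish(Data(1, "subscribed"))
        received <- stream.take(1).runCollect
      } yield received
    }
}
```

The subscription set is read for every element, so a change to the `Ref` takes effect on the next journal event without re-subscribing to the Hub.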
A helpful feature is to return the current Data object back in the client response when initiating a subscription. This operation has been indicated by a yellow dashed line in the function diagram.
Data
The Data class represents an arbitrary data record. The code relies on the presence of an id field, here represented as a uint32. While this type doesn’t exist in Java, it adds clarity to the API; however, as the Protocol Buffers Documentation API Best Practices indicates, limits to even the int64 addressable range may make a string id preferable. The field1 field represents an arbitrary field; Data could be extended to contain additional fields (field2, field3, etc.).
message Data {
  uint32 id = 1;
  string field1 = 2;
}
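Because the JVM has no unsigned 32-bit type, the Protocol Buffers Java mapping stores a uint32 in a signed int, with the top bit held in the sign bit. The sketch below assumes the generated Scala class exposes the id as an `Int` in the same way; `UnsignedIdSketch` and its helpers are hypothetical names used only for illustration.

```scala
object UnsignedIdSketch {
  // A uint32 id above Int.MaxValue appears negative as a signed Int;
  // reinterpret the same 32 bits as an unsigned value when needed.
  def unsignedId(id: Int): Long =
    java.lang.Integer.toUnsignedLong(id)

  // Unsigned decimal form, e.g. for logging or for building a string key.
  def unsignedIdString(id: Int): String =
    java.lang.Integer.toUnsignedString(id)
}
```

Equality checks and `Map` lookups keyed by the raw `Int` are unaffected; only ordering and display need the unsigned view.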
Adding Data Updates
ETag and Timestamp
ETags are part of the HTTP specification and exist to reduce network transfer. When a client sends the If-None-Match request header with a previously received ETag and the current response would carry the same ETag, a server that implements the header responds with HTTP 304 Not Modified instead of 200 OK with a populated body.
The usefulness of an ETag depends on server support: some APIs implement ETag support similar to the HTTP specification and use it to omit a response body, others use it internally to return a previous response from their cache, while others solely include it as a convenience for clients.
Our API will use an ETag so that, on subscription, the server only returns a full Data object if the client either doesn’t have a previous copy (i.e. no ETag available) or has a stale version (i.e. a conflicting ETag).
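The subscription-time decision can be sketched as a small pure function. This is an assumption-laden illustration, not the article's code: `SubscribeEtagSketch`, `Versioned`, and `initialData` are hypothetical names, and the ETag is modelled as a plain `String`.

```scala
object SubscribeEtagSketch {
  // Hypothetical simplified types; the real classes are generated from protobuf.
  final case class Data(id: Int, field1: String)
  final case class Versioned(data: Data, etag: String)

  // Returns the full data only when the client has no copy (no ETag)
  // or a stale copy (an ETag that differs from the stored one).
  def initialData(stored: Versioned, clientEtag: Option[String]): Option[Data] =
    clientEtag match {
      case Some(etag) if etag == stored.etag => None // client is up to date
      case _                                 => Some(stored.data)
    }
}
```

A `None` result plays the role that 304 Not Modified plays in HTTP: the server acknowledges the subscription without resending the body.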
To support this functionality, as well as many others which may depend on fetch/cache durations, we’ll associate an ETag and last-updated time with every Data element by wrapping it in a new DataRecord class:
case class DataRecord(data: Data, lastUpdate: Instant, etag: ETag)
/**
 * Update `databaseRecordsRef` with the data in `request`, rejecting any conflicts.
 * Conflicts are based on the included ETag:
 *  - If the `previousEtag` doesn't match the ETag in the database, it is a conflict.
 *  - New item creation ignores the `previousEtag` field.
 * All database item creations / updates are emitted to `journal`.
 */
def updateDatabaseRecords(
  request: UpdateRequest,
  journal: Hub[Data],
  databaseRecordsRef: Ref[Map[Int, Data]]
): UIO[UpdateResponse]
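The conflict rule described in the comment above can be sketched without any effect system. Everything here is hypothetical and simplified: `UpdateConflictSketch`, `Stored`, `Outcome`, and `applyUpdate` are illustrative names, and `etagOf` derives an ETag from a content hash only as a placeholder for whatever scheme the real server uses.

```scala
object UpdateConflictSketch {
  // Hypothetical simplified types standing in for the generated classes.
  final case class Data(id: Int, field1: String)
  final case class Stored(data: Data, etag: String)

  sealed trait Outcome
  final case class Written(record: Stored) extends Outcome
  final case class Conflict(current: Stored) extends Outcome

  // Placeholder ETag scheme (assumption): a hash of the record content.
  def etagOf(data: Data): String = Integer.toHexString(data.hashCode)

  // Existing record + mismatched previousEtag => conflict, database unchanged.
  // New records ignore previousEtag and are always written.
  def applyUpdate(
    db: Map[Int, Stored],
    update: Data,
    previousEtag: Option[String]
  ): (Outcome, Map[Int, Stored]) =
    db.get(update.id) match {
      case Some(current) if !previousEtag.contains(current.etag) =>
        (Conflict(current), db)
      case _ =>
        val record = Stored(update, etagOf(update))
        (Written(record), db.updated(update.id, record))
    }
}
```

The `(result, newState)` tuple matches the shape expected by ZIO's `Ref#modify`, so the check and the write could run as one atomic step before a `Written` record is published to the journal.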
We will need to call an effect to connect the Subscription Manager to the journal Hub:
/**
 * Create Stream from database `journal`
 */
def userSubscriptionStream(
  userSubscriptionsRef: Ref[HashSet[Int]],
  journal: Hub[Data]
): ZIO[Scope, Nothing, Stream[StatusException, SyncResponse]]
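We will also need to connect the incoming request stream to the Subscription Manager, applying each SyncRequest to the client's subscriptions: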
/**
 * Update `userSubscriptionsRef` with subscription changes.
 */
def modifyUserSubscriptions(
  syncRequest: SyncRequest,
  userSubscriptionsRef: Ref[HashSet[Int]],
  databaseRecords: Map[Int, Data]
): UIO[Seq[SyncResponse]]
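A pure version of this step might look like the sketch below. It is an illustration under stated assumptions: `SubscriptionSketch`, `Change`, and `applyChange` are hypothetical names, the request is reduced to two sets of ids, and the returned sequence models the current `Data` sent back for newly subscribed ids.

```scala
object SubscriptionSketch {
  // Hypothetical simplified types standing in for the generated classes.
  final case class Data(id: Int, field1: String)
  final case class Change(subscribe: Set[Int], unsubscribe: Set[Int])

  // Returns the current data for ids that became subscribed with this
  // request, together with the client's updated subscription set.
  def applyChange(
    subscribed: Set[Int],
    change: Change,
    db: Map[Int, Data]
  ): (Seq[Data], Set[Int]) = {
    val next    = (subscribed ++ change.subscribe) -- change.unsubscribe
    val added   = next -- subscribed
    // Ids with no record yet produce no initial response.
    val initial = added.toSeq.sorted.flatMap(id => db.get(id))
    (initial, next)
  }
}
```

An id that appears in both lists of the same request ends up unsubscribed here; that ordering is a choice made for the sketch, not something the source specifies.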
//TODO:
The context: AuthenticatedUser parameter is for a feature exposed by ZIO to handle gRPC metadata. Every gRPC request is able to provide headers, similar to how HTTP allows request headers. ZIO generates 2 interface
Sources
Realtime Push Database
Realtime Database using ZIO Hub and Bi-Directional gRPC Streams.
Realtime Client Database: External Datasource using ZIO ZLayer
Expanding on the realtime Firebase implementation in the previous article, this article extends the functionality by allowing the server to fetch data on demand from an external datasource. Additionally, periodically refreshing the active data subscribed to by connected clients transforms this database into an efficient cache of evolving external data which can only be obtained by polling.