Basics Tutorial
This tutorial provides Scala programmers with a basic introduction to working with ZIO gRPC.
By walking through this example you'll learn how to:
- Define a service in a .proto file.
- Generate server and client code using the ZIO gRPC code generator.
- Use the ZIO gRPC API to write a simple client and server for your service.
It assumes that you have read the Introduction to gRPC and are familiar with protocol buffers. Note that the example in this tutorial uses the proto3 version of the protocol buffers language: you can find out more in the proto3 language guide and ScalaPB generated code guide.
Why use gRPC?
Our example is a simple route mapping application that lets clients get information about features on their route, create a summary of their route, and exchange route information such as traffic updates with the server and other clients.
With gRPC we can define our service once in a .proto file and generate clients and servers in any of gRPC’s supported languages, which in turn can be run in environments ranging from servers inside a large data center to your own tablet. All the complexity of communication between different languages and environments is handled for you by gRPC. We also get all the advantages of working with protocol buffers, including efficient serialization, a simple IDL, and easy interface updating.
Example code and setup
The example code for our tutorial is in scalapb/zio-grpc/examples/routeguide/src/main/scala/zio_grpc/examples/routeguide.
To download the example, clone the latest release of the zio-grpc repository by running the following command:
$ git clone -b v0.5.0 https://github.com/scalapb/zio-grpc.git
Then change your current directory to zio-grpc/examples/routeguide:
$ cd zio-grpc/examples/routeguide
Defining the service
Our first step (as you'll know from the Introduction to gRPC) is to define the gRPC service and the method request and response types using protocol buffers. You can see the complete .proto file in scalapb/zio-grpc/examples/src/main/protobuf/route_guide.proto.
ZIO gRPC generates code into the same Scala package that ScalaPB uses. Since java_package is specified, the Scala package will be the java_package with the proto file name appended to it. In this case, the package name would be io.grpc.examples.routeguide.route_guide.
option java_package = "io.grpc.examples.routeguide";
You can read more on how ScalaPB determines the Scala package name and how this can be customized in ScalaPB's documentation.
To define a service, we specify a named service in the .proto file:
service RouteGuide {
  ...
}
Then we define rpc methods inside our service definition, specifying their request and response types. gRPC lets you define four kinds of service methods, all of which are used in the RouteGuide service:
- A simple RPC where the client sends a request to the server and waits for a response to come back.
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- A server-side streaming RPC where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. As you can see in our example, you specify a server-side streaming method by placing the stream keyword before the response type.
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- A client-side streaming RPC where the client sends a stream of messages to the server. Once the client has finished writing the messages, it waits for the server to read them all and return its response. You specify a client-side streaming method by placing the stream keyword before the request type.
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- A bidirectional streaming RPC where both sides send a sequence of messages. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each stream is preserved. You specify this type of method by placing the stream keyword before both the request and the response.
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
Our .proto file also contains protocol buffer message type definitions for all the request and response types used in our service methods. For example, here's the Point message type:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
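To make the E7 representation concrete, here is a minimal sketch in plain Scala that converts between degrees and the integer form described in the comment above. The object E7 and its methods toE7 and fromE7 are hypothetical helpers for illustration; they are not part of the RouteGuide example or of ScalaPB.

```scala
// Illustrative helpers only; these names are not part of the RouteGuide example.
object E7 {
  private val Scale = 1e7

  // Degrees -> E7 integer: multiply by 10^7 and round to the nearest integer.
  def toE7(degrees: Double): Int = math.round(degrees * Scale).toInt

  // E7 integer -> degrees.
  def fromE7(e7: Int): Double = e7 / Scale
}
```

Since longitudes stay within +/- 180 degrees, the largest magnitude is 1,800,000,000, which fits in a 32-bit signed integer. That is consistent with the int32 fields in the message.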
Generating client and server code
When you compile the application in SBT (using compile), an SBT plugin named sbt-protoc invokes two code generators. The first code generator is ScalaPB, which generates case classes for all messages and some gRPC-related code that ZIO gRPC interfaces with. The second is the ZIO gRPC code generator, which generates a ZIO interface to your service.
The following classes are generated from our service definition in target/scala-2.13/src_managed:
- Feature.scala, Point.scala, Rectangle.scala, and others, which contain all the protocol buffer code to populate, serialize, and retrieve our request and response message types.
- ZioRouteGuide.scala, which contains (along with some other useful code):
  - a base trait for RouteGuide servers to implement, ZioRouteGuide.ZRouteGuide, with all the methods defined in the RouteGuide service.
  - ZioRouteGuide.RouteGuideClient, which contains ZIO accessor methods that clients can use to talk to a RouteGuide server.
Creating the server
First let's look at how we create a RouteGuide server. If you're only interested in creating gRPC clients, you can skip this section and go straight to Creating the client (though you might find it interesting anyway!).
There are two parts to making our RouteGuide service do its job:
- Implementing the trait ZRouteGuide generated from our service definition: returning the ZIO effects that do the actual "work" of our service.
- Putting an instance of ZRouteGuide behind a gRPC server to listen for requests from clients and return the service responses.
You can find our example RouteGuide server in scalapb/zio-grpc/examples/src/main/scala/zio_grpc/examples/routeguide/RouteGuideServer.scala. Let's take a closer look at how it works.
Implementing ZRouteGuide
As you can see, our server has a RouteGuideService class that extends the generated ZioRouteGuide.ZRouteGuide base trait:
class RouteGuideService(
    features: Seq[Feature],
    routeNotesRef: Ref[Map[Point, List[RouteNote]]]
) extends ZioRouteGuide.ZRouteGuide[ZEnv, Any] {
The trait ZRouteGuide[R, Context] takes two type parameters:
- R represents the environment. These can be dependencies that the server needs in order to do its job. In our example R is ZEnv, which is ZIO's default environment and contains basic services such as Clock and Console.
- Context represents data that is unique to each request, for example, Metadata headers, or the identity of the user making the request. We will learn about Context in a future example.
Simple RPC
RouteGuideService implements all our service methods. Let's look at the simplest method first, GetFeature(), which just gets a Point from the client and returns the corresponding feature information from its database in a Feature.
def getFeature(request: Point): ZIO[ZEnv, Status, Feature] =
  ZIO.fromOption(findFeature(request)).mapError(_ => Status.NOT_FOUND)

def findFeature(point: Point): Option[Feature] =
  features.find(f => f.getLocation == point && f.name.nonEmpty)
The getFeature() method takes the request (of type Point), and returns a ZIO effect that represents the work of computing the response. The value that is returned represents a suspended effect: nothing actually happens until the ZIO runtime ultimately runs the effect. The type of the effect is ZIO[ZEnv, Status, Feature], which means it is a computation that:
- can fail with a value of type Status (this type comes from grpc-java and represents a gRPC status code).
- can succeed with a value of type Feature.
- requires an environment of type ZEnv to run.
In this case, our effect is built on top of a pure function findFeature that returns Some(feature) if there is a feature in the database that corresponds to the given point, or None otherwise.
We use ZIO.fromOption to turn the Option[Feature] into an effect of type IO[Option[Nothing], Feature], which means that it can either succeed with a value of type Feature or fail with a value of type Option[Nothing] (the only possible value of this type is None since there are no instances of type Nothing). We then use mapError to map the case of an error to gRPC's NOT_FOUND status.
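The same "absent value becomes a typed error" shape can be seen without ZIO by using Either from the Scala standard library. The sketch below is only an analogy, not the ZIO gRPC API: NotFound is a hypothetical stand-in for Status.NOT_FOUND, and the Feature class and the Map-based database are simplified assumptions.

```scala
object OptionToError {
  // Stand-in for a gRPC status; hypothetical, for illustration only.
  sealed trait Err
  case object NotFound extends Err

  // Simplified feature type (the real one is generated by ScalaPB).
  final case class Feature(name: String)

  // A pure lookup, similar in spirit to findFeature: unnamed features are ignored.
  def find(db: Map[Int, Feature], key: Int): Option[Feature] =
    db.get(key).filter(_.name.nonEmpty)

  // None becomes Left(NotFound); Some(f) becomes Right(f).
  def get(db: Map[Int, Feature], key: Int): Either[Err, Feature] =
    find(db, key).toRight(NotFound)
}
```

Here toRight plays the combined role of fromOption and mapError: the lookup stays a pure function, and the choice of error value is made in exactly one place.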
Server-side streaming RPC
Next let's look at one of our streaming RPCs. ListFeatures is a server-side streaming RPC, so we need to send back multiple Features to our client.
def listFeatures(request: Rectangle): ZStream[ZEnv, Status, Feature] = {
  val left = request.getLo.longitude min request.getHi.longitude
  val right = request.getLo.longitude max request.getHi.longitude
  val top = request.getLo.latitude max request.getHi.latitude
  val bottom = request.getLo.latitude min request.getHi.latitude
  ZStream.fromIterable(
    features.filter { feature =>
      val lat = feature.getLocation.latitude
      val lon = feature.getLocation.longitude
      lon >= left && lon <= right && lat >= bottom && lat <= top
    }
  )
}
Like the simple RPC, this method gets a request object (the Rectangle in which our client wants to find Features) and returns a ZStream[ZEnv, Status, Feature], which represents an effectful stream that, provided an environment of type ZEnv, can produce zero or more elements of type Feature and can fail with a value of type Status.
In this case, the stream does not need the environment and can never fail (since our database is a constant in the same process!).
We build the stream from a Scala collection that we obtain by filtering the features sequence. ZIO gRPC takes over streaming the response to the client when the stream gets executed.
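The min/max calls in listFeatures mean the two corners of the rectangle can be given in either order. A minimal standalone sketch of that bounds check follows; the Bounds object and its simplified Point class are assumptions made for illustration, not the generated ScalaPB types.

```scala
object Bounds {
  // Simplified stand-in for the generated Point message.
  final case class Point(latitude: Int, longitude: Int)

  // True when p lies inside the rectangle spanned by two opposite corners,
  // given in any order (edges inclusive).
  def contains(lo: Point, hi: Point, p: Point): Boolean = {
    val left   = lo.longitude min hi.longitude
    val right  = lo.longitude max hi.longitude
    val bottom = lo.latitude min hi.latitude
    val top    = lo.latitude max hi.latitude
    p.longitude >= left && p.longitude <= right &&
    p.latitude >= bottom && p.latitude <= top
  }
}
```

Swapping the lo and hi arguments gives the same result, which is the property the server relies on when a client sends the corners in an unexpected order.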
Client-side streaming RPC
Now let's look at something a little more complicated: the client-side streaming method RecordRoute(), where we get a stream of Points from the client and return a single RouteSummary with information about their trip once the stream finishes.
def recordRoute(
    request: zio.stream.Stream[Status, Point]
): ZIO[Clock, Status, RouteSummary] = {
  // Zips each element with the previous element, initially accompanied by None.
  request.zipWithPrevious
    .fold(RouteSummary()) {
      case (summary, (maybePrevPoint, currentPoint)) =>
        // Compute the next summary based on the current summary.
        summary.copy(
          pointCount = summary.pointCount + 1,
          featureCount =
            summary.featureCount + (if (findFeature(currentPoint).isDefined) 1
                                    else 0),
          distance = summary.distance + maybePrevPoint
            .map(calcDistance(_, currentPoint))
            .getOrElse(0)
        )
    }
    .timed // returns a new effect that times the execution
    .map {
      case (duration, summary) =>
        summary.copy(elapsedTime = (duration.toMillis / 1000).toInt)
    }
}
Here, our method gets a stream that is produced by the client. As you can see from the signature of this method, our goal is to turn this stream into an effect that results in a RouteSummary.
RouteSummary contains the number of points, number of features on the trip, total distance passed, and the time it took. As this summary can be built iteratively, we use fold, which takes the summary and a new input to compute the next summary. Since we are adding up the distance between successive pairs of points, we use zipWithPrevious, which gives us a pair (Option[Point], Point) where the left element represents the previous element in the stream (which is initially None).
The fold method gives us an IO[Status, RouteSummary]. Using the timed method, we get a new ZIO effect that upon success gives us a tuple (zio.duration.Duration, RouteSummary) where the duration represents the time it took to process the effect thus far. We then use map to turn it back into a RouteSummary that contains the elapsed time in seconds.
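The pairing-then-folding idea can be tried out on an ordinary List, without streams or effects. The sketch below is a simplified analogy under stated assumptions: withPrevious mimics what zipWithPrevious does on a stream, points are plain integers, and dist is a hypothetical one-dimensional distance standing in for calcDistance.

```scala
object RouteFold {
  // Reduced summary with only the fields needed for the illustration.
  final case class Summary(pointCount: Int = 0, distance: Int = 0)

  // Pair each element with its predecessor (None for the first one).
  def withPrevious[A](xs: List[A]): List[(Option[A], A)] =
    (Option.empty[A] :: xs.map(Some(_))).zip(xs)

  // Hypothetical one-dimensional "distance", for illustration only.
  def dist(a: Int, b: Int): Int = math.abs(a - b)

  // Fold over the pairs, accumulating the count and the total distance.
  def summarize(points: List[Int]): Summary =
    withPrevious(points).foldLeft(Summary()) {
      case (s, (prev, cur)) =>
        s.copy(
          pointCount = s.pointCount + 1,
          distance = s.distance + prev.map(dist(_, cur)).getOrElse(0)
        )
    }
}
```

The first element contributes to the point count but not to the distance, because its predecessor is None. The streaming version behaves the same way, only one element at a time as points arrive from the client.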
Bidirectional streaming RPC
Finally, let's look at our bidirectional streaming RPC RouteChat().
def routeChat(
    request: zio.stream.Stream[Status, RouteNote]
): ZStream[ZEnv, Status, RouteNote] =
  request.flatMap { note =>
    // By using flatMap, we can map each RouteNote we receive to a stream with
    // the existing RouteNotes for that location, and those sub-streams are going
    // to get concatenated.
    // We start from an effect that updates the map with the new RouteNote,
    // and returns the notes associated with the location just before the update.
    val updateMapEffect: UIO[List[RouteNote]] =
      routeNotesRef.modify { routeNotes =>
        val messages = routeNotes.getOrElse(note.getLocation, Nil)
        (messages, routeNotes.updated(note.getLocation, note :: messages))
      }
    // We create a stream from the effect.
    ZStream.fromIterableM(updateMapEffect)
  }
As with our client-side streaming example, we are getting a Stream of RouteNotes, except this time we are also returning a stream of RouteNotes. Although each side will always get the other's messages in the order they were written, both the client and server can read and write in any order: the streams operate completely independently.
In this example, we are using flatMap on the incoming stream to map each input to a new effectful stream representing the notes that are available in that location. We are using Ref#modify to update the collection of notes in the given location and return the list of notes available just prior to the update.
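The function passed to modify is pure: it takes the current state and returns a pair of (value to hand back, next state). A minimal sketch of that step in isolation follows, using strings for locations and notes to keep it short. The NotesState object and addNote are hypothetical names, and no Ref is involved here.

```scala
object NotesState {
  // Simplified state: location -> notes, newest first.
  type Notes = Map[String, List[String]]

  // The pure step given to a modify-style operation: from the current state,
  // compute (notes that existed before the update, updated state).
  def addNote(location: String, note: String)(notes: Notes): (List[String], Notes) = {
    val existing = notes.getOrElse(location, Nil)
    (existing, notes.updated(location, note :: existing))
  }
}
```

Because the step never touches shared state itself, it can be tested directly. In the server, the Ref is what applies it to the shared map.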
Starting the server
Once we've implemented all our methods, we also need to start up a gRPC server so that clients can actually use our service. The following snippet shows how we do this for our RouteGuide service:
object RouteGuideServer extends ServerMain {
  override def port: Int = 8980

  val featuresDatabase = JsonFormat.fromJsonString[FeatureDatabase](
    Source.fromResource("route_guide_db.json").mkString
  )

  val createRouteGuide = for {
    routeNotes <- Ref.make(Map.empty[Point, List[RouteNote]])
  } yield new RouteGuideService(featuresDatabase.feature, routeNotes)

  def services: ServiceList[zio.ZEnv] =
    ServiceList.addM(createRouteGuide)
}
ZIO gRPC provides a base trait to quickly set up gRPC services with zero boilerplate.
- We override the port we are going to use (default is 9000).
- Create an effect that constructs an instance of our service (we need an effectful construction since our service constructor takes a zio.Ref).
- Override def services to return a ServiceList that contains our service.
ServerMain is meant to be used for simple applications. If you need to do more in your initialization, you can take a look at the source code of ServerMain and customize.
Creating the client
In this section, we'll look at creating a client for our RouteGuide service. You can see our complete example client code in RouteGuideClientApp.scala.
Instantiating a client
To call service methods, we first need to create a client. There are two patterns to work with clients:
- Use RouteGuideClient.managed to instantiate a client inside a zio.ZManaged. Then, by calling its use method, the client can be accessed and methods can be called on it.
- Use RouteGuideClient.live to create a ZLayer that can be used to provide a client as a singleton to our program through the environment. In that case, throughout the program we use accessor methods, defined statically in RouteGuideClient, that expect the client to be available in the environment.
Throughout this tutorial, we will follow the second pattern. We create a Layer that can provide a RouteGuideClient like this:
val clientLayer: Layer[Throwable, RouteGuideClient] =
  RouteGuideClient.live(
    ZManagedChannel(
      ManagedChannelBuilder.forAddress("localhost", 8980).usePlaintext()
    )
  )
Calling service methods
Now let's look at how we call our service methods.
As described above, RouteGuideClient contains accessor methods for each RPC that return an effect or a stream that needs a client in the environment to be run:
def getFeature(req: Point):
  ZIO[RouteGuideClient, Status, Feature]

def listFeatures(req: Rectangle):
  ZStream[RouteGuideClient, Status, Feature]

def recordRoute[R0](req: ZStream[R0, Status, Point]):
  ZIO[RouteGuideClient with R0, Status, RouteSummary]

def routeChat[R0](req: ZStream[R0, Status, RouteNote]):
  ZStream[RouteGuideClient with R0, Status, RouteNote]
Simple RPC
Calling the simple RPC GetFeature on the static accessor stub is as straightforward as instantiating a local effect:
def getFeature(
    lat: Int,
    lng: Int
): ZIO[RouteGuideClient with Console, Status, Unit] =
  (for {
    f <- RouteGuideClient.getFeature(Point(lat, lng))
    _ <- putStrLn(s"""Found feature called "${f.name}".""")
  } yield ()).catchSome {
    // Compare status codes: grpc-java's Status does not define value equality.
    case status if status.getCode == Status.Code.NOT_FOUND =>
      putStrLn(s"Feature not found: ${status.toString()}")
  }
We create and populate a request protocol buffer object (in our case Point), pass it to the getFeature() method on our accessor, and get back an effect that needs a RouteGuideClient environment. We chain the response with a call to putStrLn to print the result on the console, and we catch the NOT_FOUND response and print an error. All other errors are not handled at this level and will "bubble up" to the program's exitCode handler.
Server-side streaming RPC
Next, let's look at a server-side streaming call to ListFeatures, which returns a stream of geographical Features:
_ <-
  RouteGuideClient
    .listFeatures(
      Rectangle(
        lo = Some(Point(400000000, -750000000)),
        hi = Some(Point(420000000, -730000000))
      )
    )
    .zipWithIndex
    .foreach {
      case (feature, index) =>
        putStrLn(s"Result #${index + 1}: $feature")
    }
Now listFeatures returns a ZStream. We use zipWithIndex to get a stream where each of the original elements is accompanied by a zero-based index. We turn this stream into a single effect that processes the entire stream by calling foreach and providing it with a function that maps each element into an effect. In this case, the effect prints the feature.
Client-side streaming RPC
Now for something a little more complicated: the client-side streaming method RecordRoute, where we send a stream of Points to the server and get back a single RouteSummary.
def recordRoute(numPoints: Int) =
  for {
    summary <- RouteGuideClient.recordRoute(
      ZStream
        .repeatEffect(
          nextIntBetween(0, features.size).map(features(_).getLocation)
        )
        .tap(p => putStrLn(s"Visiting (${p.latitude}, ${p.longitude})"))
        .schedule(Schedule.spaced(300.millis))
        .take(numPoints)
    )
    _ <- putStrLn(
      s"Finished trip with ${summary.pointCount} points. " +
        s"Passed ${summary.featureCount} features. " +
        s"Travelled ${summary.distance} meters. " +
        s"It took ${summary.elapsedTime} seconds."
    )
  } yield ()
Here, we pass into recordRoute an effectful stream that randomly picks an element from the features collection (a constant), with a fixed 300-millisecond delay between elements.
As with all the other accessor methods, it's worth noting that no side effect happens upon calling recordRoute. The method returns immediately, giving us an effect that represents sending this stream to the server. When the effect is ultimately run, it can succeed with a value of type RouteSummary once the entire stream has been sent to the server.
In this example, we chain to this effect an effect to print the summary to the console.
Bidirectional streaming RPC
Finally, let's look at our bidirectional streaming RPC RouteChat().
val routeChat =
  for {
    res <-
      RouteGuideClient
        .routeChat(
          ZStream(
            RouteNote(
              location = Some(Point(0, 0)),
              message = "First message"
            ),
            RouteNote(
              location = Some(Point(0, 10_000_000)),
              message = "Second Message"
            ),
            RouteNote(
              location = Some(Point(10_000_000, 0)),
              message = "Third Message"
            ),
            RouteNote(
              location = Some(Point(10_000_000, 10_000_000)),
              message = "Fourth Message"
            )
          ).tap { note =>
            putStrLn(
              s"""Sending message "${note.message}" at ${note.getLocation.latitude}, ${note.getLocation.longitude}"""
            )
          }
        )
        .foreach { note =>
          putStrLn(
            s"""Got message "${note.message}" at ${note.getLocation.latitude}, ${note.getLocation.longitude}"""
          )
        }
  } yield ()
In this method, we both pass in and get back a Stream of RouteNotes. Here both streams execute independently at the same time. Although each side will always get the other's messages in the order they were written, both the client and server can read and write in any order: the streams operate completely independently.
Providing the client layer into the application logic
All the effects we created were dependent on a RouteGuideClient available in the environment. We earlier instantiated a clientLayer, so we can provide it to our application logic at the top-level (the run method):
val myAppLogic =
  for {
    // Looking for a valid feature
    _ <- getFeature(409146138, -746188906)
    // Looking for a missing feature
    _ <- getFeature(0, 0)

    // Calls listFeatures with a rectangle of interest. Prints
    // each response feature as it arrives.
    _ <-
      RouteGuideClient
        .listFeatures(
          Rectangle(
            lo = Some(Point(400000000, -750000000)),
            hi = Some(Point(420000000, -730000000))
          )
        )
        .zipWithIndex
        .foreach {
          case (feature, index) =>
            putStrLn(s"Result #${index + 1}: $feature")
        }

    _ <- recordRoute(10)

    _ <- routeChat
  } yield ()

final def run(args: List[String]) =
  myAppLogic.provideCustomLayer(clientLayer).exitCode
Try it out!
Run the server:
sbt "runMain zio_grpc.examples.routeguide.RouteGuideServer"
From another terminal, run the client:
sbt "runMain zio_grpc.examples.routeguide.RouteGuideClientApp"
This document, "ZIO gRPC: Basics Tutorial", is a derivative of "gRPC – Basics Tutorial" by gRPC Authors, used under CC-BY-4.0. "ZIO gRPC: Basics Tutorial" is licensed under CC-BY-4.0 by Nadav Samet.