Skip to main content
Version: 0.6.x

ZIO gRPC and Deadlines

When you use a gRPC it is a very important to set deadlines. In gRPC, deadlines are absolute timestamps that tell our system when the response of an RPC call is no longer needed. The deadline is sent to the server, and the computation is automatically interrupted when the deadline is exceeded. The client call automatically ends with a Status.DEADLINE_EXCEEDED error.

When you don't specify a deadline, client requests never timeout. All in-flight requests take resources on the server, and possibly upstream servers, which can ultimately hurt latency or crash the entire process.

In ZIO gRPC you can easily set deadlines (absolute timestamps), or timeouts which are relative to the time the outbound call is made.

Setting timeout for all requests

To set the same timeout for all requests, it is possible to provide a ClientTransform when constructing the client. This transformation is invoked before each request, and can determine the deadline relative to the system clock at the time the effect is executed.

import myexample.testservice.ZioTestservice.ServiceNameClient
import myexample.testservice.{Request, Response}
import scalapb.zio_grpc.{ZManagedChannel, ClientTransform}
import io.grpc.ManagedChannelBuilder
import zio._
import zio.Console._

val channel = ZManagedChannel(
ManagedChannelBuilder
.forAddress("localhost", 8980)
.usePlaintext()
)
// channel: ZManagedChannel = OnSuccess(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:11)",
// first = Sync(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:11)",
// eval = zio.ZIO$$$Lambda$13330/0x0000000103865c40@1101adbd
// ),
// successK = zio.ZIO$$$Lambda$13255/0x0000000103827040@4b9ab517
// )

// create layer:
val clientLayer = ServiceNameClient.live(
channel,
ClientTransform.withTimeoutMillis(3000))
// clientLayer: ZLayer[Any, Throwable, ServiceNameClient] = Fold(
// self = Suspend(
// self = zio.ZLayer$ScopedEnvironmentPartiallyApplied$$$Lambda$13277/0x0000000103838840@19549a76
// ),
// failure = zio.ZLayer$$Lambda$13337/0x00000001038a2040@2287da9c,
// success = zio.ZLayer$$Lambda$13335/0x00000001038a0840@6476b120
// )

val myAppLogicNeedsEnv = for {
// use layer through accessor methods:
res <- ServiceNameClient.unary(Request())
_ <- printLine(res.toString)
} yield ()
// myAppLogicNeedsEnv: ZIO[ServiceNameClient, Exception, Unit] = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogicNeedsEnv(deadlines.md:40)",
// first = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// first = Sync(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// eval = zio.ZIO$ServiceWithZIOPartiallyApplied$$$Lambda$13264/0x0000000103833440@78869456
// ),
// successK = zio.ZIO$$$Lambda$13255/0x0000000103827040@4b9ab517
// ),
// successK = <function1>
// )

Setting timeout for each request

As in the previous example, assuming there is a client in the environment, we can set the timeout for each request like this:

ServiceNameClient.withTimeoutMillis(3000).unary(Request())
// res0: ZIO[ServiceNameClient, io.grpc.StatusException, Response] = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// first = Sync(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// eval = zio.ZIO$ServiceWithZIOPartiallyApplied$$$Lambda$13264/0x0000000103833440@45bc261a
// ),
// successK = zio.ZIO$$$Lambda$13255/0x0000000103827040@4b9ab517
// )

Clients provide (through the GeneratedClient trait) a number of methods that makes it possible to specify a deadline or a timeout for each request:

// Provide a new absolute deadline
def withDeadline(deadline: Deadline): Service

// Sets a new timeout for this service
def withTimeout(duration: zio.duration.Duration): Service

// Sets a new timeout in millis
def withTimeoutMillis(millis: Long): Service

// Replace the call options with the provided call options
def withCallOptions(callOptions: CallOptions): Service

// update the CallOptions for this service
def mapCallOptions(f: CallOptions => CallOptions): Service

// update the request Metadata for this service
def mapMetadataZIO(f: SafeMetadata => UIO[SafeMetadata]): Service

If you are using a client instance, the above methods are available to provide you with a new client that has a modified CallOptions effect. Making the copy of those clients is cheap and can be safely done for each individual call:

val clientScoped = ServiceNameClient.scoped(channel)
// clientScoped: ZIO[Scope, Throwable, ServiceNameClient] = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameClient.scoped(ZioTestservice.scala:109)",
// first = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameClientWithResponseMetadata.scoped(ZioTestservice.scala:191)",
// first = OnSuccess(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:11)",
// first = Sync(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:11)",
// eval = zio.ZIO$$$Lambda$13330/0x0000000103865c40@1101adbd
// ),
// successK = zio.ZIO$$$Lambda$13255/0x0000000103827040@4b9ab517
// ),
// successK = zio.ZIO$$Lambda$13273/0x0000000103837040@59f3470b
// ),
// successK = zio.ZIO$$Lambda$13273/0x0000000103837040@8dd02a4
// )

val myAppLogic = ZIO.scoped {
clientScoped.flatMap { client =>
for {
res <- client
.withTimeoutMillis(3000).unary(Request())
} yield res
}
}
// myAppLogic: ZIO[Any, Throwable, Response] = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// first = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// first = Sync(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// eval = zio.Scope$ReleaseMap$$$Lambda$13350/0x00000001038a9440@443513cd
// ),
// successK = zio.ZIO$$Lambda$13273/0x0000000103837040@601d61a7
// ),
// successK = zio.ZIO$ScopedPartiallyApplied$$$Lambda$13352/0x00000001038aa840@2315a9b0
// )