ZIO gRPC and Deadlines
When you use a gRPC it is a very important to set deadlines.
In gRPC, deadlines are absolute timestamps that tell our system when the response of an RPC call is
no longer needed. The deadline is sent to the server, and the computation is automatically interrupted
when the deadline is exceeded. The client call automatically ends with a Status.DEADLINE_EXCEEDED
error.
When you don't specify a deadline, client requests never timeout. All in-flight requests take resources on the server, and possibly upstream servers, which can ultimately hurt latency or crash the entire process.
In ZIO gRPC you can easily set deadlines (absolute timestamps), or timeouts which are relative to the time the outbound call is made.
Setting timeout for all requests
To set the same timeout for all requests, it is possible to provide a ClientTransform
when constructing the
client. This transformation is invoked before each request, and can determine the deadline relative to the
system clock at the time the effect is executed.
import myexample.testservice.ZioTestservice.ServiceNameClient
import myexample.testservice.{Request, Response}
import scalapb.zio_grpc.{ZManagedChannel, ClientTransform}
import io.grpc.ManagedChannelBuilder
import zio._
import zio.Console._
val channel = ZManagedChannel(
ManagedChannelBuilder
.forAddress("localhost", 8980)
.usePlaintext()
)
// channel: ZManagedChannel = OnSuccess(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:13)",
// first = Sync(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:13)",
// eval = zio.ZIO$$$Lambda$15080/0x00000001040dd840@3a8a0345
// ),
// successK = zio.ZIO$$$Lambda$15000/0x000000010409e840@7744254c
// )
// create layer:
val clientLayer = ServiceNameClient.live(
channel,
ClientTransform.withTimeoutMillis(3000))
// clientLayer: ZLayer[Any, Throwable, ServiceNameClient] = Fold(
// self = Suspend(
// self = zio.ZLayer$ScopedEnvironmentPartiallyApplied$$$Lambda$15061/0x00000001040cb040@160b2e27
// ),
// failure = zio.ZLayer$$Lambda$15087/0x000000010411c040@890b8a2,
// success = zio.ZLayer$$Lambda$15085/0x000000010411a840@65111152
// )
val myAppLogicNeedsEnv = for {
// use layer through accessor methods:
res <- ServiceNameClient.unary(Request())
_ <- printLine(res.toString)
} yield ()
// myAppLogicNeedsEnv: ZIO[ServiceNameClient, Exception, Unit] = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogicNeedsEnv(deadlines.md:40)",
// first = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// first = Sync(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// eval = zio.ZIO$ServiceWithZIOPartiallyApplied$$$Lambda$15009/0x00000001040ac840@3a22392f
// ),
// successK = zio.ZIO$$$Lambda$15000/0x000000010409e840@7744254c
// ),
// successK = <function1>
// )
Setting timeout for each request
As in the previous example, assuming there is a client in the environment, we can set the timeout for each request like this:
ServiceNameClient.withTimeoutMillis(3000).unary(Request())
// res0: ZIO[ServiceNameClient, io.grpc.StatusException, Response] = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// first = Sync(
// trace = "myexample.testservice.ZioTestservice.ServiceNameAccessors.unary(ZioTestservice.scala:77)",
// eval = zio.ZIO$ServiceWithZIOPartiallyApplied$$$Lambda$15009/0x00000001040ac840@1e89d681
// ),
// successK = zio.ZIO$$$Lambda$15000/0x000000010409e840@7744254c
// )
Clients provide (through the GeneratedClient
trait) a number of methods that makes it possible to
specify a deadline or a timeout for each request:
// Provide a new absolute deadline
def withDeadline(deadline: Deadline): Service
// Sets a new timeout for this service
def withTimeout(duration: zio.duration.Duration): Service
// Sets a new timeout in millis
def withTimeoutMillis(millis: Long): Service
// Replace the call options with the provided call options
def withCallOptions(callOptions: CallOptions): Service
// update the CallOptions for this service
def mapCallOptions(f: CallOptions => CallOptions): Service
// update the request Metadata for this service
def mapMetadataZIO(f: SafeMetadata => UIO[SafeMetadata]): Service
If you are using a client instance, the above methods are available to provide you with a new
client that has a modified CallOptions
effect. Making the copy of those clients is cheap and can
be safely done for each individual call:
val clientScoped = ServiceNameClient.scoped(channel)
// clientScoped: ZIO[Scope, Throwable, ServiceNameClient] = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameClient.scoped(ZioTestservice.scala:109)",
// first = OnSuccess(
// trace = "myexample.testservice.ZioTestservice.ServiceNameClientWithResponseMetadata.scoped(ZioTestservice.scala:191)",
// first = OnSuccess(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:13)",
// first = Sync(
// trace = "scalapb.zio_grpc.ZManagedChannel.apply(ZManagedChannel.scala:13)",
// eval = zio.ZIO$$$Lambda$15080/0x00000001040dd840@3a8a0345
// ),
// successK = zio.ZIO$$$Lambda$15000/0x000000010409e840@7744254c
// ),
// successK = zio.ZIO$$Lambda$15057/0x00000001040c9840@3ff6cac8
// ),
// successK = zio.ZIO$$Lambda$15057/0x00000001040c9840@5950d795
// )
val myAppLogic = ZIO.scoped {
clientScoped.flatMap { client =>
for {
res <- client
.withTimeoutMillis(3000).unary(Request())
} yield res
}
}
// myAppLogic: ZIO[Any, Throwable, Response] = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// first = OnSuccess(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// first = Sync(
// trace = "repl.MdocSession.MdocApp.myAppLogic(deadlines.md:57)",
// eval = zio.Scope$ReleaseMap$$$Lambda$15100/0x0000000104123840@7681e017
// ),
// successK = zio.ZIO$$Lambda$15057/0x00000001040c9840@105accc0
// ),
// successK = zio.ZIO$ScopedPartiallyApplied$$$Lambda$15102/0x0000000104125040@275b2f90
// )