Using ScalaPB with Spark
Introduction
By default, Spark uses reflection to derive schemas and encoders from case
classes. This doesn't work well when there are messages that contain types that
Spark does not understand such as enums, ByteStrings and oneofs. To get around this, sparksql-scalapb provides its own Encoders for protocol buffers.
However, there is another obstacle: Spark does not provide any mechanism to compose user-provided encoders with its own reflection-derived encoders. Therefore, merely providing an Encoder for protocol buffers is insufficient to derive an encoder for regular case classes that contain a protobuf as a field. To solve this problem, ScalaPB uses frameless, which relies on implicit search to derive encoders. This approach makes it possible to combine ScalaPB's encoders with frameless encoders that take care of all non-protobuf types.
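To make the idea of composing encoders through implicit search concrete, here is a minimal toy sketch in plain Scala. It does not use the Spark, frameless, or ScalaPB APIs: `Enc`, `FakeMessage`, and `EncoderCompositionDemo` are hypothetical names invented for illustration, and the "schema" is just a string.

```scala
// Toy stand-in for an encoder type class. This is NOT the frameless or Spark API.
trait Enc[T] { def schema: String }

object Enc {
  // Summon whatever encoder implicit search finds for T.
  def apply[T](implicit e: Enc[T]): Enc[T] = e

  def instance[T](s: String): Enc[T] = new Enc[T] { val schema: String = s }

  // Encoders for ordinary types, playing the role of the library-provided ones.
  implicit val intEnc: Enc[Int] = instance("int")
  implicit val stringEnc: Enc[String] = instance("string")

  // Composite encoder: available whenever both components have an encoder,
  // regardless of who supplied them.
  implicit def pairEnc[A, B](implicit a: Enc[A], b: Enc[B]): Enc[(A, B)] =
    instance(s"struct<${a.schema}, ${b.schema}>")
}

// Hypothetical message type whose encoder is supplied separately,
// playing the role of an encoder provided for a protobuf message.
final case class FakeMessage(name: String)

object FakeMessage {
  implicit val enc: Enc[FakeMessage] = Enc.instance("message")
}

object EncoderCompositionDemo {
  // The compiler assembles this from pairEnc, intEnc and FakeMessage.enc.
  val combined: Enc[(Int, FakeMessage)] = Enc[(Int, FakeMessage)]

  def main(args: Array[String]): Unit =
    println(combined.schema) // prints "struct<int, message>"
}
```

Because every encoder is resolved by the compiler through the same implicit mechanism, a separately supplied encoder takes part in derivation on equal footing with the built-in ones. A reflection-based derivation has no comparable hook for user-provided encoders.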
Setting up your project
We are going to use sbt-assembly to deploy a fat JAR containing ScalaPB and your compiled protos. Make sure project/plugins.sbt contains a line that adds sbt-assembly:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
To add sparksql-scalapb to your project, add one of the following lines that matches both the version of ScalaPB and Spark you use:
// Spark 3.5 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql35-scalapb0_11" % "1.0.4"
// Spark 3.4 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql34-scalapb0_11" % "1.0.4"
// Spark 3.3 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql33-scalapb0_11" % "1.0.4"
// Spark 3.2 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql32-scalapb0_11" % "1.0.4"
// Spark 3.1 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql31-scalapb0_11" % "1.0.4"
// Spark 3.0 and ScalaPB 0.11
libraryDependencies += "com.thesamet.scalapb" %% "sparksql30-scalapb0_11" % "1.0.1"
// Spark 3.3 and ScalaPB 0.10
libraryDependencies += "com.thesamet.scalapb" %% "sparksql33-scalapb0_10" % "1.0.4"
// Spark 3.2 and ScalaPB 0.10
libraryDependencies += "com.thesamet.scalapb" %% "sparksql32-scalapb0_10" % "1.0.4"
// Spark 3.1 and ScalaPB 0.10
libraryDependencies += "com.thesamet.scalapb" %% "sparksql31-scalapb0_10" % "1.0.4"
// Spark 3.0 and ScalaPB 0.10
libraryDependencies += "com.thesamet.scalapb" %% "sparksql30-scalapb0_10" % "1.0.1"
// Spark 2.x and ScalaPB 0.10
libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.10.4"
// Spark 2.x and ScalaPB 0.9
libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.9.3"
Known issue: Spark 3.2.1 is binary incompatible with Spark 3.2.0 in some of its internal APIs being used. If you use Spark 3.2.0, please stick to sparksql-scalapb 1.0.0-M1.
Spark ships with an old version of Google's Protocol Buffers runtime that is not compatible with the current version. In addition, it comes with incompatible versions of scala-collection-compat and shapeless. Therefore, we need to shade these libraries. Add the following to your build.sbt:
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll,
  ShadeRule.rename("scala.collection.compat.**" -> "shadecompat.@1").inAll,
  ShadeRule.rename("shapeless.**" -> "shadeshapeless.@1").inAll
)
See a complete example of build.sbt.
Using sparksql-scalapb
We assume you have a SparkSession assigned to the variable spark. In a standalone Scala program, this can be created with:
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession
  .builder()
  .appName("ScalaPB Demo")
  .master("local[2]")
  .getOrCreate()
// spark: SparkSession = org.apache.spark.sql.SparkSession@3451fde5
IMPORTANT: Do not import spark.implicits._, as this creates ambiguity between the encoders provided by ScalaPB and Spark's default encoders. You may want to import StringToColumn to convert $"col name" into a Column. Import scalapb.spark.Implicits._ to bring ScalaPB's encoders for protocol buffers into the implicit search scope:
import org.apache.spark.sql.{Dataset, DataFrame, functions => F}
import spark.implicits.StringToColumn
import scalapb.spark.ProtoSQL
import scalapb.spark.Implicits._
The code snippets below use the Person message.
We start by creating some test data:
import scalapb.docs.person.Person
import scalapb.docs.person.Person.{Address, AddressType}
val testData = Seq(
  Person(name="John", age=32, addresses=Vector(
    Address(addressType=AddressType.HOME, street="Market", city="SF"))
  ),
  Person(name="Mike", age=29, addresses=Vector(
    Address(addressType=AddressType.WORK, street="Castro", city="MV"),
    Address(addressType=AddressType.HOME, street="Church", city="MV"))
  ),
  Person(name="Bart", age=27)
)
We can create a DataFrame from the test data:
val df = ProtoSQL.createDataFrame(spark, testData)
// df: DataFrame = [name: string, age: int ... 1 more field]
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)
//  |-- addresses: array (nullable = false)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- address_type: string (nullable = true)
//  |    |    |-- street: string (nullable = true)
//  |    |    |-- city: string (nullable = true)
//
df.show()
// +----+---+--------------------+
// |name|age|           addresses|
// +----+---+--------------------+
// |John| 32|[{HOME, Market, SF}]|
// |Mike| 29|[{WORK, Castro, M...|
// |Bart| 27|                  []|
// +----+---+--------------------+
//
and then process it like any other DataFrame in Spark:
df.select($"name", F.size($"addresses").alias("address_count")).show()
// +----+-------------+
// |name|address_count|
// +----+-------------+
// |John|            1|
// |Mike|            2|
// |Bart|            0|
// +----+-------------+
//
val nameAndAddress = df.select($"name", $"addresses".getItem(0).alias("firstAddress"))
// nameAndAddress: DataFrame = [name: string, firstAddress: struct<address_type: string, street: string ... 1 more field>]
nameAndAddress.show()
// +----+------------------+
// |name|      firstAddress|
// +----+------------------+
// |John|{HOME, Market, SF}|
// |Mike|{WORK, Castro, MV}|
// |Bart|              null|
// +----+------------------+
//
Using the Dataset API, it is possible to bring the data back to ScalaPB case classes:
nameAndAddress.as[(String, Option[Address])].collect().foreach(println)
// (John,Some(Address(HOME,Market,SF,UnknownFieldSet(Map()))))
// (Mike,Some(Address(WORK,Castro,MV,UnknownFieldSet(Map()))))
// (Bart,None)
You can create a Dataset directly using Spark APIs:
spark.createDataset(testData)
// res5: Dataset[Person] = [name: string, age: int ... 1 more field]
From Binary to protos and back
In some situations, you may need to deal with datasets that contain serialized protocol buffers. This can be handled by mapping the datasets through ScalaPB's parseFrom and toByteArray functions.
Let's start by preparing a dataset with test binary data by mapping our testData:
val binaryDS: Dataset[Array[Byte]] = spark.createDataset(testData.map(_.toByteArray))
// binaryDS: Dataset[Array[Byte]] = [value: binary]
binaryDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |[0A 04 4A 6F 68 6...|
// |[0A 04 4D 69 6B 6...|
// |[0A 04 42 61 72 7...|
// +--------------------+
//
To turn this dataset into a Dataset[Person], we map it through parseFrom:
val protosDS: Dataset[Person] = binaryDS.map(Person.parseFrom(_))
// protosDS: Dataset[Person] = [name: string, age: int ... 1 more field]
To turn a dataset of protos into a Dataset[Array[Byte]], we map it through toByteArray:
val protosBinary: Dataset[Array[Byte]] = protosDS.map(_.toByteArray)
// protosBinary: Dataset[Array[Byte]] = [value: binary]
On enums
In SparkSQL-ScalaPB, enums are represented as strings. Unrecognized enum values are represented as strings containing the numeric value.
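Because enum fields surface as plain strings, they can be compared against string literals in column expressions. The sketch below is a continuation of the earlier snippets: it assumes the df, F, and $ interpolator defined there, and that the string form of an enum value is its proto name, as seen in the df.show() output above. The name homeAddresses is introduced here only for illustration.

```scala
// Assumes `df`, `F` and the `$` interpolator from the earlier snippets.
// explode yields one row per (person, address) pair; people without
// addresses produce no rows.
val homeAddresses = df
  .select($"name", F.explode($"addresses").alias("address"))
  .filter($"address.address_type" === "HOME")
```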
DataFrames and Datasets from RDDs
import org.apache.spark.rdd.RDD
val protoRDD: RDD[Person] = spark.sparkContext.parallelize(testData)
val protoDF: DataFrame = ProtoSQL.protoToDataFrame(spark, protoRDD)
val protoDS: Dataset[Person] = spark.createDataset(protoRDD)
UDFs
If you write a regular Spark UDF that returns a message, it will not pick up our encoder, and you may get a runtime failure. To work around this, sparksql-scalapb provides ProtoSQL.udf to create UDFs. For example, if you need to parse a binary column into a proto:
val binaryDF = protosBinary.toDF("value")
// binaryDF: DataFrame = [value: binary]
val parsePersons = ProtoSQL.udf { bytes: Array[Byte] => Person.parseFrom(bytes) }
// parsePersons: org.apache.spark.sql.Column => org.apache.spark.sql.Column = scalapb.spark.Udfs$$Lambda$14716/0x0000000103bad840@2d973625
binaryDF.withColumn("person", parsePersons($"value"))
// res7: DataFrame = [value: binary, person: struct<name: string, age: int ... 1 more field>]
Primitive wrappers
In ProtoSQL 0.9.x and 0.10.x, primitive wrappers are represented in Spark as structs
with a single field named value. A better representation in Spark would be a
nullable field of the primitive type. The better representation will be the
default in 0.11.x. To enable this representation today, replace the usages of
scalapb.spark.ProtoSQL with scalapb.spark.ProtoSQL.withPrimitiveWrappers.
Instead of importing scalapb.spark.Implicits._, import
scalapb.spark.ProtoSQL.implicits._.
See example in WrappersSpec.
Datasets and <none> is not a term
You will see this error if Spark's encoders are being picked up instead of the
ones provided by sparksql-scalapb. Please ensure you are not importing spark.implicits._. See the import instructions above.
Example
Check out a complete example here.