Scala
Data Definition Language (DDL)
OpenHouse tables use Apache Iceberg as the underlying table format. You can use native Spark syntax to create, alter, and drop tables, but note that OpenHouse imposes some constraints.
For DDL statements such as CREATE TABLE, ALTER TABLE, GRANT/REVOKE, etc., use .sql() in SparkSession; a sketch of that route follows the snippet below. You can also use the native Spark Scala syntax, which creates the table if it does not exist:
// Your code preparing the DataFrame
df.writeTo("openhouse.<dbName>.<tableName>").create()
Reads
To query a table, run the following:
val df = spark.table("openhouse.db.table")
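Equivalently, you can issue a SQL query; sqlDf is just a hypothetical name here:
val sqlDf = spark.sql("SELECT * FROM openhouse.db.table")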
You can also filter the data with column expressions; note that col requires an import:
import org.apache.spark.sql.functions.col

val filteredDf = df.filter(col("datepartition") > "2022-05-10")
With Time-Travel
Identify the older snapshot you want to read by Inspecting Metadata, then run the following:
spark.read.option("snapshot-id", 1031031135387001532L).table("openhouse.db.table")
Writes
We highly recommend adopting Apache Spark's DataFrameWriterV2 API, available in Spark 3, when programming with the DataFrame API.
The following are example Scala statements in Spark 3 to write to a partitioned table.
Create Table
// Your code preparing the DataFrame
df.writeTo("openhouse.db.table").create()
Create Partitioned Table
import org.apache.spark.sql.functions

// Your code preparing the DataFrame
df.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .partitionedBy(functions.col("datepartition"))
  .create()
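Spark 3 also ships partition transform helpers (years, months, days, hours, bucket) in org.apache.spark.sql.functions for use with partitionedBy. A sketch, assuming a hypothetical timestamp column named ts and subject to OpenHouse's partitioning constraints:
import org.apache.spark.sql.functions.{col, days}

// Partition by the day derived from the (hypothetical) timestamp column ts.
df.sortWithinPartitions("ts")
  .writeTo("openhouse.db.table")
  .partitionedBy(days(col("ts")))
  .create()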
Append Data to Partitioned Table
appendDf.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .append()
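The SQL equivalent of an append is INSERT INTO; db.source_table here is a hypothetical source:
spark.sql("INSERT INTO openhouse.db.table SELECT * FROM db.source_table")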
Overwrite Data
// Dynamic partition overwrite: any partition with at least one
// row matched by the incoming DataFrame is overwritten.
overwriteDf.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .overwritePartitions()

// To overwrite only the rows matching an explicit filter, use overwrite().
// The $"..." column syntax requires spark.implicits.
import spark.implicits._

overwriteDf.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .overwrite($"level" === "INFO")
Note that the explicit sort is necessary when writing to partitioned tables because, as of Spark 3.0, Spark doesn't allow Iceberg to request a sort before writing. See the Apache Iceberg documentation on Spark writes for more details.