Table streaming reads and writes
Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:
Maintaining exactly-once processing with more than one stream (or concurrent batch jobs)
Efficiently discovering which files are new when using files as the source for a stream
For many Delta Lake operations on tables, you enable integration with Apache Spark DataSourceV2 and Catalog APIs (since 3.0) by setting configurations when you create a new SparkSession. See Configure SparkSession.
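As a minimal sketch (assuming the standard Delta Lake extension and catalog configuration keys covered in Configure SparkSession; the application name is arbitrary), a SparkSession with these integrations enabled might be created like this:

from pyspark.sql import SparkSession

# Minimal sketch: build a SparkSession with the Delta Lake SQL extension
# and catalog integration enabled. The application name is illustrative.
spark = (
    SparkSession.builder.appName("delta-streaming-example")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)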
In this article:
Delta table as a source
Limit input rate
Ignore updates and deletes
Specify initial position
Process initial snapshot without data being dropped
Delta table as a sink
Append mode
Complete mode
Idempotent table writes in foreachBatch
Example
Delta table as a source
When you load a Delta table as a streaming source and use it in a streaming query, the query processes all data present in the table, as well as any new data that arrives after streaming begins.
Python:

spark.readStream.format("delta").load("/tmp/delta/events")

Scala:

import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")
In this section:
Limit input rate
Ignore updates and deletes
Specify initial position
Process initial snapshot without data being dropped
Limit input rate
The following options are available to control micro-batches:
maxFilesPerTrigger: How many new files to be considered in every micro-batch. The default is 1000.
maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a "soft max", meaning that a batch processes approximately this amount of data and may process more than the limit in order to make the streaming query move forward in cases when the smallest input unit is larger than this limit. If you use Trigger.Once for your streaming, this option is ignored. This is not set by default.
If you use maxBytesPerTrigger in conjunction with maxFilesPerTrigger, the micro-batch processes data until either the maxFilesPerTrigger or maxBytesPerTrigger limit is reached.
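For illustration, a sketch that sets both options on a streaming read; the values are arbitrary and would be tuned for your workload:

# Sketch: cap each micro-batch at roughly 100 files or 1 GB of data,
# whichever limit is reached first. The values are illustrative only.
(spark.readStream.format("delta")
  .option("maxFilesPerTrigger", 100)
  .option("maxBytesPerTrigger", "1g")
  .load("/tmp/delta/events"))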
Note
In cases when the source table transactions are cleaned up due to the logRetentionDuration setting and the stream lags behind in processing, Delta Lake processes the data corresponding to the latest available transaction history of the source table but does not fail the stream. This can result in data being dropped.
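If a stream is expected to lag, one mitigation (sketched below with an illustrative interval) is to lengthen the transaction log retention on the source table so that older commits stay available longer:

# Sketch: retain the transaction log of the source table for 30 days.
# Choose a value that comfortably exceeds the stream's maximum expected lag.
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/events`
  SET TBLPROPERTIES (delta.logRetentionDuration = 'interval 30 days')
""")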
Ignore updates and deletes
Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be automatically propagated downstream:
You can delete the output and checkpoint and restart the stream from the beginning.
You can set one of these two options:
ignoreDeletes: ignore transactions that delete data at partition boundaries.
ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, so your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes, so if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.
Example
For example, suppose you have a table user_events with date, user_email, and country columns partitioned by date. You stream out of the user_events table and you need to delete data from it due to GDPR.
When you delete at partition boundaries (that is, the WHERE clause is on a partition column), the files are already segmented by value, so the delete just drops those files from the metadata. Thus, if you just want to delete data from some partitions, you can use:
spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")
However, if you have to delete data based on user_email, then you will need to use:
spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/delta/user_events")
If you update a user_email with the UPDATE statement, the file containing the user_email in question is rewritten. When you use ignoreChanges, the new record is propagated downstream along with all other unchanged records that were in the same file. Your logic should be able to handle these incoming duplicate records.
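One way to handle these duplicates downstream is to deduplicate within a watermark, following the standard Structured Streaming pattern. The sketch below assumes the records carry a unique event_id and an event_time timestamp, which are hypothetical columns not shown in the example schema above:

# Sketch: drop re-emitted duplicates downstream of an ignoreChanges read.
# event_id and event_time are assumed, hypothetical columns.
deduped = (
    spark.readStream.format("delta")
    .option("ignoreChanges", "true")
    .load("/tmp/delta/user_events")
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"])
)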
Specify initial position
You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.
startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output. To return only the latest changes, specify latest.
startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. One of:
A timestamp string. For example, "2019-01-01T00:00:00.000Z".
A date string. For example, "2019-01-01".
You cannot set both options at the same time; you can use only one of them. They take effect only when starting a new streaming query. If a streaming query has started and the progress has been recorded in its checkpoint, these options are ignored.
Important
Although you can start the streaming source from a specific version or timestamp, the streaming source schema is always the most recent schema in the Delta table. You must ensure that there are no incompatible schema changes in the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when the data is read with an incorrect schema.
Example
For example, suppose you have a table user_events. To read changes since version 5, use:
spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")
To read the changes since 2018-10-18, use:
spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")
Process initial snapshot without data being dropped
When you use a Delta table as a stream source, the query first processes all of the data present in the table. The Delta table at this version is called the initial snapshot. By default, the Delta table's data files are processed based on which file was last modified. However, the last modification time does not necessarily represent the record event time order.
In a stateful streaming query with a defined watermark, processing files by modification time can result in records being processed in the wrong order. This could lead to records being dropped as late events by the watermark.
You can avoid the data drop issue by enabling the following option:
withEventTimeOrder: Whether the initial snapshot should be processed with event time order.
With event time order enabled, the event time range of the initial snapshot data is divided into time buckets. Each micro-batch processes a bucket by filtering data within the time range. The maxFilesPerTrigger and maxBytesPerTrigger configuration options are still applicable to control the micro-batch size, but only in an approximate way due to the nature of the processing.
The following graphic shows this process:
Highlights about this feature:
The data drop issue only occurs when the initial Delta snapshot of a stateful streaming query is processed in the default order.
You cannot change withEventTimeOrder after the stream query is started while the initial snapshot is still being processed. To restart with withEventTimeOrder changed, you need to delete the checkpoint.
If you are running a stream query with withEventTimeOrder enabled, you cannot downgrade it to a DBR version that does not support this feature until the initial snapshot processing is completed. If you need to downgrade, you can wait for the initial snapshot to finish, or delete the checkpoint and restart the query.
This feature is not supported in the following unusual scenarios:
The event time column is a generated column and there are non-projection transforms between the Delta source and the watermark.
There is a watermark that has more than one Delta source in the streaming query.
With event time order enabled, the performance of the Delta initial snapshot processing might be slower.
Each micro-batch scans the initial snapshot to filter the data within the corresponding event time range. For faster filtering, it is advised to use a Delta source column as the event time so that data skipping can be applied (check data skipping for when it is applicable). Additionally, partitioning the table along the event time column can further speed up the processing. You can check the Spark UI to see how many Delta files are scanned for a specific micro-batch.
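As a sketch of the partitioning advice above (the df variable, the event_date column, and the output path are illustrative assumptions, not part of the examples in this article), a table could be written partitioned by a date derived from the event time so that event-time filters can prune whole partitions:

from pyspark.sql.functions import to_date

# Sketch: partition by a date column derived from the event time.
# df, event_date, and the output path are illustrative only.
(df.withColumn("event_date", to_date("event_time"))
  .write.format("delta")
  .partitionBy("event_date")
  .save("/tmp/delta/user_events_by_date"))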
Example
Suppose you have a table user_events with an event_time column. Your streaming query is an aggregation query. To ensure no data is dropped during the initial snapshot processing, you can use:
spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")
Note: You can also enable this with the cluster-level Spark configuration, which applies to all streaming queries: spark.databricks.delta.withEventTimeOrder.enabled true
Delta table as a sink
You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.
Note
The Delta Lake VACUUM function removes all files not managed by Delta Lake but skips any directories that begin with _. You can safely store checkpoints alongside other data and metadata for a Delta table using a directory structure such as <table-name>/_checkpoints.
In this section:
Append mode
Complete mode
Append mode
By default, streams run in append mode, which adds new records to the table.
You can use the path method:
Python:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/")
  .start("/delta/events")
Scala:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .delta("/tmp/delta/events")
or the toTable method in Spark 3.1 and higher (Delta Lake library 8.3 and above), as follows. (In Spark versions before 3.1 (Delta Lake library 8.2 and below), use the table method instead.)
Python:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Scala:

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")
Complete mode
You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:
Python:

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer"))

Scala:

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
The previous example continually updates a table containing the total number of events per customer.
For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only the new data that has arrived since the last update.
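As a sketch (reusing the paths from the preceding example), the same aggregation can be run as a one-shot job with a one-time trigger; scheduling the job itself is outside the scope of this example:

# Sketch: refresh the aggregate table once and stop, processing only data
# that arrived since the previous run (tracked through the checkpoint).
(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .trigger(once=True)
  .start("/tmp/delta/eventsByCustomer"))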
Idempotent table writes in foreachBatch
Note
Available in Delta Lake 2.0.0 and later.
The command foreachBatch allows you to specify a function that is executed on the output of every micro-batch after arbitrary transformations in the streaming query. This allows implementing a foreachBatch function that can write the micro-batch output to one or more target Delta table destinations. However, foreachBatch does not make those writes idempotent, as those write attempts lack the information of whether the batch is being re-executed or not. For example, re-running a failed batch could result in duplicate data writes.
To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:
txnAppId: A unique string that you can pass on each DataFrame write. For example, you can use the StreamingQuery ID as txnAppId.
txnVersion: A monotonically increasing number that acts as the transaction version.
The Delta table uses the combination of txnAppId and txnVersion to identify duplicate writes and ignore them.
If a batch write is interrupted by a failure, re-running the batch uses the same application ID and batch ID, which helps the runtime correctly identify duplicate writes and ignore them. The application ID (txnAppId) can be any user-generated unique string and does not have to be related to the stream ID.
Warning
If you delete the streaming checkpoint and restart the query with a new checkpoint, you must provide a different appId; otherwise, writes from the restarted query will be ignored because it will contain the same txnAppId and the batch ID will start from 0.
The same DataFrameWriter options can be used to achieve idempotent writes in non-streaming jobs. For details, see Idempotent writes.
Example
Python:

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
Scala:

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}