Table Stream Reads and Writes — Delta Lake Documentation (2023)

Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:

  • Maintaining "exactly-once" processing with more than one stream (or concurrent batch jobs)

  • Efficiently discovering which files are new when using files as the source for a stream

For many Delta Lake operations on tables, you enable integration with the Apache Spark DataSourceV2 and Catalog APIs (since 3.0) by setting configurations when you create a new SparkSession. See Configure SparkSession.
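For reference, the following is a minimal sketch of such a SparkSession configuration in Python, using the two settings documented in the Delta Lake quickstart; the application name is only illustrative:

from pyspark.sql import SparkSession

# Sketch: enable the Delta SQL extensions and the Delta catalog when
# building the session, as described in the Delta Lake quickstart.
spark = (SparkSession.builder
    .appName("delta-streaming-example")  # hypothetical application name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())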

In this article:

  • Delta table as a source

    • Limit input rate

    • Ignore updates and deletes

    • Specify initial position

    • Process initial snapshot without data being dropped

  • Delta table as a sink

    • Append mode

    • Complete mode

  • Idempotent table writes in foreachBatch

    • Example

Delta table as a source

When you load a Delta table as a streaming source and use it in a streaming query, the query processes all data present in the table, as well as any new data that arrives after streaming begins.

Python:

spark.readStream.format("delta").load("/tmp/delta/events")

Scala:

import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")

In this section:

  • Limit input rate

  • Ignore updates and deletes

  • Specify initial position

  • Process initial snapshot without data being dropped

Limit input rate

The following options are available to control micro-batches:

  • maxFilesPerTrigger: How many new files to consider in every micro-batch. The default is 1000.

  • maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a "soft max", meaning that a batch processes approximately this amount of data and may process more than the limit in order to make the streaming query move forward when the smallest input unit is larger than this limit. If you use Trigger.Once for your streaming, this option is ignored. This is not set by default.

If you use maxBytesPerTrigger in conjunction with maxFilesPerTrigger, the micro-batch processes data until either the maxFilesPerTrigger or the maxBytesPerTrigger limit is reached.
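For illustration, both options can be set on the streaming read; the sketch below reuses the /tmp/delta/events path from the earlier example, and the limits shown are arbitrary:

# Sketch: cap each micro-batch at 100 new files or roughly 1 GB of data,
# whichever limit is reached first (values are arbitrary examples).
(spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 100)
    .option("maxBytesPerTrigger", "1g")
    .load("/tmp/delta/events"))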

Note

In cases where source table transactions are cleaned up due to the logRetentionDuration setting and the stream lags behind in processing, Delta Lake processes the data corresponding to the latest available transaction history of the source table, but the stream does not fail. This can result in data being dropped.

Ignore updates and deletes

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be automatically propagated downstream:

  • You can delete the output and checkpoint and restart the stream from the beginning.

  • You can set one of these two options:

    • ignoreDeletes: Ignore transactions that delete data at partition boundaries.

    • ignoreChanges: Re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, so your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes. Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.

Example

For example, suppose you have a table user_events with date, user_email, and action columns that is partitioned by date. You stream out of the user_events table and you need to delete data from it due to GDPR.

When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already segmented by value, so the delete just drops those files from the metadata. Thus, if you just want to delete data from some partitions, you can use:

spark.readStream.format("delta").option("ignoreDeletes", "true").load("/tmp/delta/user_events")

However, if you have to delete data based on user_email, then you will need to use:

spark.readStream.format("delta").option("ignoreChanges", "true").load("/tmp/delta/user_events")

If you update a user_email with an UPDATE statement, the file containing the user_email in question is rewritten. When you use ignoreChanges, the new record is propagated downstream along with all the other unchanged records that were in the same file. Your logic should be able to handle these incoming duplicate records.

Specify initial position

You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.

  • startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output.

    • To return only the latest changes, specify latest (see the example below).

  • startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. One of:

    • A timestamp string. For example, "2019-01-01T00:00:00.000Z".

    • A date string. For example, "2019-01-01".

You cannot set both options at the same time; you can use only one of them. They take effect only when starting a new streaming query. If a streaming query has started and its progress has been recorded in its checkpoint, these options are ignored.

Important

Although you can start the streaming source from a specific version or timestamp, the streaming source schema is always the most recent schema in the Delta table. You must ensure that there are no incompatible schema changes in the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when the data is read with an incorrect schema.

Example

For example, suppose you have a table user_events. To read changes since version 5, use:

spark.readStream.format("delta").option("startingVersion", "5").load("/tmp/delta/user_events")

To read the changes since 2018-10-18, use:

spark.readStream.format("delta").option("startingTimestamp", "2018-10-18").load("/tmp/delta/user_events")
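To read only the changes committed after the stream starts, specify latest for startingVersion, as noted above:

spark.readStream.format("delta").option("startingVersion", "latest").load("/tmp/delta/user_events")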

Process initial snapshot without data being dropped

When you use a Delta table as a stream source, the query first processes all of the data present in the table. The Delta table at this version is called the initial snapshot. By default, the Delta table's data files are processed based on which file was last modified. However, the last modification time does not necessarily represent the event time order of the records.

In a stateful streaming query with a defined watermark, processing files by modification time can result in records being processed in the wrong order. This could lead to records being dropped as late events by the watermark.

You can avoid the data loss issue by enabling the following option:

  • withEventTimeOrder – Whether to process the initial snapshot with the event time order.

With event time order enabled, the event time range of the initial snapshot data is divided into time buckets. Each micro-batch processes one bucket by filtering data within its time range. The maxFilesPerTrigger and maxBytesPerTrigger configuration options still apply for controlling the micro-batch size, but only approximately due to the nature of the processing.

The following graphic shows this process:

[Figure: initial snapshot processing with event time order]

Highlights about this feature:

  • The data drop issue only occurs when the initial Delta snapshot of a stateful streaming query is processed in the default order.

  • You cannot change withEventTimeOrder once the streaming query is started while the initial snapshot is still being processed. To restart with withEventTimeOrder changed, you need to delete the checkpoint.

  • If you are running a stream query with withEventTimeOrder enabled, you cannot downgrade it to a DBR version that does not support this feature until the initial snapshot processing is completed. If you need to downgrade, you can wait for the initial snapshot to finish, or delete the checkpoint and restart the query.

  • This feature is not supported in the following unusual scenarios:

    • The event time column is a generated column and there are non-projection transforms between the Delta source and the watermark.

    • There is a watermark that has more than one Delta source in the streaming query.

  • With event time ordering enabled, execution of Delta initial snapshot processing may be slower.

  • Each micro-batch scans the initial snapshot to filter data within the corresponding event time range. For faster filtering, it is advised to use a Delta source column as the event time so that data skipping can be applied (check data skipping for when it is applicable). Additionally, partitioning the table along the event time column can further speed up the processing. You can check the Spark UI to see how many Delta files are scanned for a specific micro-batch.

Example

Suppose you have a table user_events with an event_time column. Your streaming query is an aggregation query. If you want to ensure no data is dropped during the initial snapshot processing, you can use:

spark.readStream.format("delta").option("withEventTimeOrder", "true").load("/tmp/delta/user_events").withWatermark("event_time", "10 seconds")

Note: You can also enable this with a cluster-level Spark configuration that applies to all streaming queries: spark.databricks.delta.withEventTimeOrder.enabled true

Delta table as a sink

You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.

Note

The Delta Lake VACUUM function removes all files not managed by Delta Lake but skips any directories that begin with _. You can safely store checkpoints alongside other data and metadata for a Delta table using a directory structure such as /_checkpoints.

In this section:

  • Append mode

  • Complete mode

Append mode

By default, streams run in add mode, which adds new records to the table.

You can use the path method:

Python:

(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/")
  .start("/delta/events"))

Scala:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .delta("/tmp/delta/events")

or the toTable method in Spark 3.1 and higher (Delta Lake library 8.3 and higher), as follows. (In versions of Spark prior to 3.1 (Delta Lake library 8.2 and earlier), use the table method instead.)

Python:

(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events"))

Scala:

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Complete mode

You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:

Python:

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer"))

Scala:

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

The previous example continually updates a table containing the total number of events per customer.

For applications with more relaxed latency requirements, you can save compute resources with one-time triggers. Use them to update summary aggregation tables on a set schedule, processing only new data that has arrived since the last update.
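As a sketch of that pattern, reusing the aggregation example above (Trigger.Once is expressed as trigger(once=True) in the Python API), the query below processes whatever data has arrived since the last run and then stops:

# Sketch: a one-time trigger run, e.g. launched from a scheduled job. The
# query resumes from the checkpoint, processes the new data, and stops.
(spark.readStream
    .format("delta")
    .load("/tmp/delta/events")
    .groupBy("customerId")
    .count()
    .writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
    .trigger(once=True)
    .start("/tmp/delta/eventsByCustomer"))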

Idempotent table writes in foreachBatch

Note

Available in Delta Lake 2.0.0 and later.

The foreachBatch command allows you to specify a function that is executed on the output of every micro-batch after arbitrary transformations in the streaming query. This makes it possible to implement a foreachBatch function that can write the micro-batch output to one or more target Delta tables. However, foreachBatch does not make those writes idempotent, since the write attempts lack information about whether the batch is being re-executed. For example, rerunning a failed batch could result in duplicate data writes.

To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:

  • txnAppId: A unique string that you can pass on each DataFrame write. For example, you can use the StreamingQuery ID as txnAppId.

  • txnVersion: A monotonically increasing number that acts as the transaction version.

The Delta table uses the combination of txnAppId and txnVersion to identify duplicate writes and ignore them.

If a batch write is interrupted by a failure, rerunning the batch uses the same application ID and batch ID, which helps the runtime correctly identify duplicate writes and ignore them. The application ID (txnAppId) can be any user-generated unique string and does not have to be related to the stream ID.

Warning

If you delete the streaming checkpoint and restart the query with a new checkpoint, you must provide a different txnAppId; otherwise, writes from the restarted query will be ignored because it will contain the same txnAppId and the batch ID starts from 0.

The same DataFrameWriter options can be used to achieve idempotent writes in non-streaming jobs, as sketched after the example below. For details, see Idempotent writes.

Example

Python:

app_id = ...  # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
    batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...)  # location 1
    batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...)  # location 2

Scala:

val appId = ...  // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}
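For the non-streaming case mentioned above, a minimal sketch might look as follows; the DataFrame df, the target path, and the version number are illustrative assumptions:

# Sketch: idempotent batch write. Re-running this exact write with the same
# txnAppId/txnVersion pair is recognized as a duplicate and skipped.
(df.write.format("delta")
    .option("txnAppId", "nightly-backfill")  # hypothetical application ID
    .option("txnVersion", 42)                # hypothetical, must increase monotonically
    .mode("append")
    .save("/tmp/delta/target"))              # hypothetical target path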