In the previous article we talked about common big data challenges. One of the most frequent problems data engineers face is slow query execution. Delta Lake addresses these challenges and can significantly improve query performance using several techniques:
- Data Indexing: Delta automatically creates and maintains an index of files (paths), which is used to skip data.
- Data Skipping: Delta collects basic file statistics (such as min and max values) together with indexing and partitioning information, so that only the relevant subset of the data is read when a query contains filters.
- Compaction (bin-packing): Delta manages the sizes of the underlying Parquet files by merging small files into larger ones.
- Z-Ordering (multi-dimensional clustering): Delta rearranges data to place related information in the same set of files.
- Data Caching: Delta automatically caches frequently accessed data, resulting in significantly improved read speeds.
- Join optimization: By specifying a range join, point-in-interval join, or overlap-interval join, we can help speed up join queries. See the documentation for more information.
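As a hedged sketch of the last point, Databricks supports a range-join optimization that is enabled through a query hint; the table and column names here are purely illustrative:

```sql
-- Illustrative tables: points(p) and ranges(start, end)
-- RANGE_JOIN hint with a bin size of 10 (Databricks-specific)
SELECT /*+ RANGE_JOIN(points, 10) */ *
FROM points
JOIN ranges ON points.p >= ranges.start AND points.p < ranges.end
```

The bin size is a tuning parameter; see the Databricks docs on range-join optimization for how to choose it.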
Let's see some examples of how we can speed up query time.
Data compaction (bin-packing)
As source data, we will use public data provided by Databricks, located in the Databricks internal file system at dbfs:/databricks-datasets/samples/lending_club/parquet/. You can find the original source of this data in the Kaggle data science community. All queries used in this article were run free of charge on Databricks Community Edition (DBCE), using a standard community-optimized cluster with 6 GB memory, 0.88 cores, 1 DBU and Databricks Runtime 6.3.
By default, the loan statistics data is divided into 5 files that are between 20 MB and 70 MB in size. However, very often engineers have to deal with hundreds or thousands of relatively small files (~1 MB or less). Too many small files drastically reduce the efficiency of data set operations. Let's simulate this situation by repartitioning the data frame into 1200 parts.
# Configure locations
sourcedata_path = "/databricks-datasets/samples/lending_club/parquet/"
deltalake_path = "/tmp/lending_club_delta/"

# Read loan statistics
loan_stats = spark.read.parquet(sourcedata_path)

# Delete the table if it already exists
dbutils.fs.rm(deltalake_path, recurse=True)

# Save the table in Delta format as 1200 parts
loan_stats.repartition(1200).write.format("delta").mode("overwrite").save(deltalake_path)

# Re-read as Delta Lake
loan_stats_delta = spark.read.format("delta").load(deltalake_path)

spark.sql("DROP TABLE IF EXISTS delta_loans")
spark.sql("CREATE TABLE delta_loans USING DELTA LOCATION '/tmp/lending_club_delta'")
Once the data is loaded, we run a simple SELECT query with an aggregation and a filter. Note that it takes a few minutes (~3-4 minutes) to process the query.
%sql
SELECT purpose, term, grade, avg(annual_inc)
FROM loan_stats_delta
WHERE purpose IN ('car', 'credit_card', 'house')
GROUP BY purpose, term, grade
ORDER BY purpose, term, grade
There is also a suggestion from the Databricks engine that we should OPTIMIZE the dataset, as there are simply too many small files. But before we do that, let's take a look at the files below. They are all around 0.2 MB in size, which is considered very small for Spark. Spark performs best when reading files that are hundreds of MB or even 1 GB in size. When we issue the above query, all 1200 small files need to be opened and read, and this causes performance to fall short of what we want.
%fs ls /tmp/lending_club_delta
Time for the OPTIMIZE command (finally). This is the key feature of Delta Lake's optimization methods. What it does is simply collect and compact the files into larger ones (bin-packing optimization). The file-size sweet spot identified by Databricks for best query performance is around 1 GB, and the OPTIMIZE command will produce files of that size where possible. However, we can check this "sweet spot" size and change it if necessary.
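If you do need to change the target size, it is controlled by a Spark configuration; a sketch, assuming the setting name documented by Databricks:

```sql
-- Lower the OPTIMIZE target file size from ~1 GB to 128 MB (value in bytes)
SET spark.databricks.delta.optimize.maxFileSize = 134217728
```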
Let's run it.
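A minimal example, run against the delta_loans table registered earlier:

```sql
-- Compact the 1200 small files into larger files (bin-packing)
OPTIMIZE delta_loans
```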
...and check again the same SELECT query that we ran a moment ago.
There is a significant improvement in query execution time (less than 10 seconds). But as you may have noticed, this performance improvement comes at an additional cost: OPTIMIZE is a costly operation in terms of time and resources. In our case, it took more than 3 minutes to complete the optimization process. That is why Delta Lake does not fire it automatically, and engineers must analyze the trade-off between performance and cost when deciding how often to run OPTIMIZE.
What do our files look like in DBFS?
There are still a lot of small files. This is because Databricks Delta manages the transactions; longer-running queries or processes may still be accessing the older files after the compaction is complete. Any new queries or jobs submitted at this point will access the newest and largest files, but any existing jobs will still query the older files.
You can clean up old files at regular intervals by calling VACUUM. By default, VACUUM deletes files older than 7 days, but you can set your own retention period by specifying a RETAIN parameter. It is strongly recommended that you do not set the retention to zero hours unless you are absolutely certain that no other processes are writing to or reading from your table. To set the retention below the default 168 hours, we need to change a cluster configuration that disables the retention check:
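A sketch of that setting, using the configuration name documented by Databricks:

```sql
-- Disable the safety check that enforces the minimum 168-hour retention
SET spark.databricks.delta.retentionDurationCheck.enabled = false
```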
%sql
VACUUM loan_stats_delta RETAIN 0 HOURS
Having a lot of small files is not by default a negative scenario. On the contrary, there are scenarios where queries run much faster because they operate on files smaller than 1 GB. For example:
- Streaming use cases where latency on the order of minutes is acceptable.
- MERGE INTO is the preferred method of writing into the Delta table (large files are good for queries, not for MERGE).
- CREATE TABLE AS SELECT or INSERT INTO are commonly used operations.
For such cases, Auto Optimize was created: it automatically compacts small files during individual writes to a Delta table, but compacts to files of 128 MB instead of 1 GB. For more information on how to use it, please see the docs.
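A hedged sketch of enabling it per table via table properties (property names as given in the Databricks docs):

```sql
ALTER TABLE delta_loans SET TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
```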
Let's go a step further to improve query performance in our example. We can use the Delta Cache feature which will automatically create copies of remote files on the local storage of the nodes. Thanks to it, any successive reading of the same data is done locally without the need to move the data between nodes.
By default, the Delta Cache feature is disabled if we did not select the Delta Cache accelerated worker type when we configured the cluster.
We can check whether it is enabled with

spark.conf.get("spark.databricks.io.cache.enabled")

and turn it on if it is disabled:

spark.conf.set("spark.databricks.io.cache.enabled", "true")
Let's run our reference query again. Look at the execution time: in our case, it is more than 4 seconds.
%sql
SELECT purpose, term, grade, avg(annual_inc)
FROM loan_stats_delta
WHERE purpose IN ('car', 'credit_card', 'house')
GROUP BY purpose, term, grade
ORDER BY purpose, term, grade
If we take a look at the Spark log of the query we just ran, you'll notice that no data was read from the cache, but some MB of data was written to the cache.
Rerun the above query. Now the query ran 2-4 times faster, since some of the data was read from the Delta cache.
We can monitor the current state of the Delta cache on each of the executors in the Storage tab of the Spark UI.
To improve query performance, Delta Lake introduced a number of features and continues to implement new ones. By default, Delta Lake maintains basic statistics about the data, as well as an index of the files and partitions where the data is stored. Thanks to that, every time we execute a query, the engine does not need to scan the entire data frame; using the indexes and statistics stored in the Delta log, it can skip most of the data and open only the files necessary to process the query. In addition, we have the option to tune performance by manually running the OPTIMIZE command with Z-ORDER to compact small files and co-locate the most frequently used data, or we can enable Delta caching and/or Auto Optimize. However, we must keep in mind that while some optimization features that redistribute the data across files can speed up the execution of some queries (for example, OPTIMIZE), at the same time they can make other queries slower (for example, when files are much smaller than OPTIMIZE's standard 1 GB).
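As a sketch, Z-Ordering is applied through the same OPTIMIZE command; the column choice here is illustrative and should match the columns you filter on most often:

```sql
-- Co-locate rows with similar values of the filtered column in the same files
OPTIMIZE delta_loans ZORDER BY (purpose)
```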
There are also other optimization techniques you can implement to further tune your transformation, especially joins.
The Delta Lake team is constantly updating the documentation and adding new options to improve performance, so it pays to check the docs when developing ETL pipelines with Delta Lake.
You can download the Databricks notebook (Adatis_PerformanceEnhancements_20200311) with the sample code above, which you can import and run on Databricks Community Edition (DBCE). You must extract the archive and import the HTML into DBCE.
If you want to know how we can help you with your data, please contact us.