
[Iceberg] Iceberg Guide Book Summary | CHAPTER 4. Optimizing the Performance of Iceberg Tables

noohhee 2025. 3. 9. 21:09


CHAPTER 4 Optimizing the Performance of Iceberg Tables

Compaction

 

When you are querying your Apache Iceberg tables, you need to open and scan each file and then close the file when you're done. The more files you have to scan for a query, the greater the cost these file operations will put on your query.

It is possible to run into the “small files problem,” where too many small files have an impact on the speed and performance of your scans because you're doing more file operations, have a lot more metadata to read (there is metadata on each file), and have to delete more files when doing cleanup and maintenance operations.

 

 

There are fixed costs you can't avoid and variable costs you can avoid by using different strategies.

The solution to this problem is to periodically take the data in all these small files and rewrite it into fewer, larger files. This process is called compaction, as you are compacting many files into a few.

 

 

 

Hands-on with Compaction

 

Apache Iceberg's Actions package includes several maintenance procedures (the Actions package is specifically for Apache Spark, but other engines can create their own maintenance operation implementations).

 

Table table = catalog.loadTable("myTable");

SparkActions
    .get()
    .rewriteDataFiles(table)
    .option("rewrite-job-order", "files-desc")
    .execute();

 

We initiated a new instance of our table and then triggered rewriteDataFiles, which is the Spark action for compaction.

There are several methods you can chain between the call to rewriteDataFiles and the execute method that begins the job:

 

binPack
Sets the compaction strategy to binpack (discussed later), which is the default and doesn't need to be explicitly supplied.

sort
Changes the compaction strategy to sort the rewritten data by one or more fields in a priority order.

zOrder
Changes the compaction strategy to z-order–sort the data based on multiple fields with equal weighting.

filter
Enables you to pass an expression used to limit which files are rewritten.

option
Changes a single option.

options
Takes a map of several option configurations.

 

 

 

There are several possible options you can pass to configure the job; here are a few important ones:

 

target-file-size-bytes
This sets the intended size of the output files. By default, it uses the write.target-file-size-bytes property of the table, which defaults to 512 MB.

 

max-concurrent-file-group-rewrites

This is the ceiling for the number of file groups to write simultaneously.

 

max-file-group-size-bytes
This is the maximum size of a file group, not of a single file. This setting should be used when dealing with partitions larger than the memory available to the worker writing a particular file group, so that the partition can be split into multiple file groups that are written concurrently.

 

partial-progress-enabled
This allows commits to occur while file groups are compacted. Therefore, for long-running compaction, this can allow concurrent queries to benefit from already compacted files.

 

partial-progress-max-commits
If partial progress is enabled, this setting sets the maximum number of commits allowed to complete the job.

 

rewrite-job-order
The order in which to write file groups, which can matter when using partial progress to make sure the higher-priority file groups are committed sooner rather than later. Groups can be ordered by byte size or by number of files in a group (bytes-asc, bytes-desc, files-asc, files-desc, none).

 

 

 

These maintenance operations are also exposed as call procedures that can be invoked using the following syntax from Spark SQL:

 

-- using positional arguments
CALL catalog.system.procedure(arg1, arg2, arg3)

-- using named arguments
CALL catalog.system.procedure(argkey1 => argval1, argkey2 => argval2)

 

-- Rewrite Data Files CALL procedure in Spark SQL
CALL catalog.system.rewrite_data_files(
  table => 'musicians',
  strategy => 'binpack',
  where => 'genre = "rock"',
  options => map(
    'rewrite-job-order','bytes-asc',
    'target-file-size-bytes','1073741824', -- 1 GB
    'max-file-group-size-bytes','10737418240' -- 10 GB
  )
)

 

 

Compaction Strategies

 

There are several compaction strategies (binpack, sort, and z-order) that you can use with the rewriteDataFiles procedure.

 

Automating Compaction

 

Here are a couple of approaches you can take to automate these jobs:
• Airflow
• cron jobs

 

 

 

Sorting

 

Sorting or “clustering” your data has a very particular benefit when it comes to your queries: it helps limit the number of files that need to be scanned to get the data needed for a query. By having the data sorted, you've reduced the number of files you have to scan.

 

 

 

 

If you wanted to rewrite the entire dataset with all players sorted by team globally, you could run the following statement:

 

CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST'
)

 

You can sort by additional fields as well.

 

 

CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST, name ASC NULLS FIRST'
)

 

 

 

 

Z-order

 

There are times when multiple fields are a priority when querying a table, and this is where a z-order sort may be quite helpful. If we know what data we are looking for based on the fields we z-ordered by, we can possibly avoid searching large portions of the data since it's sorted by both fields.

 

 

Even if you only searched by age, you'd see a benefit from clustering by being able to eliminate at least two of the four quadrants.

 

CALL catalog.system.rewrite_data_files(
  table => 'people',
  strategy => 'sort',
  sort_order => 'zorder(age,height)'
)

 

 

 

Sorting comes with some challenges, though. First, as new data is ingested, it arrives unsorted, and until the next compaction job the data remains somewhat scattered across multiple files.

Second, files may still contain data for multiple values of the sorted field, which can be inefficient for queries that only require data with a specific value. For instance, in the earlier example, files contained data for both Lions and Packers players, making it inefficient to scan Packers records when you were only interested in Lions players. To deal with this, we have partitioning.

 

 

 

Partitioning

 

Traditionally, partitioning by day, month, or year on a timestamp column required you to create an additional column derived from the timestamp, expressing the year, month, or day in isolation.

 

 

Hidden Partitioning

A helpful post for understanding this: https://hoony-612.tistory.com/87

 

Since you don't need an additional column when using these transforms, you store less data in your datafiles. For example, if you create a table partitioned by month:

 

 

CREATE TABLE catalog.MyTable (...) PARTITIONED BY months(time) USING iceberg;

 

the following query would benefit from partitioning:

 

SELECT * FROM MYTABLE WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00';

 

 

You apply transforms like a function to the target column being transformed. Several transforms are available when planning your partitioning:

 

 

year (just the year)
month (month and year)
day (day, month, and year)
hour (hour, day, month, and year)
truncate
bucket

 

 

The year, month, day, and hour transforms work on a timestamp column.

 

Keep in mind that if you specify month, the partition values as tracked in the metadata will reflect the month and year of the timestamp, and if you use day, they will reflect the year, month, and day of the timestamp, so there is no need to use multiple transforms for more granular partitioning.

 

The truncate transform partitions the table based on the truncated value of a column.

 

CREATE TABLE catalog.MyTable (...) PARTITIONED BY truncate(name, 1) USING iceberg;

 

The bucket transform is perfect for partitioning based on a field with high cardinality (lots of unique values). The bucket transform will use a hash function to distribute the records across a specified number of buckets.
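For example, a minimal sketch in the same DDL style as the truncate example above, assuming a hypothetical high-cardinality user_id column (in Spark SQL the bucket transform takes the bucket count first):

CREATE TABLE catalog.MyTable (...) PARTITIONED BY bucket(24, user_id) USING iceberg;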

 

 

Partition Evolution

 

So, if the data in two different files were written based on two different partition schemes, the Iceberg metadata would make the engine aware so that it could create a plan with partition scheme A separately from partition scheme B, creating an overall scan plan at the end.

 

For example, let's say you have a table of membership records partitioned by the year in which members registered:

 

CREATE TABLE catalog.members (...) PARTITIONED BY years(registration_ts) USING iceberg;

 

Then, several years later, the pace of membership growth made it worthwhile to start breaking the records down by month:

 

ALTER TABLE catalog.members ADD PARTITION FIELD months(registration_ts)

 

 

When a partitioning scheme is updated, it only applies to new data written to the table going forward, so there is no need to rewrite the existing data. Also, keep in mind that any data rewritten by the rewriteDataFiles procedure will be rewritten using the new partitioning scheme, so if you want to keep older data in the old scheme, make sure to use the proper filters in your compaction jobs to not rewrite it.
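For example, a sketch that compacts only records registered after the spec change, so files written under the old yearly scheme are left untouched (the cutoff date is hypothetical):

CALL catalog.system.rewrite_data_files(
  table => 'members',
  strategy => 'binpack',
  where => 'registration_ts >= "2024-01-01"'
)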

 

 

 

Other Partitioning Considerations

 

Say you migrate a Hive table using the migrate procedure (discussed in Chapter 14). It may currently be partitioned on a derived column (e.g., a month column based on a timestamp column in the same table), but you want to express to Apache Iceberg that it should use an Iceberg transform instead. There is a REPLACE PARTITION FIELD command for just this purpose:

 

ALTER TABLE catalog.members REPLACE PARTITION FIELD registration_day WITH days(registration_ts) AS day_of_registration;

 

 

Copy-on-Write Versus Merge-on-Read

When you want to update or delete preexisting rows, there are some considerations you need to be aware of.

There are three approaches to dealing with row-level updates, covered in detail throughout this section and summarized in Table 4-2.

 

 

Copy-on-Write

The default approach is referred to as copy-on-write (COW).

In this approach, if even
a single row in a datafile is updated or deleted, that datafile is rewritten and the new
file takes its place in the new snapshot.

The pros
of this approach include faster reads, while the cons involve slower row-level updates
and deletes.

 

 

Merge-on-Read

With merge-on-read (MOR), instead of rewriting a whole datafile, you capture in a delete file the records to be updated or deleted in the existing file, with the delete file tracking which records should be ignored.

If you are deleting a record:
• The record is listed in a delete file.
• When a reader reads the table, it will reconcile the datafile with the delete file.

If you are updating a record (see the sketch after this list):
• The record to be updated is tracked in a delete file.
• A new datafile is created with only the updated record.
• When a reader reads the table, it will ignore the old version of the record because of the delete file and use the new version in the new datafile.
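For instance, with 'write.update.mode'='merge-on-read' set (as in the configuration examples below), an ordinary row-level update triggers exactly that flow; the statement itself is unchanged:

-- Hypothetical update against the catalog.people table defined below:
-- under MOR, the old row is recorded in a delete file and the new row
-- is written to a new datafile instead of rewriting the original file.
UPDATE catalog.people SET last_name = 'Smith' WHERE id = 5;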

 

To minimize the cost of reads, you'll want to run regular compaction jobs, and to keep those compaction jobs running efficiently, you'll want to take advantage of some of the properties you learned about before (see the sketch after this list):
• Use a filter/where clause to only run compaction on the files ingested in the last time frame (hour, day).
• Use partial progress mode to make commits as file groups are rewritten so that readers can start seeing marginal improvements sooner rather than later.
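A minimal sketch of such a targeted compaction call, assuming a hypothetical ingested_at timestamp column on the table (partial progress would additionally be switched on via the partial progress option described earlier in this chapter):

CALL catalog.system.rewrite_data_files(
  table => 'MyTable',
  strategy => 'binpack',
  where => 'ingested_at >= "2023-02-01"'
)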

 

 

We'll use an analogy to help you understand the high-level concept behind the different types of delete files. (Keep in mind that which types of delete files get written is usually decided by the engine for particular use cases, not typically by table settings.)

 

When you have a ton of data and you want to kick out a specific row, you have a
couple of options:
1. You can look for the row data based on where it’s sitting in the dataset, kind of
like finding your friend in a movie theater by their seat number.
2. You can look for the row data based on what it’s made of, like picking out your
friend in a crowd because they’re wearing a bright red hat.

 

If you use option 1, you’ll use what are called positional delete files. But if you use
option 2, you’ll need equality delete files. Each method has its own strengths and
weaknesses. This means that depending on the situation, you might want to pick one
over the other. It’s all about what works best for you!

 

When reading the specified files, the positional delete file tells the reader to skip the row at the specified position. However, this has write-time costs, since the writer of the delete file needs to know the position of the deleted record, which requires it to read the file containing the deleted records to identify those positions.

 

 

Equality deletes instead specify values that, if a record matches, should be ignored.

 

This requires no write-time costs, since you don't need to open and read files to track the targeted values, but it has much greater read-time costs. The read-time costs exist because there is no information about where records with matching values live, so when reading the data there has to be a comparison with every record that could possibly contain a matching record. Equality deletes are great if you need the highest write speed possible, but aggressive compaction should be planned to reconcile those equality deletes and reduce the impact on your reads.

 

 

Configuring COW and MOR

The following table properties determine whether a particular transaction is handled
via COW or MOR:

write.delete.mode
  Approach to use for delete transactions
write.update.mode
  Approach to use for update transactions
write.merge.mode
  Approach to use for merge transactions

 

 

All these properties are honored from within Spark, and they can be set at the creation of a table in Spark like so:

CREATE TABLE catalog.people (
  id int,
  first_name string,
  last_name string
) USING iceberg
TBLPROPERTIES (
  'write.delete.mode'='copy-on-write',
  'write.update.mode'='merge-on-read',
  'write.merge.mode'='merge-on-read'
);

These properties can also be set after the table is created using an ALTER TABLE statement:

ALTER TABLE catalog.people SET TBLPROPERTIES (
  'write.delete.mode'='merge-on-read',
  'write.update.mode'='copy-on-write',
  'write.merge.mode'='copy-on-write'
);

 

• Table properties may or may not be honored. It’s up to the engine to implement
support.
• When using MOR, make sure the engines you use to query your data can read
delete files.

 

 

Other Considerations

Metrics Collection

The manifest for each group of datafiles tracks metrics for each field in the table to help with min/max filtering and other optimizations. The types of column-level metrics tracked include:
• Counts of values, null values, and distinct values
• Upper and lower bound values

 

If you have very wide tables (i.e., tables with lots of fields, e.g., 100+), the number of metrics being tracked can start to become a burden on reading your metadata. Fortunately, you can fine-tune which columns have their metrics tracked and which columns don't.

 

You can tailor the level of metrics collection for the columns you want (you don’t
need to specify all of them) using table properties, like so:

ALTER TABLE catalog.db.students SET TBLPROPERTIES (
  'write.metadata.metrics.column.col1'='none',
  'write.metadata.metrics.column.col2'='full',
  'write.metadata.metrics.column.col3'='counts',
  'write.metadata.metrics.column.col4'='truncate(16)'
);

 

-none
Don’t collect any metrics.
-counts
Only collect counts (values, distinct values, null values).
-truncate(XX)
Count and truncate the value to a certain number of characters, and base the
upper/lower bounds on that. So, for example, a string column may be truncated
to 16 characters and have its metadata value ranges be based on the abbreviated
string values.
-full
Base the counts and upper/lower bounds on the full value.

 

You don’t need to set this explicitly for every column as, by default, Iceberg sets this to
truncate(16).

 

Rewriting Manifests

While manifests are more lightweight, more manifests still
means more file operations.

There is a separate rewriteManifests procedure to
rewrite only the manifest files so that you have a smaller total number of manifest
files, and those manifest files list a large number of datafiles:

CALL catalog.system.rewrite_manifests('MyTable')

If you run into any memory issues while running this operation, you can turn off
Spark caching by passing a second argument: false.

CALL catalog.system.rewrite_manifests('MyTable', false)

A good time to run this operation is when your datafile sizes are optimal but the number of manifest files isn't.

 

 

Optimizing Storage

To prevent storing a bunch of unneeded data, you should periodically
expire snapshots. Keep in mind that you cannot time-travel to an expired snapshot.

 

During expiration, any datafiles not associated with still-valid snapshots will get
deleted.

 

You can expire snapshots that were created on or before a particular timestamp:

CALL catalog.system.expire_snapshots('MyTable', TIMESTAMP '2023-02-01 00:00:00.000', 100)

 

But if the snapshot falls within the 100 most recent snapshots,
it will not expire.

You can also expire particular snapshot IDs:

CALL catalog.system.expire_snapshots(table => 'MyTable', snapshot_ids => ARRAY(53))

 

There are six arguments that can be passed to the expire_snapshots procedure:

-table
Table to run the operation on
-older_than
Expires all snapshots on or before this timestamp
-retain_last
Minimum number of snapshots to retain
-snapshot_ids
Specific snapshot IDs to expire
-max_concurrent_deletes
Number of threads to use for deleting files
-stream_results
When true, sends deleted files to the Spark driver by Resilient Distributed Dataset
(RDD) partition, which is useful for avoiding OOM issues when deleting
large files

 

 

Another consideration when optimizing storage is orphan files.

These are files and
artifacts that accumulate in the table’s data directory but are not tracked in the
metadata tree because they were written by failed jobs. These files will not be cleaned
up by expiring snapshots, so a special procedure should sporadically be run to deal
with this.

 

This can be an intensive process.

To delete orphan files, run a command such as the following:

CALL catalog.system.remove_orphan_files(table => 'MyTable')

You can pass the following arguments to the removeOrphanFiles procedure:

-table
Table to operate on
-older_than
Only deletes files created on or before this timestamp

-location
Where to look for orphan files; defaults to the table’s default location
-dry_run
When true, doesn't delete any files but returns a list of the files that would be deleted (see the example after this list)
-max_concurrent_deletes
Maximum number of threads to use for deleting files
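For example, a cautious first pass that only lists candidate orphan files without deleting anything:

CALL catalog.system.remove_orphan_files(table => 'MyTable', dry_run => true)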

 

Write Distribution Mode

When an engine writes to an Iceberg table, the work is split across multiple write tasks; the write distribution is how the records to be written are distributed across these tasks.

If no specific write distribution mode is set, data will be distributed arbitrarily. 

The first X number of records will go to the first task, the next X number to the next task, and so on.

 

Therefore, if you have 10 records that belong in partition A distributed across 10 tasks, you will end up with 10 files in that partition with one record each, which isn’t ideal.

It would be better if all the records for that partition were allocated to the same tasks so that they can be written to the same file.

There are three options:

-none
There is no special distribution. This is the fastest during write time and is ideal for presorted data.
-hash
The data is hash-distributed by partition key.
-range
The data is range-distributed by partition key or sort order.

 

In a hash distribution, the value of each record is put through a hash function and grouped together based on the result. 

Multiple values may end up in the same grouping based on the hash function.

 

 

In a range distribution, the data is sorted and distributed, so you’d likely have values 1 and 2 in task A, 3 and 4 in task B, and 5 and 6 in task C. 

This sorting will be done by the partition value or by the SortOrder if the table has one.

 

In other words, if a SortOrder is specified, data will be grouped into tasks not just by partition value but
also by the value of the SortOrder field.

However, sorting the data for distribution sequentially has more overhead than throwing the data in a hash function and distributing it based on the output.
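As a side note, a table-level sort order (the SortOrder referenced above) can be declared in Spark SQL with the Iceberg SQL extensions; a minimal sketch using the nfl_teams table from earlier:

ALTER TABLE catalog.nfl_teams WRITE ORDERED BY team;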

 

There is also a write distribution property to specify the behavior for deletes, updates, and merges:

ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
  'write.distribution-mode'='hash',
  'write.delete.distribution-mode'='none',
  'write.update.distribution-mode'='range',
  'write.merge.distribution-mode'='hash'
);

In a situation where you are regularly updating many rows but rarely deleting rows, you may want to have different distribution modes, as a different distribution mode may be more advantageous depending on your query patterns.

 

 

 

Object Storage Considerations

Object storage, which most cloud vendors provide, is ideal for data lakes and data lakehouses, but it has one potential bottleneck.

Because of the architecture and how object stores handle parallelism, there are often limits on how many requests can go to files under the same “prefix.”

Therefore, if you wanted to access /prefix1/fileA.txt and /prefix1/fileB.txt, even though they are different files, accessing both counts toward the limit on prefix1.

This becomes a problem in partitions with lots of files, as queries can result in many requests to these partitions and can then run into throttling, which slows down the query.

 

 

To avoid this, Iceberg can spread files across many prefixes; you can enable this behavior in your table properties like so:

ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
  'write.object-storage.enabled'='true'
);

This will distribute files in the same partition across many prefixes, including a hash to avoid potential throttling.

So, instead of this:

s3://bucket/database/table/field=value1/datafile1.parquet
s3://bucket/database/table/field=value1/datafile2.parquet
s3://bucket/database/table/field=value1/datafile3.parquet

you’ll get this:

s3://bucket/4809098/database/table/field=value1/datafile1.parquet
s3://bucket/5840329/database/table/field=value1/datafile2.parquet
s3://bucket/2342344/database/table/field=value1/datafile3.parquet

 

Datafile Bloom Filters

A bloom filter is a way of knowing whether a value possibly exists in a dataset.

For example, let’s say we feed 1,000 records through a bloom filter that has 10 bits. When it’s done, our bloom filter might look like this:

[0,1,1,0,0,1,1,1,1,0]

 

Now let’s say we want to find a certain value; we’ll call it X.

We put X through the same hash function, and it points us to spot number 3 on our bit lineup.

According to our bloom filter, there’s a 1 in that third spot. 

This means there’s a chance our value X could be in the dataset because a value hashed to this spot before. 

So we go ahead and check the dataset to see if X is really there.

 

Now let’s look for a different value; we’ll call it Y. 

When we run Y through our hash function, it points us to the fourth spot on our bit lineup. 

But our bloom filter has a 0 there, which means no value hashed to this spot. 

So we can confidently say that Y is definitely not in our dataset, and we can save time by not digging through the data.

 

Bloom filters are handy because they can help us avoid unnecessary data scans.

 

The more bits we add to the filter, the bigger it gets and the more space it needs. As with most things in life, it's a balancing act. Everything is a trade-off.

 

You can enable the writing of bloom filters for a particular column in your Parquet files (this can also be done for ORC files) via your table properties:

ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
  'write.parquet.bloom-filter-enabled.column.col1'='true',
  'write.parquet.bloom-filter-max-bytes'='1048576'
);

 

 
