
[Iceberg] Iceberg Guide Book Summary

noohhee 2025. 3. 4. 21:03

 

This content is summarized from the Apache Iceberg guidebook.

 

 

 

 

A data warehouse acts as a centralized repository for organizations to store all their data coming in from a multitude of sources, allowing data consumers such as analysts and BI engineers to access data easily and quickly from one single source to start their analysis.

 

 

 

 

The Data Lake

While data warehouses provided a mechanism for running analytics on structured data, they still had several issues:

 

 

 

What Is Apache Iceberg?

 

Apache Iceberg is a table format. It arose from the need to overcome challenges with performance and consistency, along with many of the other issues previously described with the Hive table format.

 

 

 

 

Key Features of Apache Iceberg

 

ACID transactions

 

 

Partition evolution

 

Too often, when your partitioning needs to change, the only choice you have is to rewrite the entire table, and at scale this can get very expensive.

With Apache Iceberg you can update how the table is partitioned at any time without the need to rewrite the table and all its data. Since partitioning has everything to do with the metadata, the operations needed to make this change to your table's structure are quick and cheap.

 

 

 

Hidden partitioning

 

Sometimes users don't know how a table is physically partitioned, and frankly, they shouldn't have to care.

 

 

Row-level table operations

Row-level updates are handled with one of two strategies: copy-on-write (COW) or merge-on-read (MOR).

When using COW, a change to any row in a given datafile causes the entire file to be rewritten. When using MOR, for any row-level update only a new file that contains the changes to the affected rows is written, and those changes are reconciled on reads.
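
As a hedged illustration (the table name is made up; write.delete.mode, write.update.mode, and write.merge.mode are the standard Iceberg write-mode table properties), you can typically pick COW or MOR per operation type with table properties:

-- Spark SQL (sketch): choose the row-level strategy per operation type
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode'  = 'copy-on-write'
);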

 

 

Time travel

Apache Iceberg provides immutable snapshots, so the information for the table's historical state is accessible, allowing you to run queries on the state of the table at a given point in time in the past, or what's commonly known as time travel.

 

 

 

Version rollback

Apache Iceberg can also revert the table's current state to any of those previous snapshots. Therefore, undoing mistakes is as easy as rolling back.
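
As a hedged sketch of what a rollback looks like in Spark SQL (the table name and snapshot ID are placeholders; rollback_to_snapshot is Iceberg's Spark call procedure for this):

-- Spark SQL (sketch): revert the table's current state to an earlier snapshot
CALL catalog.system.rollback_to_snapshot('db.orders', 8333017788700497002);
-- rollback_to_timestamp('db.orders', TIMESTAMP '2023-03-07 20:45:08.914') works the same way by time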

 

 

Schema evolution

Regardless of how your table needs to evolve, Apache Iceberg gives you robust schema evolution features—for example, updating an int column to a long column as values in the column get larger.
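
A hedged Spark SQL sketch of the int-to-long promotion described above (the table and column names are illustrative):

-- Spark SQL (sketch): widen an int column and add a new optional column
ALTER TABLE catalog.db.orders ALTER COLUMN order_count TYPE bigint;
ALTER TABLE catalog.db.orders ADD COLUMNS (order_note string);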

 

CHAPTER 2

The Architecture of Apache Iceberg

 

 

The Apache Iceberg Architecture

 

 

The Data Layer

The data layer is what stores the actual data of the table.

 

 

Datafiles

Datafiles store the data itself. Supported file formats include Apache Parquet, Apache ORC, and Apache Avro, though in the real world the file format most commonly used is Apache Parquet.

 

 

 

Delete Files

Delete files track which records in the dataset have been deleted. Since it's a best practice to treat data lake storage as immutable, you can't update rows in a file in place. Instead, you need to write a new file; it can be a new file that only has the changes written, which engines reading the data then coalesce.

Note that delete files only apply to MOR tables.

 

 

 

 

Positional delete files

These delete files specify the exact position of rows within a data file that should be considered deleted. They are used when the physical location of the data (i.e., the row's position in the file) is known.

 

 

Equality delete files

 

These delete files mark rows for deletion based on specific column values rather than their position. For example, suppose a record with a particular ID needs to be deleted. In that case, an equality delete file can specify that any row matching this ID should be excluded from query results.

 

 

Key Fields in a Delete File

Here are some of the critical fields you’ll find inside a delete file:

  • file_path: This field indicates the path of the data file to which the delete file applies. It’s essential for mapping the delete operations to the correct data file in the dataset.
  • pos: Present in position delete files, this field specifies the exact position of the row within the data file that should be marked as deleted. This allows for precise, row-level deletions based on the physical layout of the data.
  • row: In equality delete files, the row field contains the values that identify which rows should be deleted. For instance, if a particular ID needs to be deleted across multiple data files, this field will hold that ID value.
  • partition: This field contains the partition information of the data that is subject to deletion. It helps ensure that the delete file is applied only to the relevant partitions, further optimizing the deletion process.
  • sequence_number: Iceberg uses sequence numbers to track the order of changes made to the data. The sequence_number in a delete file indicates when the deletion was committed relative to other changes in the dataset.
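
As a hedged way to see these fields in practice, Spark can query Iceberg's delete_files metadata table (the table name is a placeholder):

-- Spark SQL (sketch): inspect delete files and a few of their key fields
SELECT content, file_path, record_count, partition
FROM catalog.db.orders.delete_files;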

 

 

 

 

The Metadata Layer

 

The metadata layer is an integral part of an Iceberg table's architecture and contains all the metadata files for an Iceberg table.

 

Manifest Files

 

Manifest files keep track of files in the data layer (i.e., datafiles and delete files) as well as additional details and statistics about each file, such as the minimum and maximum values for a datafile's columns.

 

Manifest Lists

 

A manifest list is a snapshot of an Iceberg table at a given point in time. A manifest list contains an array of structs, with each struct keeping track of a single manifest file.

 

 

 

Metadata Files

 

Manifest lists are tracked by metadata files.
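
A hedged Spark SQL sketch for peeking at this metadata layer through Iceberg's snapshots and manifests metadata tables (the table name is a placeholder):

-- Spark SQL (sketch): each snapshot points at a manifest list; manifests lists the manifest files
SELECT snapshot_id, committed_at, manifest_list FROM catalog.db.orders.snapshots;
SELECT path, length, added_data_files_count FROM catalog.db.orders.manifests;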

 

 

Puffin Files

A puffin file stores statistics and indexes about the data in the table that improve the performance of an even broader range of queries, such as the aforementioned example, than the statistics stored in the datafiles and metadata files.

 

 

 

 

 

The Catalog

This central place where you go to find the current location of the current metadata pointer is the Iceberg catalog. The primary requirement for an Iceberg catalog is that it must support atomic operations for updating the current metadata pointer. This support for atomic operations is required so that all readers and writers see the same state of the table at a given point in time.

Within the catalog, there is a reference or pointer for each table to that table's current metadata file.

 

 

 

CHAPTER 3

Lifecycle of Write and Read Queries

 

 

 

Writing Queries in Apache Iceberg

 

 

Create the Table

Send the query to the engine

Write the metadata file

Update the catalog file to commit changes
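
To make the flow concrete, here is a hedged Spark SQL sketch of the CREATE TABLE step (catalog, database, and column names are illustrative); the engine writes the first metadata file, then the catalog is updated to point at it:

-- Spark SQL (sketch): create an Iceberg table
CREATE TABLE catalog.db.orders (
  order_id bigint,
  customer_id bigint,
  order_ts timestamp,
  order_amount decimal(10,2)
) USING iceberg;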

 

Insert the Query

 

Send the query to the engine

Check the catalog

Write the datafiles and metadata files

Update the catalog file to commit changes
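
A hedged sketch of the insert step against the same illustrative table; on success the engine writes new datafiles and metadata files and commits by swapping the catalog's metadata pointer:

-- Spark SQL (sketch): insert rows; a new snapshot is committed
INSERT INTO catalog.db.orders VALUES
  (101, 1, TIMESTAMP '2023-03-07 08:10:23', 102.50),
  (102, 2, TIMESTAMP '2023-03-07 09:21:02', 54.00);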

 

 

 

Merge Query

Send the query to the engine

Check the catalog

Write datafiles and metadata files

Update the catalog file to commit changes
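
A hedged MERGE sketch using an illustrative staging table orders_staging; whether the affected datafiles are rewritten (COW) or delete files are produced (MOR) depends on the table's write.merge.mode:

-- Spark SQL (sketch): upsert incoming rows into the target table
MERGE INTO catalog.db.orders AS t
USING catalog.db.orders_staging AS s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;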

 

 

 

 

 

Reading Queries in Apache Iceberg

 

The SELECT Query

Send the query to the engine

Check the catalog

Get information from the metadata file

Get information from the manifest list

Get information from the manifest file
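
A hedged sketch of a plain read; behind the scenes the engine walks the catalog, metadata file, manifest list, and manifest files to prune down to the datafiles it actually scans:

-- Spark SQL (sketch): a simple read of the illustrative table
SELECT order_id, order_amount
FROM catalog.db.orders
WHERE order_ts >= TIMESTAMP '2023-03-07 00:00:00';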

 

 

 

The Time-travel Query

 

 

Apache Iceberg provides two ways to run time-travel queries: using a timestamp and using a snapshot ID.

 

To analyze our order table’s history, we will query the history metadata table

 

 

-- Spark SQL
SELECT * FROM catalog.db.orders.history;

 

 

 

The timestamp (or snapshot ID) we will be targeting is the second one. This is the query that we will run:

 

-- Spark SQL
SELECT * FROM orders
TIMESTAMP AS OF '2023-03-07 20:45:08.914';
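
The snapshot-ID form looks like this (a hedged sketch; the snapshot ID is a placeholder you would read from the history or snapshots metadata table):

-- Spark SQL (sketch): time travel by snapshot ID
SELECT * FROM orders
VERSION AS OF 8333017788700497002;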

 

 

Send the query to the engine

Check the catalog

Get information from the metadata file

Get information from the manifest list

Get information from the manifest file

 

 

CHAPTER 4

Optimizing the Performance

of Iceberg Tables

 

 

Compaction

 

When you are querying your Apache Iceberg tables, you need to open and scan each file and then close the file when you're done. The more files you have to scan for a query, the greater the cost these file operations will put on your query.

 

It is possible to run into the "small files problem," where too many small files have an impact on the speed and performance of your scans because you're doing more file operations, have a lot more metadata to read (there is metadata on each file), and have to delete more files when doing cleanup and maintenance operations.

 

 

There are fixed costs you can't avoid and variable costs you can avoid by using different strategies.

 

The solution to this problem is to periodically take the data in all these small files and rewrite it into fewer larger files. This process is called compaction, as you are compacting many files into a few.

 

 

 

Hands-on with Compaction

 

Apache Iceberg's Actions package includes several maintenance procedures (the Actions package is specifically for Apache Spark, but other engines can create their own maintenance operation implementations).

 

// Assumes `catalog` is an initialized org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;

Table table = catalog.loadTable(TableIdentifier.of("db", "myTable"));
SparkActions
    .get()
    .rewriteDataFiles(table)
    .option("rewrite-job-order", "files-desc")
    .execute();

 

Here we loaded our table and then triggered rewriteDataFiles, which is the Spark action for compaction.

 

There are several methods you can chain between the call to rewriteDataFiles and the execute method that begins the job:

 

binPack
Sets the compaction strategy to binpack (discussed later), which is the default and doesn't need to be explicitly supplied.

sort
Changes the compaction strategy to sort the data rewritten by one or more fields in a priority order.

zOrder
Changes the compaction strategy to z-order–sort the data based on multiple fields with equal weighting.

filter
Enables you to pass an expression used to limit which files are rewritten.

option
Changes a single option.

options
Takes a map of several option configurations.

 

 

 

There are several possible options you can pass to configure the job; here are a few important ones:

 

target-file-size-bytes
This sets the intended size of the output files. By default, this uses the write.target-file-size-bytes property of the table, which defaults to 512 MB.

max-concurrent-file-group-rewrites
This is the ceiling for the number of file groups to write simultaneously.

max-file-group-size-bytes
The maximum size of a file group (not of any single file). This setting should be used when dealing with partitions larger than the memory available to the worker writing a particular file group, so that the partition can be split into multiple file groups to be written concurrently.

partial-progress-enabled
This allows commits to occur while file groups are compacted. Therefore, for long-running compaction, this can allow concurrent queries to benefit from already compacted files.

partial-progress-max-commits
If partial progress is enabled, this setting sets the maximum number of commits allowed to complete the job.

rewrite-job-order
The order in which to write file groups, which can matter when using partial progress to make sure the higher-priority file groups are committed sooner rather than later; the order can be based on the groups' byte size or the number of files in a group (bytes-asc, bytes-desc, files-asc, files-desc, none).

 

 

 

Spark also exposes these maintenance operations as call procedures, which can be invoked using the following syntax from Spark SQL:

 

-- using positional arguments

CALL catalog.system.procedure(arg1, arg2, arg3)

-- using named arguments

CALL catalog.system.procedure(argkey1 => argval1, argkey2 => argval2)

 

-- Rewrite Data Files CALL Procedure in SparkSQL
CALL catalog.system.rewrite_data_files(
  table => 'musicians',
  strategy => 'binpack',
  where => 'genre = "rock"',
  options => map(
    'rewrite-job-order','bytes-asc',
    'target-file-size-bytes','1073741824', -- 1GB
    'max-file-group-size-bytes','10737418240' -- 10GB
  )
)

 

 

Compaction Strategies

 

There are several compaction strategies you can use with the rewriteDataFiles procedure.

 

Automating Compaction

 

Here are a couple of approaches you can take to automate these jobs:

Airflow

cron jobs

 

 

 

Sorting

 

Sorting or "clustering" your data has a very particular benefit when it comes to your queries: it helps limit the number of files that need to be scanned to get the data needed for a query.

 

By having the data sorted, you've reduced the number of files you have to scan.

 

 

 

 

If you wanted to rewrite the entire dataset with all players sorted by team globally, you could run the following statement:

 

CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST'
)

 

You can sort by additional fields as well.

 

 

CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST, name ASC NULLS FIRST'
)

 

 

 

 

Z-order

 

There are times when multiple fields are a priority when querying a table, and this is where a z-order sort may be quite helpful.

 

If we know what data we are looking for based on the fields we z-ordered by, we can possibly avoid searching large portions of the data since it's sorted by both fields.

 

 

Even if you only searched by age, you'd see a benefit from clustering by being able to eliminate at least two of the four quadrants.

 

CALL catalog.system.rewrite_data_files(
  table => 'people',
  strategy => 'sort',
  sort_order => 'zorder(age,height)'
)

 

 

 

Sorting does come with some challenges, though. First, as new data is ingested it arrives unsorted, and until the next compaction job the data remains somewhat scattered across multiple files.

 

Second, files may still contain data for multiple values of the sorted field, which can be inefficient for queries that only require data with a specific value. For instance, in the earlier example, files contained data for both Lions and Packers players, making it inefficient to scan Packers records when you were only interested in Lions players. To deal with this, we have partitioning.

 

 

 

Partitioning

 

Previously, partitioning by day, month, or year on a timestamp column required you to create an additional column based on the timestamp, expressing the year, month, or day in isolation.

 

 

Hidden Partitioning

 

https://hoony-612.tistory.com/87

 

Since you don't need an additional column when using these transforms, you store less in your datafiles. For example, if you create a table partitioned by month:

 

 

CREATE TABLE catalog.MyTable (...) PARTITIONED BY months(time) USING iceberg;

 

the following query would benefit from partitioning:

 

SELECT * FROM MYTABLE WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00';

 

 

You apply transforms like a function on the target column being transformed. Several transforms are available when planning your partitioning:

 

 

year (just the year)

month (month and year)

day (day, month, and year)

hour (hours, day, month, and year)

truncate

bucket

 

 

The year, month, day, and hour transforms work on a timestamp column.

 

Keep in mind that if you specify month, the partition values as tracked in the metadata will reflect the month and year of the timestamp, and if you use day, they will reflect the year, month, and day of the timestamp, so there is no need to use multiple transforms for more granular partitioning.

 

The truncate transform partitions the table based on the truncated value of a column.

 

CREATE TABLE catalog.MyTable (...) PARTITIONED BY truncate(name, 1) USING iceberg;

 

The bucket transform is perfect for partitioning based on a field with high cardinality (lots of unique values). The bucket transform will use a hash function to distribute the records across a specified number of buckets.
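
Following the style of the earlier examples, a hedged sketch of a bucket-partitioned table (the column name and bucket count are illustrative):

CREATE TABLE catalog.MyTable (...) PARTITIONED BY bucket(24, user_id) USING iceberg;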

 

 

Partition Evolution

 

 

So, if the data in two different files were written based on two different partition schemes, the Iceberg metadata would make the engine aware so that it could create a plan with partition scheme A separately from partition scheme B, creating an overall scan plan at the end.

 

For example, let's say you have a table of membership records partitioned by the year in which members registered:

 

CREATE TABLE catalog.members (...) PARTITIONED BY years(registration_ts) USING iceberg;

 

Then, several years later, the pace of membership growth made it worthwhile to start breaking the records down by month.

 

ALTER TABLE catalog.members ADD PARTITION FIELD months(registration_ts)

 

 

When a partitioning scheme is updated, it only applies to new data written to the table going forward, so there is no need to rewrite the existing data. Also, keep in mind that any data rewritten by the rewriteDataFiles procedure will be rewritten using the new partitioning scheme, so if you want to keep older data in the old scheme, make sure to use the proper filters in your compaction jobs to not rewrite it.

 

 

 

Other Partitioning Considerations

 

Say you migrate a Hive table using the migrate procedure (discussed in Chapter 14). It may currently be partitioned on a derived column (e.g., a month column based on a timestamp column in the same table), but you want to express to Apache Iceberg that it should use an Iceberg transform instead. There is a REPLACE PARTITION FIELD command for just this purpose:

 

ALTER TABLE catalog.members REPLACE PARTITION FIELD registration_day WITH days(registration_ts) AS day_of_registration;

 

 

Copy-on-Write Versus Merge-on-Read

 
