[Iceberg] Iceberg Guide Book Summary
This is a summary of content taken from the Apache Iceberg guidebook.
A data warehouse acts as a centralized repository for organizations to store all their data coming in from a multitude of sources, allowing data consumers such as analysts and BI engineers to access data easily and quickly from a single source to start their analysis.
The Data Lake
While data warehouses provided a mechanism for running analytics on structured data, they still had several issues, which data lakes were introduced to address.
What Is Apache Iceberg?
Apache Iceberg is a table format. It arose from the need to overcome challenges with performance and consistency, as well as many of the other issues previously described with the Hive table format.
Key Features of Apache Iceberg
ACID transactions
Partition evolution
Too often, when your partitioning needs to change, the only choice you have is to rewrite the entire table, and at scale this can get very expensive.
With Apache Iceberg you can update how the table is partitioned at any time without
the need to rewrite the table and all its data. Since partitioning has everything to do
with the metadata, the operations needed to make this change to your table’s structure
are quick and cheap.
Hidden partitioning
Sometimes users don’t know how a table is physically partitioned, and frankly, they
shouldn’t have to care.
Row-level table operations
copy-on-write (COW) or merge-on-read (MOR)
When using COW, a change to any row in a given datafile causes the entire file to be rewritten.
When using MOR, a row-level update only writes a new file containing the changes to the affected rows, and that file is reconciled with the existing data on reads.
Time travel
Apache Iceberg provides immutable snapshots, so the information for the table’s
historical state is accessible, allowing you to run queries on the state of the table at a
given point in time in the past, or what’s commonly known as time travel.
Version rollback
Apache Iceberg's snapshots also let you revert the table's current state to any of those previous snapshots. Therefore, undoing mistakes is as easy as rolling back.
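As a minimal sketch of a rollback (the table name and snapshot ID below are hypothetical placeholders), Spark SQL exposes a rollback_to_snapshot procedure:
# Spark SQL
CALL catalog.system.rollback_to_snapshot('db.orders', 1234567890123456789);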
Schema evolution
Regardless of how your table needs to evolve,
Apache Iceberg gives you robust schema evolution features—for example, updating
an int column to a long column as values in the column get larger.
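As a hedged sketch of what that can look like in Spark SQL (the table and column names here are hypothetical), promoting an int column to a long (bigint) and adding a new column:
# Spark SQL
ALTER TABLE catalog.db.orders ALTER COLUMN order_id TYPE bigint;
ALTER TABLE catalog.db.orders ADD COLUMN shipping_notes string;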
CHAPTER 2
The Architecture of Apache Iceberg
The Apache Iceberg Architecture
The Data Layer
The data layer is what stores the actual data of the table.
Datafiles
Datafiles store the data itself.
Datafiles can be stored as Apache Parquet, Apache ORC, or Apache Avro, but in the real world the file format most commonly used is Apache Parquet.
Delete Files
Delete files track which records in the dataset have been deleted.
Since it's a best practice to treat data lake storage as immutable, you can't update rows in a file in place.
Instead, you need to write a new file: either a copy of the changed datafile with the change applied, or a new file that only has the changes written, which engines reading the data then coalesce.
That is, delete files only apply to MOR tables.
Positional delete files
These delete files specify the exact position of rows within a data file that should be considered deleted. They are used when the physical location of the data (i.e., the row's position in the file) is known.
Equality delete files
These delete files mark rows for deletion based on specific column values rather than their position. For example, suppose a record with a particular ID needs to be deleted. In that case, an equality delete file can specify that any row matching this ID should be excluded from query results.
Key Fields in a Delete File
Here are some of the critical fields you’ll find inside a delete file:
- file_path: This field indicates the path of the data file to which the delete file applies. It’s essential for mapping the delete operations to the correct data file in the dataset.
- pos: Present in position delete files, this field specifies the exact position of the row within the data file that should be marked as deleted. This allows for precise, row-level deletions based on the physical layout of the data.
- row: In equality delete files, the row field contains the values that identify which rows should be deleted. For instance, if a particular ID needs to be deleted across multiple data files, this field will hold that ID value.
- partition: This field contains the partition information of the data that is subject to deletion. It helps ensure that the delete file is applied only to the relevant partitions, further optimizing the deletion process.
- sequence_number: Iceberg uses sequence numbers to track the order of changes made to the data. The sequence_number in a delete file indicates when the deletion was committed relative to other changes in the dataset.
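Because delete files are only written for MOR tables, the table's write modes have to be set accordingly. A minimal sketch using the standard write-mode table properties (the table name is hypothetical):
# Spark SQL
ALTER TABLE catalog.db.orders SET TBLPROPERTIES (
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode' = 'merge-on-read'
);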
The Metadata Layer
The metadata layer is an integral part of an Iceberg table’s architecture and contains
all the metadata files for an Iceberg table.
Manifest Files
Manifest files keep track of files in the data layer (i.e., datafiles and delete files) as
well as additional details and statistics about each file, such as the minimum and
maximum values for a datafile’s columns.
Manifest Lists
A manifest list is a snapshot of an Iceberg table at a given point in time.
A manifest list contains an array of structs, with each struct keeping track of a single
manifest file.
Metadata Files
Manifest lists are tracked by metadata files.
Puffin Files
A puffin file stores statistics and indexes about the data in the table that improve the performance of an even broader range of queries than the statistics stored in the datafiles and metadata files.
The Catalog
This central place where you go to find the current location of the current metadata
pointer is the Iceberg catalog.
The primary requirement for an Iceberg catalog is that
it must support atomic operations for updating the current metadata pointer. This
support for atomic operations is required so that all readers and writers see the same
state of the table at a given point in time.
Within the catalog, there is a reference or pointer for each table to that table’s current
metadata file.
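One way to see this pointer history in practice is the metadata_log_entries metadata table, which lists the metadata files the catalog has pointed to over time (the table name below is hypothetical):
# Spark SQL
SELECT * FROM catalog.db.orders.metadata_log_entries;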
CHAPTER 3
Lifecycle of Write and Read Queries
Writing Queries in Apache Iceberg
Create the Table
1. Send the query to the engine
2. Write the metadata file
3. Update the catalog file to commit changes
Insert the Query
1. Send the query to the engine
2. Check the catalog
3. Write the datafiles and metadata files
4. Update the catalog file to commit changes
Merge Query
1. Send the query to the engine
2. Check the catalog
3. Write datafiles and metadata files
4. Update the catalog file to commit changes
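For concreteness, here is a hedged sketch of the three kinds of statements these steps describe, using a hypothetical orders table and a hypothetical orders_staging source:
# Spark SQL
CREATE TABLE catalog.db.orders (
  order_id BIGINT,
  customer_id BIGINT,
  order_ts TIMESTAMP,
  amount DECIMAL(10,2)
) USING iceberg;

INSERT INTO catalog.db.orders
VALUES (1, 100, TIMESTAMP '2023-03-07 20:45:08', 49.99);

MERGE INTO catalog.db.orders o
USING catalog.db.orders_staging s
ON o.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Each of these statements goes through the write path above: the engine plans the query, writes new datafiles and metadata files, and then atomically swaps the catalog's metadata pointer to commit.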
Reading Queries in Apache Iceberg
The SELECT Query
1. Send the query to the engine
2. Check the catalog
3. Get information from the metadata file
4. Get information from the manifest list
5. Get information from the manifest file
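For example, a simple filtered read over the hypothetical orders table; the steps above are how the engine narrows down which manifests and datafiles this query actually needs to scan:
# Spark SQL
SELECT * FROM catalog.db.orders
WHERE order_ts >= TIMESTAMP '2023-03-01 00:00:00';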
The Time-travel Query
Apache Iceberg provides two ways to run time-travel queries: using a timestamp and using a snapshot ID.
To analyze our orders table's history, we will query the history metadata table:
# Spark SQL
SELECT * FROM catalog.db.orders.history;
Suppose the timestamp or the snapshot ID we will be targeting is the second one in the history. This is the query that we will run:
# Spark SQL
SELECT * FROM orders
TIMESTAMP AS OF '2023-03-07 20:45:08.914'
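Alternatively, the same snapshot can be targeted by its ID (the ID below is a placeholder for the value returned by the history table):
# Spark SQL
SELECT * FROM orders
VERSION AS OF 1234567890123456789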
1. Send the query to the engine
2. Check the catalog
3. Get information from the metadata file
4. Get information from the manifest list
5. Get information from the manifest file
CHAPTER 4
Optimizing the Performance of Iceberg Tables
Compaction
When you are querying your
Apache Iceberg tables, you need to open and scan each file and then close the file
when you’re done. The more files you have to scan for a query, the greater the cost
these file operations will put on your query.
It is possible to run into
the “small files problem,” where too many small files have an impact on the speed
and performance of your scans because you’re doing more file operations, have a lot
more metadata to read (there is metadata on each file), and have to delete more files
when doing cleanup and maintenance operations.
There are fixed costs you can't avoid and variable costs you can avoid by using different strategies.
The solution to this problem is to periodically take the data in all these small files and rewrite it into fewer larger files. This process is called compaction, as you are compacting many files into a few.
Hands-on with Compaction
Apache Iceberg's Actions package includes several maintenance procedures (the Actions package is specifically for Apache Spark, but other engines can create their own maintenance operation implementations).
// Java (Spark): imports from the Iceberg API and the Iceberg Spark runtime
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;

// load the table from the catalog, then run the compaction action
Table table = catalog.loadTable("myTable");
SparkActions
    .get()
    .rewriteDataFiles(table)
    .option("rewrite-job-order", "files-desc")
    .execute();
Here, we loaded an instance of our table and then triggered rewriteDataFiles, which is the Spark action for compaction.
There are several methods you can chain between the call to rewriteDataFiles and
the execute method that begins the job:
binPack
Sets the compaction strategy to binpack (discussed later), which is the default and
doesn’t need to be explicitly supplied
sort
Changes the compaction strategy to sort the data rewritten by one or more fields
in a priority order
zOrder
Changes the compaction strategy to z-order–sort the data based on multiple
fields with equal weighting
filter
Enables you to pass an expression used to limit which files are rewritten
option
Changes a single option
options
Takes a map of several option configurations
There are several possible options you can pass to configure the job; here are a few
important ones:
target-file-size-bytes
This will set the intended size of the output files.
By default, this will use the write.target-file-size-bytes property of the table, which defaults to 512 MB.
max-concurrent-file-group-rewrites
This is the ceiling for the number of file groups to write simultaneously.
max-file-group-size-bytes
The maximum size of a file group, not of any single file. This setting should be used when dealing with partitions larger than the memory available to the worker writing a particular file group, so that the partition can be split into multiple file groups to be written concurrently.
partial-progress-enabled
This allows commits to occur while file groups are compacted. Therefore, for
long-running compaction, this can allow concurrent queries to benefit from
already compacted files.
partial-progress-max-commits
If partial progress is enabled, this setting sets the maximum number of commits
allowed to complete the job.
rewrite-job-order
The order to write file groups, which can matter when using partial progress to
make sure the higher-priority file groups are committed sooner rather than later,
can be based on the groups ordered by byte size or number of files in a group
(bytes-asc, bytes-desc, files-asc, files-desc, none).
The Iceberg Spark extensions also include call procedures, which can be invoked from Spark SQL using the following syntax:
-- using positional arguments
CALL catalog.system.procedure(arg1, arg2, arg3)
-- using named arguments
CALL catalog.system.procedure(argkey1 => argval1, argkey2 => argval2)
-- Rewrite Data Files CALL Procedure in SparkSQL
CALL catalog.system.rewrite_data_files(
table => 'musicians',
strategy => 'binpack',
where => 'genre = "rock"',
options => map(
'rewrite-job-order','bytes-asc',
'target-file-size-bytes','1073741824', -- 1GB
'max-file-group-size-bytes','10737418240' -- 10GB
)
)
Compaction Strategies
There are several compaction strategies you can use with the rewriteDataFiles procedure.
Automating Compaction
Here are a couple of approaches you can take to automate these jobs:
- Airflow
- cron jobs (see the sketch below)
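As a minimal cron sketch (the schedule, table name, and catalog are illustrative), compaction can be kicked off with the spark-sql CLI:
# crontab entry: run binpack compaction every night at 2 a.m.
0 2 * * * spark-sql -e "CALL catalog.system.rewrite_data_files(table => 'db.orders', strategy => 'binpack')"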
Sorting
Sorting or “clustering” your data has a very particular benefit when it comes to your
queries: it helps limit the number of files that need to be scanned to get the data
needed for a query.
By having the data sorted, you've reduced the number of files you have to scan.
If you wanted to rewrite the entire dataset with all players sorted by team globally, you could run the following statement:
CALL catalog.system.rewrite_data_files(
table => 'nfl_teams',
strategy => 'sort',
sort_order => 'team ASC NULLS LAST'
)
You can sort by additional fields as well.
CALL catalog.system.rewrite_data_files(
table => 'nfl_teams',
strategy => 'sort',
sort_order => 'team ASC NULLS LAST, name ASC NULLS FIRST'
)
Z-order
There are times when multiple fields are a priority when querying a table, and this
is where a z-order sort may be quite helpful.
If we know what data we are looking for based on the fields we z-ordered by, we can possibly avoid searching large portions of the data, since it's sorted by both fields.
Even if you only searched by age, you'd see a benefit from clustering by being able to eliminate at least two of the four quadrants.
CALL catalog.system.rewrite_data_files(
table => 'people',
strategy => 'sort',
sort_order => 'zorder(age,height)'
)
Sorting comes with some challenges, though. First, as new data is ingested, it arrives unsorted, and until the next compaction job the data remains somewhat scattered across multiple files.
Second, files may still contain data for multiple values of the sorted
field, which can be inefficient for queries that only require data with a specific value.
For instance, in the earlier example, files contained data for both Lions and Packers
players, making it inefficient to scan Packers records when you were only interested
in Lions players.
To deal with this, we have partitioning.
Partitioning
Traditionally, partitioning by day, month, or year on a timestamp column required you to create an additional column derived from the timestamp, expressing the year, month, or day in isolation.
Hidden Partitioning
https://hoony-612.tistory.com/87
Since you don’t need an additional column when using these transforms, you store less in your datafiles.
If you create a table partitioned by month:
CREATE TABLE catalog.MyTable (...) PARTITIONED BY months(time) USING iceberg;
The following query would benefit from partitioning:
SELECT * FROM MYTABLE WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00';
You apply transforms like a function to the target column being transformed. Several transforms are available when planning your partitioning:
year (just the year)
month (month and year)
day (day, month, and year)
hour (hours, day, month, and year)
truncate
bucket
The year, month, day, and hour transforms work on a timestamp column.
Keep in
mind that if you specify month, the partition values as tracked in the metadata will
reflect the month and year of the timestamp, and if you use day, they will reflect the
year, month, and day of the timestamp, so there is no need to use multiple transforms
for more granular partitioning.
The truncate transform partitions the table based on the truncated value of a column:
CREATE TABLE catalog.MyTable (...) PARTITIONED BY truncate(name, 1) USING iceberg;
The bucket transform is perfect for partitioning based on a field with high cardinality
(lots of unique values).
The bucket transform will use a hash function to distribute
the records across a specified number of buckets.
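As a hedged sketch (the bucket count and column name are hypothetical), a table could be bucketed on a high-cardinality ID like this:
# Spark SQL
CREATE TABLE catalog.MyTable (...) PARTITIONED BY (bucket(24, customer_id)) USING iceberg;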
Partition Evolution
So, if the data in two different
files were written based on two different partition schemes, the Iceberg metadata
would make the engine aware so that it could create a plan with partition scheme A
separately from partition scheme B, creating an overall scan plan at the end.
For example, let’s say you have a table of membership records partitioned by the year
in which members registered:
CREATE TABLE catalog.members (...) PARTITIONED BY years(registration_ts) USING iceberg;
Then, several years later, the pace of membership growth made it worthwhile to
start breaking the records down by month:
ALTER TABLE catalog.members ADD PARTITION FIELD months(registration_ts)
When a partitioning scheme is updated, it only applies to new data written to the
table going forward, so there is no need to rewrite the existing data. Also, keep in
mind that any data rewritten by the rewriteDataFiles procedure will be rewritten
using the new partitioning scheme, so if you want to keep older data in the old
scheme, make sure to use the proper filters in your compaction jobs to not rewrite it.
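As an illustrative sketch of such a filter (the cutoff value is hypothetical and follows the style of the where option shown earlier), you could restrict compaction to data written after the spec change:
CALL catalog.system.rewrite_data_files(
  table => 'members',
  where => 'registration_ts >= "2024-01-01 00:00:00"'
)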
Other Partitioning Considerations
Say you migrate a Hive table using the migrate procedure (discussed in Chapter 14).
It may currently be partitioned on a derived column (e.g., a month column based on a
timestamp column in the same table), but you want to express to Apache Iceberg that
it should use an Iceberg transform instead. There is a REPLACE PARTITION command
for just this purpose:
ALTER TABLE catalog.members REPLACE PARTITION FIELD registration_day WITH days(registration_ts) AS day_of_registration;
Copy-on-Write Versus Merge-on-Read