Iceberg connector#
Overview#
Apache Iceberg is an open table format for huge analytic datasets. The Iceberg connector allows querying data stored in files written in Iceberg format, as defined in the Iceberg Table Spec. It supports Apache Iceberg table spec version 1.
The Iceberg table state is maintained in metadata files. All changes to table state create a new metadata file and replace the old metadata with an atomic swap. The table metadata file tracks the table schema, partitioning config, custom properties, and snapshots of the table contents.
Iceberg data files can be stored in either Parquet or ORC format, as
determined by the format
property in the table definition. The
table format
defaults to ORC
.
Iceberg is designed to improve on the known scalability limitations of Hive, which stores table metadata in a metastore that is backed by a relational database such as MySQL. It tracks partition locations in the metastore, but not individual data files. Trino queries using the Hive connector must first call the metastore to get partition locations, then call the underlying filesystem to list all data files inside each partition, and then read metadata from each data file.
Since Iceberg stores the paths to data files in the metadata files, it only consults the underlying file system for files that must be read.
Requirements#
To use Iceberg, you need:
Network access from the Trino coordinator and workers to the distributed object storage.
Access to a Hive metastore service (HMS).
Network access from the Trino coordinator to the HMS. Hive metastore access with the Thrift protocol defaults to using port 9083.
Configuration#
Iceberg supports the same metastore configuration properties as the Hive connector.
At a minimum, hive.metastore.uri
must be configured:
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
Property name |
Description |
Default |
---|---|---|
|
Define the data storage file format for Iceberg tables. Possible values are
|
|
|
The compression codec to be used when writing files. Possible values are
|
|
|
Maximum number of partitions handled per writer. |
100 |
Authorization checks#
You can enable authorization checks for the connector by setting
the iceberg.security
property in the catalog properties file. This
property must be one of the following values:
Property value |
Description |
---|---|
|
No authorization checks are enforced. |
|
The connector relies on system-level access control. |
|
Operations that read data or metadata, such as SELECT are permitted. No operations that write data or metadata, such as CREATE TABLE, INSERT, or DELETE are allowed. |
|
Authorization checks are enforced using a catalog-level access control
configuration file whose path is specified in the |
SQL support#
This connector provides read access and write access to data and metadata in Iceberg. In addition to the globally available and read operation statements, the connector supports the following features:
DELETE, see also Deletion by partition
ALTER MATERIALIZED VIEW SET PROPERTIES#
The connector does not support ALTER MATERIALIZED VIEW SET PROPERTIES statements.
Type mapping#
Both Iceberg and Trino have types that are not supported by the Iceberg connector. The following sections explain their type mapping.
Iceberg to Trino type mapping#
Trino supports selecting Iceberg data types. The following table shows the Iceberg to Trino type mapping:
Iceberg type |
Trino type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Trino to Iceberg type mapping#
Trino supports creating tables with the following types in Iceberg. The table shows the mappings from Trino to Iceberg data types:
Trino type |
Iceberg type |
Notes |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Other precisions not supported |
|
|
Other precisions not supported |
|
|
Other precisions not supported |
|
|
|
|
|
|
|
|
|
|
|
All fields must have a name |
|
|
|
|
|
Partitioned tables#
Iceberg supports partitioning by specifying transforms over the table columns. A partition is created for each unique tuple value produced by the transforms. Identity transforms are simply the column name. Other transforms are:
Transform |
Description |
---|---|
|
A partition is created for each year. The partition value is the
integer difference in years between |
|
A partition is created for each month of each year. The partition
value is the integer difference in months between |
|
A partition is created for each day of each year. The partition
value is the integer difference in days between |
|
A partition is created hour of each day. The partition value is a timestamp with the minutes and seconds set to zero. |
|
The data is hashed into the specified number of buckets. The
partition value is an integer hash of |
|
The partition value is the first |
In this example, the table is partitioned by the month of order_date
, a hash of
account_number
(with 10 buckets), and country
:
CREATE TABLE iceberg.testdb.customer_orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])
Deletion by partition#
For partitioned tables, the Iceberg connector supports the deletion of entire
partitions if the WHERE
clause specifies filters only on the identity-transformed
partitioning columns, that can match entire partitions. Given the table definition
above, this SQL will delete all partitions for which country
is US
:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'
Currently, the Iceberg connector only supports deletion by partition.
This SQL below will fail because the WHERE
clause selects only some of the rows
in the partition:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'
Rolling back to a previous snapshot#
Iceberg supports a “snapshot” model of data, where table snapshots are identified by an snapshot IDs.
The connector provides a system snapshots table for each Iceberg table. Snapshots are
identified by BIGINT snapshot IDs. You can find the latest snapshot ID for table
customer_orders
by running the following command:
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1
A SQL procedure system.rollback_to_snapshot
allows the caller to roll back
the state of the table to a previous snapshot id:
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 8954597067493422955)
Schema evolution#
Iceberg and the Iceberg connector support schema evolution, with safe column add, drop, reorder and rename operations, including in nested structures. Table partitioning can also be changed and the connector can still query data created before the partitioning change.
Migrating existing tables#
The connector can read from or write to Hive tables that have been migrated to Iceberg. There is no Trino support for migrating Hive tables to Iceberg, so you need to either use the Iceberg API or Apache Spark.
System tables and columns#
The connector supports queries of the table partitions. Given a table customer_orders
,
SELECT * FROM iceberg.testdb."customer_orders$partitions"
shows the table partitions, including the minimum
and maximum values for the partition columns.
Iceberg table properties#
Property Name |
Description |
---|---|
|
Optionally specifies the format of table data files;
either |
|
Optionally specifies table partitioning.
If a table is partitioned by columns |
|
Optionally specifies the file system location URI for the table. |
The table definition below specifies format Parquet, partitioning by columns c1
and c2
,
and a file system location of /var/my_tables/test_table
:
CREATE TABLE test_table (
c1 integer,
c2 date,
c3 double)
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
location = '/var/my_tables/test_table')
Metadata tables#
The connector exposes several metadata tables for each Iceberg table. These metadata tables contain information about the internal structure of the Iceberg table. You can query each metadata table by appending the metadata table name to the table name:
SELECT * FROM "test_table$data"
$data
table#
The $data
table is an alias for the Iceberg table itself.
The statement:
SELECT * FROM "test_table$data"
is equivalent to:
SELECT * FROM test_table
$properties
table#
The $properties
table provides access to general information about Iceberg
table configuration and any additional metadata key/value pairs that the table
is tagged with.
You can retrieve the properties of the current snapshot of the Iceberg
table test_table
by using the following query:
SELECT * FROM "test_table$properties"
key | value |
-----------------------+----------+
write.format.default | PARQUET |
format-version | 2 |
$history
table#
The $history
table provides a log of the metadata changes performed on
the Iceberg table.
You can retrieve the changelog of the Iceberg table test_table
by using the following query:
SELECT * FROM "test_table$history"
made_current_at | snapshot_id | parent_id | is_current_ancestor
----------------------------------+----------------------+----------------------+--------------------
2022-01-10 08:11:20 Europe/Vienna | 8667764846443717831 | <null> | true
2022-01-10 08:11:34 Europe/Vienna | 7860805980949777961 | 8667764846443717831 | true
The output of the query has the following columns:
Name |
Type |
Description |
---|---|---|
|
|
The time when the snapshot became active |
|
|
The identifier of the snapshot |
|
|
The identifier of the parent snapshot |
|
|
Whether or not this snapshot is an ancestor of the current snapshot |
$snapshots
table#
The $snapshots
table provides a detailed view of snapshots of the
Iceberg table. A snapshot consists of one or more file manifests,
and the complete table contents is represented by the union
of all the data files in those manifests.
You can retrieve the information about the snapshots of the Iceberg table
test_table
by using the following query:
SELECT * FROM "test_table$snapshots"
committed_at | snapshot_id | parent_id | operation | manifest_list | summary
----------------------------------+----------------------+----------------------+--------------------+------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-01-10 08:11:20 Europe/Vienna | 8667764846443717831 | <null> | append | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-8667764846443717831-1-100cf97e-6d56-446e-8961-afdaded63bc4.avro | {changed-partition-count=0, total-equality-deletes=0, total-position-deletes=0, total-delete-files=0, total-files-size=0, total-records=0, total-data-files=0}
2022-01-10 08:11:34 Europe/Vienna | 7860805980949777961 | 8667764846443717831 | append | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-7860805980949777961-1-faa19903-1455-4bb8-855a-61a1bbafbaa7.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=1, total-position-deletes=0, added-files-size=442, total-delete-files=0, total-files-size=442, total-records=1, total-data-files=1}
The output of the query has the following columns:
Name |
Type |
Description |
---|---|---|
|
|
The time when the snapshot became active |
|
|
The identifier for the snapshot |
|
|
The identifier for the parent snapshot |
|
|
The type of operation performed on the Iceberg table. The supported operation types in Iceberg are:
|
|
|
The list of avro manifest files containing the detailed information about the snapshot changes. |
|
|
A summary of the changes made from the previous snapshot to the current snapshot |
$manifests
table#
The $manifests
table provides a detailed overview of the manifests
corresponding to the snapshots performed in the log of the Iceberg table.
You can retrieve the information about the manifests of the Iceberg table
test_table
by using the following query:
SELECT * FROM "test_table$manifests"
path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions
----------------------------------------------------------------------------------------------------------------+-----------------+----------------------+-----------------------+--------------------------+-----------------------------+-----------------------------+----------------------------------------------------------------------------------------------------------------------------
hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/faa19903-1455-4bb8-855a-61a1bbafbaa7-m0.avro | 6277 | 0 | 7860805980949777961 | 1 | 0 | 0 |{{contains_null=false, lower_bound=1, upper_bound=1},{contains_null=false, lower_bound=2021-01-12, upper_bound=2021-01-12}}
The output of the query has the following columns:
Name |
Type |
Description |
---|---|---|
|
|
The manifest file location |
|
|
The manifest file length |
|
|
The identifier for the partition specification used to write the manifest file |
|
|
The identifier of the snapshot during which this manifest entry has been added |
|
|
The number of data files with status |
|
|
The number of data files with status |
|
|
The number of data files with status |
|
|
Partition range metadata |
$partitions
table#
The $partitions
table provides a detailed overview of the partitions
of the Iceberg table.
You can retrieve the information about the partitions of the Iceberg table
test_table
by using the following query:
SELECT * FROM "test_table$partitions"
partition | record_count | file_count | total_size | data
-----------------------+---------------+---------------+---------------+--------------------------------------
{c1=1, c2=2021-01-12} | 2 | 2 | 884 | {c3={min=1.0, max=2.0, null_count=0}}
{c1=1, c2=2021-01-13} | 1 | 1 | 442 | {c3={min=1.0, max=1.0, null_count=0}}
The output of the query has the following columns:
Name |
Type |
Description |
---|---|---|
|
|
A row which contains the mapping of the partition column name(s) to the partition column value(s) |
|
|
The number of records in the partition |
|
|
The number of files mapped in the partition |
|
|
The size of all the files in the partition |
|
|
Partition range metadata |
$files
table#
The $files
table provides a detailed overview of the data files in current snapshot of the Iceberg table.
To retrieve the information about the data files of the Iceberg table test_table
use the following query:
SELECT * FROM "test_table$files"
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+---------------
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null>
The output of the query has the following columns:
Name |
Type |
Description |
---|---|---|
|
|
Type of content stored in the file. The supported content types in Iceberg are:
|
|
|
The data file location |
|
|
The format of the data file |
|
|
The number of entries contained in the data file |
|
|
The data file size |
|
|
Mapping between the Iceberg column ID and its corresponding size in the file |
|
|
Mapping between the Iceberg column ID and its corresponding count of entries in the file |
|
|
Mapping between the Iceberg column ID and its corresponding count of |
|
|
Mapping between the Iceberg column ID and its corresponding count of non numerical values in the file |
|
|
Mapping between the Iceberg column ID and its corresponding lower bound in the file |
|
|
Mapping between the Iceberg column ID and its corresponding upper bound in the file |
|
|
Metadata about the encryption key used to encrypt this file, if applicable |
|
|
List of recommended split locations |
|
|
The set of field IDs used for equality comparison in equality delete files |
Materialized views#
The Iceberg connector supports Materialized views management. In the underlying system each materialized view consists of a view definition and an Iceberg storage table. The storage table name is stored as a materialized view property. The data is stored in that storage table.
You can use the Iceberg table properties to control the created storage
table and therefore the layout and performance. For example, you can use the
following clause with CREATE MATERIALIZED VIEW to use the ORC format
for the data files and partition the storage per day using the column
_date
:
WITH ( format = 'ORC', partitioning = ARRAY['event_date'] )
Updating the data in the materialized view with REFRESH MATERIALIZED VIEW deletes the data from the storage table, and inserts the data that is the result of executing the materialized view query into the existing table.
Warning
There is a small time window between the commit of the delete and insert, when the materialized view is empty. If the commit operation for the insert fails, the materialized view remains empty.
Dropping a materialized view with DROP MATERIALIZED VIEW removes the definition and the storage table.