Skip to content

Commit 0c362e9

Browse files
authored
HIVE-29546: Iceberg: [V3] Support of ROW LINEAGE in COMPACTION (#6407)
1 parent f6e75cd commit 0c362e9

7 files changed

Lines changed: 799 additions & 69 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
9595
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
9696
import org.apache.hadoop.hive.ql.metadata.Partition;
97+
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
9798
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
9899
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
99100
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
@@ -313,8 +314,9 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
313314
tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
314315
SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
315316
.ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
316-
SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE)
317-
.ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString()));
317+
if (RowLineageUtils.isRowLineageInsert(conf)) {
318+
map.put(SessionStateUtil.ROW_LINEAGE, Boolean.toString(true));
319+
}
318320

319321
}
320322

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java

Lines changed: 107 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hive.ql.metadata.Hive;
3838
import org.apache.hadoop.hive.ql.metadata.HiveException;
3939
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
40+
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
4041
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
4142
import org.apache.hadoop.hive.ql.parse.TransformSpec;
4243
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -69,20 +70,23 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
6970

7071
HiveConf conf = new HiveConf(context.getConf());
7172
CompactionInfo ci = context.getCompactionInfo();
73+
7274
String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
7375

7476
SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
77+
7578
String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
7679
(ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");
7780

7881
try {
79-
DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
82+
DriverUtils.runOnDriver(sessionState.getConf(), sessionState, compactionQuery);
8083
LOG.info("Completed compaction for {}", compactionTarget);
8184
return true;
8285
} catch (HiveException e) {
8386
LOG.error("Failed compacting {}", compactionTarget, e);
8487
throw e;
8588
} finally {
89+
RowLineageUtils.disableRowLineage(sessionState);
8690
sessionState.setCompaction(false);
8791
}
8892
}
@@ -94,59 +98,113 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
9498
context.getTable().getTableName());
9599
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
96100
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
97-
String fileSizePredicate = null;
98-
String compactionQuery;
99-
100-
if (ci.type == CompactionType.MINOR) {
101-
long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
102-
fileSizePredicate = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
103-
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
104-
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
105-
// IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
106-
// doesn't support vectorization, hence disabling it in this case.
107-
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
108-
}
109-
110-
if (ci.partName == null) {
111-
if (!icebergTable.spec().isPartitioned()) {
112-
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
113-
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
114-
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
115-
} else if (icebergTable.specs().size() > 1) {
116-
// Compacting partitions of old partition specs on a partitioned table with partition evolution
117-
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
118-
// A single filter on a virtual column causes errors during compilation,
119-
// added another filter on file_path as a workaround.
120-
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
121-
"where %2$s != %3$d and %4$s is not null %5$s %6$s",
122-
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
123-
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
101+
String fileSizePredicate = buildMinorFileSizePredicate(ci, compactTableName, conf, table);
102+
103+
String columnsList = "*";
104+
if (RowLineageUtils.supportsRowLineage(table)) {
105+
RowLineageUtils.enableRowLineage(conf);
106+
LOG.debug("Row lineage flag set for compaction of table {}", compactTableName);
107+
if (ci.isMajorCompaction() && ci.partName == null) {
108+
columnsList = buildSelectColumnList(icebergTable, conf) + RowLineageUtils.getRowLineageColumnsForCompaction();
124109
} else {
125-
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
126-
// code branch is not supposed to be reachable
127-
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
110+
columnsList = columnsList + RowLineageUtils.getRowLineageColumnsForCompaction();
128111
}
129-
} else {
130-
HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
131-
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
112+
}
113+
114+
String compactionQuery = (ci.partName == null) ?
115+
buildFullTableCompactionQuery(compactTableName, conf, icebergTable,
116+
columnsList, fileSizePredicate, orderBy) :
117+
buildPartitionCompactionQuery(ci, compactTableName, conf, icebergTable,
118+
columnsList, fileSizePredicate, orderBy);
119+
120+
LOG.info("Compaction query: {}", compactionQuery);
121+
return compactionQuery;
122+
}
123+
124+
private static String buildMinorFileSizePredicate(
125+
CompactionInfo ci, String compactTableName, HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table) {
126+
if (ci.type != CompactionType.MINOR) {
127+
return null;
128+
}
129+
130+
long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
131+
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
132+
// IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
133+
// doesn't support vectorization, hence disabling it in this case.
134+
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
135+
136+
return String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
137+
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
138+
}
139+
140+
private String buildFullTableCompactionQuery(
141+
String compactTableName,
142+
HiveConf conf,
143+
Table icebergTable,
144+
String columnsList,
145+
String fileSizePredicate,
146+
String orderBy) throws HiveException {
147+
148+
if (!icebergTable.spec().isPartitioned()) {
149+
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
150+
return String.format("insert overwrite table %1$s select %2$s from %1$s %3$s %4$s",
151+
compactTableName, columnsList,
152+
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
153+
}
154+
155+
if (icebergTable.specs().size() > 1) {
156+
// Compacting partitions of old partition specs on a partitioned table with partition evolution
132157
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
133-
conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());
134-
135-
PartitionSpec spec;
136-
String partitionPredicate;
137-
try {
138-
spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
139-
partitionPredicate = buildPartitionPredicate(ci, spec);
140-
} catch (MetaException e) {
141-
throw new HiveException(e);
142-
}
158+
// A single filter on a virtual column causes errors during compilation,
159+
// added another filter on file_path as a workaround.
160+
return String.format("insert overwrite table %1$s select %2$s from %1$s " +
161+
"where %3$s != %4$d and %5$s is not null %6$s %7$s",
162+
compactTableName, columnsList,
163+
VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
164+
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
165+
}
143166

144-
compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * FROM %1$s WHERE %2$s IN " +
145-
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) %5$s %6$s",
146-
compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
147-
fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
167+
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
168+
// code branch is not supposed to be reachable
169+
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
170+
}
171+
172+
private String buildPartitionCompactionQuery(
173+
CompactionInfo ci,
174+
String compactTableName,
175+
HiveConf conf,
176+
Table icebergTable,
177+
String columnsList,
178+
String fileSizePredicate,
179+
String orderBy) throws HiveException {
180+
HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
181+
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
182+
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
183+
conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());
184+
185+
PartitionSpec spec;
186+
String partitionPredicate;
187+
try {
188+
spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
189+
partitionPredicate = buildPartitionPredicate(ci, spec);
190+
} catch (MetaException e) {
191+
throw new HiveException(e);
148192
}
149-
return compactionQuery;
193+
194+
return String.format("INSERT OVERWRITE TABLE %1$s SELECT %2$s FROM %1$s WHERE %3$s IN " +
195+
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s",
196+
compactTableName, columnsList, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
197+
fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
198+
}
199+
200+
/**
201+
* Builds a comma-separated SELECT list from the Iceberg table schema.
202+
*/
203+
private static String buildSelectColumnList(Table icebergTable, HiveConf conf) {
204+
return icebergTable.schema().columns().stream()
205+
.map(Types.NestedField::name)
206+
.map(col -> HiveUtils.unparseIdentifier(col, conf))
207+
.collect(Collectors.joining(", "));
150208
}
151209

152210
private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException {
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
-- SORT_QUERY_RESULTS
2+
3+
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
4+
--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
5+
--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
6+
--! qt:replace:/(MINOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
7+
-- Mask compaction ids, as they are allocated in parallel threads
8+
--! qt:replace:/^(\d+)(\t.*\tmanual\ticeberg\t)/#Masked#$2/
9+
10+
set hive.llap.io.enabled=true;
11+
set hive.vectorized.execution.enabled=true;
12+
13+
create database if not exists ice_comp_all with dbproperties('hive.compactor.worker.pool'='iceberg');
14+
use ice_comp_all;
15+
16+
-- Partitioned table with minor and major compaction
17+
create table part_tbl(
18+
id int,
19+
data string
20+
)
21+
partitioned by (dept_id int)
22+
stored by iceberg stored as parquet
23+
tblproperties (
24+
'format-version'='3',
25+
'hive.compactor.worker.pool'='iceberg',
26+
-- Use target.size only to make Parquet data files (~996 bytes) count as fragments.
27+
-- Default fragment ratio is 8, so fragment_size = target_size / 8.
28+
-- Pick target_size > 996 * 8 (7968) so files are treated as fragment files and minor compaction is eligible.
29+
'compactor.threshold.target.size'='8000',
30+
'compactor.threshold.min.input.files'='2',
31+
'compactor.threshold.delete.file.ratio'='0.0'
32+
);
33+
34+
insert into part_tbl values (1,'p1', 10);
35+
insert into part_tbl values (2,'p2', 10);
36+
insert into part_tbl values (3,'p3', 20);
37+
insert into part_tbl values (4,'p4', 20);
38+
39+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
40+
FROM part_tbl
41+
ORDER BY ROW__LINEAGE__ID;
42+
43+
alter table part_tbl compact 'minor' and wait pool 'iceberg';
44+
45+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
46+
FROM part_tbl
47+
ORDER BY ROW__LINEAGE__ID;
48+
49+
show compactions;
50+
51+
-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size
52+
alter table part_tbl set tblproperties ('compactor.threshold.target.size'='1500');
53+
54+
merge into part_tbl t
55+
using (select 1 as id, 'p1_upd' as data, 10 as dept_id) s
56+
on t.dept_id = s.dept_id and t.id = s.id
57+
when matched then update set data = s.data;
58+
59+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
60+
FROM part_tbl
61+
ORDER BY ROW__LINEAGE__ID;
62+
63+
alter table part_tbl compact 'major' and wait pool 'iceberg';
64+
65+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
66+
FROM part_tbl
67+
ORDER BY ROW__LINEAGE__ID;
68+
69+
show compactions;
70+
71+
-- Partition evolution
72+
alter table part_tbl set tblproperties ('compactor.threshold.target.size'='8000');
73+
74+
alter table part_tbl set partition spec(dept_id, id);
75+
76+
insert into part_tbl values (5,'p5', 10);
77+
insert into part_tbl values (6,'p6', 20);
78+
79+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
80+
FROM part_tbl
81+
ORDER BY ROW__LINEAGE__ID;
82+
83+
alter table part_tbl compact 'minor' and wait pool 'iceberg';
84+
85+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
86+
FROM part_tbl
87+
ORDER BY ROW__LINEAGE__ID;
88+
89+
show compactions;
90+
91+
-- Unpartitioned table with minor and major compaction
92+
create table unpart_tbl(
93+
id int,
94+
data string
95+
)
96+
stored by iceberg stored as parquet
97+
tblproperties (
98+
'format-version'='3',
99+
'hive.compactor.worker.pool'='iceberg',
100+
-- Use target.size only to make Parquet data files (~996 bytes) count as fragments (default ratio 8).
101+
'compactor.threshold.target.size'='8000',
102+
'compactor.threshold.min.input.files'='2',
103+
'compactor.threshold.delete.file.ratio'='0.0'
104+
);
105+
106+
insert into unpart_tbl values (1,'a');
107+
insert into unpart_tbl values (2,'b');
108+
insert into unpart_tbl values (3,'c');
109+
insert into unpart_tbl values (4,'d');
110+
111+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
112+
FROM unpart_tbl
113+
ORDER BY ROW__LINEAGE__ID;
114+
115+
alter table unpart_tbl compact 'minor' and wait pool 'iceberg';
116+
117+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
118+
FROM unpart_tbl
119+
ORDER BY ROW__LINEAGE__ID;
120+
121+
show compactions;
122+
123+
-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size, then create deletes via MERGE.
124+
alter table unpart_tbl set tblproperties ('compactor.threshold.target.size'='1500');
125+
126+
merge into unpart_tbl t
127+
using (select 1 as id, 'a_upd' as data) s
128+
on t.id = s.id
129+
when matched then update set data = s.data;
130+
131+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
132+
FROM unpart_tbl
133+
ORDER BY ROW__LINEAGE__ID;
134+
135+
alter table unpart_tbl compact 'major' and wait pool 'iceberg';
136+
137+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
138+
FROM unpart_tbl
139+
ORDER BY ROW__LINEAGE__ID;
140+
141+
show compactions;

0 commit comments

Comments
 (0)