Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,38 @@ public void processElement(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
}

// This works only when all files are using the same partition spec.
private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
AppendFiles update = table.newAppend();
private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
throws IOException {
Map<Integer, PartitionSpec> specs = table.specs();
FileIO io = table.io();
String uuid = UUID.randomUUID().toString();

Map<String, List<DataFile>> byPartition = new HashMap<>();
for (FileWriteResult result : fileWriteResults) {
DataFile dataFile = result.getDataFile(table.specs());
update.appendFile(dataFile);
DataFile dataFile = result.getDataFile(specs);
String partitionPath = result.getSerializableDataFile().getPartitionPath();
byPartition.computeIfAbsent(partitionPath, k -> new ArrayList<>()).add(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
}

AppendFiles update = table.newAppend();
int manifestIdx = 0;
for (Map.Entry<String, List<DataFile>> entry : byPartition.entrySet()) {
List<DataFile> files = entry.getValue();
PartitionSpec spec =
files.get(0).specId() >= 0
? Preconditions.checkStateNotNull(specs.get(files.get(0).specId()))
: table.spec();
ManifestWriter<DataFile> writer =
createManifestWriter(
table.location(), uuid + "-" + manifestIdx++, spec, io);
for (DataFile file : files) {
writer.add(file);
}
writer.close();
update.appendManifest(writer.toManifestFile());
Comment on lines +178 to +185

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues here:

  1. createManifestWriter is not a standard Iceberg API and is not defined in this class, which will cause a compilation error.
  2. The ManifestWriter is not closed within a try-finally or try-with-resources block. If an exception occurs while adding files, the writer will leak resources.

We can resolve both issues by using the standard ManifestFiles.write API from Iceberg and wrapping the write loop in a try-finally block to ensure the writer is closed properly before retrieving the manifest file.

Note: You will need to import org.apache.iceberg.ManifestFiles and org.apache.iceberg.io.OutputFile.

        OutputFile outputFile = io.newOutputFile(
            table.location() + "/metadata/" + uuid + "-" + manifestIdx++ + ".avro");
        ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
        try {
          for (DataFile file : files) {
            writer.add(file);
          }
        } finally {
          writer.close();
        }
        update.appendManifest(writer.toManifestFile());

}
update.commit();
}

Expand Down
Loading