From 8feb495295a88668228a700d2c024e382f690d85 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Fri, 26 Jun 2026 10:06:52 +0000 Subject: [PATCH] [SYSTEMDS-3949] Add native Delta Lake frame read/write via Delta Kernel Extend the native Delta Lake support from matrices to frames, reading and writing Delta Lake tables through the Spark-free Delta Kernel library on the single-node CP path. DML read/write with format="delta" now works for frames, discovering schema, column names, and dimensions directly from the table. - Add FrameReaderDelta, FrameReaderDeltaParallel and FrameWriterDelta - Wire DELTA into the frame reader and writer factories - Refresh cached frame metadata and schema after a Delta read - Broaden Delta frame component IO coverage Stacked on the matrix Delta support; append/overwrite semantics, distributed execution, and time travel remain out of scope. --- .../controlprogram/caching/FrameObject.java | 11 +- .../sysds/runtime/io/FrameReaderDelta.java | 309 +++++++++ .../runtime/io/FrameReaderDeltaParallel.java | 360 ++++++++++ .../sysds/runtime/io/FrameReaderFactory.java | 2 + .../sysds/runtime/io/FrameWriterDelta.java | 251 +++++++ .../sysds/runtime/io/FrameWriterFactory.java | 2 + .../component/io/DeltaFrameReadWriteTest.java | 624 ++++++++++++++++++ .../io/delta/FrameDeltaReadWriteTest.java | 123 ++++ .../io/delta/FrameDeltaReadCompare.dml | 35 + .../functions/io/delta/FrameDeltaWrite.dml | 32 + 10 files changed, 1748 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java create mode 100644 src/main/java/org/apache/sysds/runtime/io/FrameReaderDeltaParallel.java create mode 100644 src/main/java/org/apache/sysds/runtime/io/FrameWriterDelta.java create mode 100644 src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java create mode 100644 src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java create mode 100644 src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml create mode 100644 src/test/scripts/functions/io/delta/FrameDeltaWrite.dml diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 7151d87211c..219f954cc52 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.common.Types.ValueType; @@ -203,9 +204,14 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept .createFrameReader(iimd.getFileFormat(), getFileFormatProperties()) .readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols()); - if(iimd.getFileFormat() == FileFormat.CSV) + //Delta and CSV discover dimensions (and Delta also schema) at read time, so + //refresh the cached metadata to reflect the materialized frame block. + if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) { _metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(), iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics()); + if(iimd.getFileFormat() == FileFormat.DELTA) + _schema = data.getSchema(); + } // sanity check correct output if(data == null) @@ -293,6 +299,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop); writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns()); + + if(DMLScript.STATISTICS) + CacheStatistics.incrementHDFSWrites(); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java new file mode 100644 index 00000000000..9d5df380552 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +/** + * Single-threaded native Delta Lake reader for frames, built on the Spark-free + * Delta Kernel library. It opens the latest snapshot of a Delta table, reads + * its parquet data files through the kernel's default engine (honoring deletion + * vectors), and materializes the columns into a {@link FrameBlock} whose schema + * and column names are derived from the Delta table schema. + * + *

Data is extracted column-at-a-time into primitive arrays (no per-cell + * boxing or {@code FrameBlock.set} dispatch) and the frame is constructed + * directly from typed column {@link Array}s. Supported column types map to + * SystemDS value types: double, float, long, int, short, byte, boolean, and + * string. Neither the schema nor the dimensions need to be supplied; they are + * discovered from the table.

+ */ +public class FrameReaderDelta extends FrameReader { + + //per-column read codes (how to pull a value out of the Delta column vector); + //package visible so the parallel reader can reuse the same dispatch. + static final int R_DOUBLE = 0, R_FLOAT = 1, R_LONG = 2, R_INT = 3, + R_SHORT = 4, R_BYTE = 5, R_BOOLEAN = 6, R_STRING = 7; + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + + //per-batch, per-column extracted arrays (boxing free) + ArrayList batchCols = new ArrayList<>(); + ArrayList batchSizes = new ArrayList<>(); + int[] nrowH = new int[1]; + ValueType[][] vtH = new ValueType[1][]; + String[][] nameH = new String[1][]; + int[][] readCodeH = new int[1][]; + + DeltaKernelUtils.scan(engine, tablePath, sch -> { + int ncol = sch.length(); + int[] readCode = new int[ncol]; + ValueType[] vt = new ValueType[ncol]; + String[] cnames = new String[ncol]; + for( int c=0; c { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for( int c=0; c[] columns = new Array[ncol]; + for( int c=0; c buildColumn(ValueType vt, int nrow, ArrayList batchCols, + ArrayList batchSizes, int c) + { + switch( vt ) { + case FP64: { + double[] all = new double[nrow]; + int off = 0; + for( int b=0; bIt mirrors {@link ReaderDeltaParallel} (the matrix variant) but produces + * typed column {@link Array}s instead of a dense {@code double[]}. As with the + * matrix reader, the expensive part of a Delta read is the per-file parquet + * decode, so parallelizing across data files is the natural speedup. A table + * backed by a single data file cannot be split this way, so the reader + * transparently falls back to the sequential {@link FrameReaderDelta}.

+ */ +public class FrameReaderDeltaParallel extends FrameReaderDelta { + + private final int _numThreads; + + public FrameReaderDeltaParallel() { + _numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + } + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + + final int nfiles = handle.scanFiles.size(); + //nothing to gain from parallelism for single-file (or empty) tables + if( _numThreads <= 1 || nfiles <= 1 ) + return super.readFrameFromHDFS(fname, schema, names, rlen, clen); + + //derive per-column read codes, value types and names once from the schema + final int ncol = handle.schema.length(); + final int[] readCodes = new int[ncol]; + final ValueType[] vt = new ValueType[ncol]; + final String[] cnames = new String[ncol]; + for( int c=0; c pre-size + //one typed array per column and let each thread decode directly into its + //row offset (no intermediate buffers, no serial concatenation). + if( useDirectPath(handle) ) { + long total = 0; + for( long r : handle.numRecords ) + total += r; + if( total > 0 && total <= Integer.MAX_VALUE ) + return readDirect(fname, handle, ncol, readCodes, vt, cnames, (int) total); + } + + return readBuffered(fname, handle, ncol, readCodes, vt, cnames); + } + + /** + * Whether the metadata-driven direct-write fast path can be used for this + * table (exact per-file row counts and no deletion vectors). Visible for + * testing: the buffered fallback is otherwise only reachable for tables + * lacking row statistics or carrying deletion vectors, which the SystemDS + * Delta writer never produces. + * + * @param handle the opened scan handle + * @return true if the direct path is applicable + */ + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) { + return handle.hasExactRowCounts(); + } + + /** + * Fast path: each thread decodes one data file straight into the final typed + * column arrays at a metadata-derived row offset. Single allocation per + * column, fully parallel. + */ + private FrameBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] readCodes, ValueType[] vt, String[] cnames, int nrow) throws IOException + { + final int nfiles = handle.scanFiles.size(); + final int[] rowOffset = new int[nfiles]; + int acc = 0; + for( int i=0; i> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + int[] cur = new int[] {base}; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + for( int c=0; c f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + Array[] columns = new Array[ncol]; + for( int c=0; c[] fileCols = new ArrayList[nfiles]; + @SuppressWarnings("unchecked") + final ArrayList[] fileSizes = new ArrayList[nfiles]; + final ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + ArrayList> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + ArrayList bcs = new ArrayList<>(); + ArrayList bss = new ArrayList<>(); + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for( int c=0; c f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + //flatten the per-file batches in file order and concatenate per column + ArrayList batchCols = new ArrayList<>(); + ArrayList batchSizes = new ArrayList<>(); + int nrow = 0; + for( int i=0; i[] columns = new Array[ncol]; + for( int c=0; c createColumn(ValueType vt, Object full) { + switch( vt ) { + case FP64: return ArrayFactory.create((double[]) full); + case FP32: return ArrayFactory.create((float[]) full); + case INT64: return ArrayFactory.create((long[]) full); + case INT32: return ArrayFactory.create((int[]) full); + case BOOLEAN: return ArrayFactory.create((boolean[]) full); + default: return ArrayFactory.create((String[]) full); // STRING + } + } + + /** + * Decode the live (selected, after deletion vector) rows of one column batch + * directly into a pre-sized typed array starting at absolute row {@code destOff}. + * Null numeric cells keep the array default (0); string nulls are stored as null. + */ + private static void extractColumnInto(ColumnVector col, int size, boolean[] selected, + int readCode, Object dest, int destOff) + { + switch( readCode ) { + case R_DOUBLE: { + double[] a = (double[]) dest; + int lr = destOff; + for( int r=0; r[] cols = new Array[ncol]; + boolean[] nullable = new boolean[ncol]; + for( int c=0; c { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _nrow; + private final int _ncol; + private int _pos = 0; + + FrameBatchIterator(Array[] cols, boolean[] nullable, StructType schema, int nrow, int ncol) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _nrow = nrow; + _ncol = ncol; + } + + @Override + public boolean hasNext() { + return _pos < _nrow; + } + + @Override + public FilteredColumnarBatch next() { + if( !hasNext() ) + throw new NoSuchElementException(); + int size = Math.min(BATCH_ROWS, _nrow - _pos); + ColumnarBatch batch = new FrameColumnarBatch(_cols, _nullable, _schema, _pos, size, _ncol); + _pos += size; + return new FilteredColumnarBatch(batch, Optional.empty()); + } + + @Override + public void close() { + //nothing to release + } + } + + /** Read-only view of a row range of the frame columns as a Delta Kernel columnar batch. */ + private static class FrameColumnarBatch implements ColumnarBatch { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _rowStart; + private final int _size; + private final int _ncol; + + FrameColumnarBatch(Array[] cols, boolean[] nullable, StructType schema, int rowStart, int size, int ncol) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + } + + @Override + public StructType getSchema() { + return _schema; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + if( ordinal < 0 || ordinal >= _ncol ) + throw new IndexOutOfBoundsException("column ordinal " + ordinal); + return new FrameColumnVector(_cols[ordinal], _nullable[ordinal], + _schema.at(ordinal).getDataType(), _rowStart, _size); + } + + @Override + public int getSize() { + return _size; + } + } + + /** + * Read-only typed column view over one column {@link Array} row range. Numeric + * values are read through {@link Array#getAsDouble(int)} to avoid boxing, and + * non-nullable columns short-circuit {@code isNullAt} so the kernel never pays + * for a redundant boxed fetch. + */ + private static class FrameColumnVector implements ColumnVector { + private final Array _col; + private final boolean _nullable; + private final DataType _type; + private final int _rowStart; + private final int _size; + + FrameColumnVector(Array col, boolean nullable, DataType type, int rowStart, int size) { + _col = col; + _nullable = nullable; + _type = type; + _rowStart = rowStart; + _size = size; + } + + @Override + public DataType getDataType() { + return _type; + } + + @Override + public int getSize() { + return _size; + } + + @Override + public boolean isNullAt(int rowId) { + return _nullable && _col.get(_rowStart + rowId) == null; + } + + @Override + public String getString(int rowId) { + Object v = _col.get(_rowStart + rowId); + return (v == null) ? null : v.toString(); + } + + @Override + public boolean getBoolean(int rowId) { + return _col.getAsDouble(_rowStart + rowId) != 0; + } + + @Override + public double getDouble(int rowId) { + return _col.getAsDouble(_rowStart + rowId); + } + + @Override + public float getFloat(int rowId) { + return (float) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public long getLong(int rowId) { + //exact for INT64 (getAsDouble would lose precision beyond 2^53) + return ((Number) _col.get(_rowStart + rowId)).longValue(); + } + + @Override + public int getInt(int rowId) { + return (int) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public void close() { + //nothing to release + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java index 3fb3968c96f..ff38eb395dd 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java @@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock(); case PROTO: return new FrameWriterProto(); + case DELTA: + return new FrameWriterDelta(); default: throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString()); } diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java new file mode 100644 index 00000000000..1c1d33a80d6 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.CompilerConfig; +import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.io.FrameReaderDelta; +import org.apache.sysds.runtime.io.FrameReaderDeltaParallel; +import org.apache.sysds.runtime.io.FrameReaderFactory; +import org.apache.sysds.runtime.io.FrameWriterDelta; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Direct (no DML) round-trip tests for the native Delta Kernel based frame + * reader/writer. Each test writes a FrameBlock to a fresh local Delta table + * directory and reads it back, asserting the discovered schema, column names, + * dimensions, and per-cell values match. Several tests additionally assert that + * the parallel reader ({@link FrameReaderDeltaParallel}) agrees with the serial + * reader cell-for-cell across a multi-file table (both its direct and buffered + * paths). + */ +public class DeltaFrameReadWriteTest { + + //nonsense schema/dims handed to the reader to confirm it discovers everything + private static final ValueType[] NO_SCHEMA = new ValueType[] {ValueType.STRING}; + private static final String[] NO_NAMES = new String[] {"x"}; + + //small target file size + enough random rows so the writer rolls multiple + //data files, exercising the per-file parallel read path rather than the + //single-file serial fallback. + private static final long SMALL_TARGET_FILE_SIZE = 512L * 1024; + private static final int ROWS_MULTI_FILE = 150_000; + + private static FrameBlock writeThenRead(FrameBlock in) throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_frame_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + //pass nonsense schema/dims: the reader must discover everything from the table + return new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + private static FrameBlock alloc(ValueType[] schema, String[] names, int nrow) { + FrameBlock fb = new FrameBlock(schema, names); + fb.ensureAllocatedColumns(nrow); + return fb; + } + + @Test + public void roundTripMixedTypes() throws Exception { + ValueType[] schema = {ValueType.STRING, ValueType.INT64, ValueType.FP64, + ValueType.BOOLEAN, ValueType.INT32, ValueType.FP32}; + String[] names = {"name", "id", "score", "active", "count", "ratio"}; + int nrow = 5; + FrameBlock in = alloc(schema, names, nrow); + for( int r=0; r readBuffered() + FrameBlock buffered = new FrameReaderDeltaParallel() { + @Override protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { return false; } + }.readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + + assertFramesEqual(serial, buffered); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void factoryRoutesDeltaToParallelWhenEnabled() { + //the factory must pick the parallel frame reader iff parallel CP read is enabled + CompilerConfig cc = ConfigurationManager.getCompilerConfig(); + try { + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true); + ConfigurationManager.setLocalConfig(cc); + FrameReader par = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected FrameReaderDeltaParallel when parallel read enabled", + par instanceof FrameReaderDeltaParallel); + + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false); + ConfigurationManager.setLocalConfig(cc); + FrameReader ser = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected serial FrameReaderDelta when parallel read disabled", + ser instanceof FrameReaderDelta && !(ser instanceof FrameReaderDeltaParallel)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void readerBatchSizeConfigRoundTrips() throws Exception { + //a non-default reader batch size must not change the result (more, smaller + //batches exercise the per-batch extract/concatenate loop more often). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128"); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_bs_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + assertEquals("config getter reflects the override", + 128, ConfigurationManager.getDeltaReaderBatchSize()); + + FrameBlock in = genMixedFrame(5000, 31); + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(in, out); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception { + //a smaller configured target file size must make the writer roll more + //data files for the same frame (the lever the parallel reader relies on). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_cfg_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + assertEquals("config getter reflects the override", + SMALL_TARGET_FILE_SIZE, ConfigurationManager.getDeltaWriterTargetFileSize()); + + FrameBlock in = genMixedFrame(ROWS_MULTI_FILE, 41); + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + assertMultiFile(tablePath); + + //data still round-trips correctly with the custom layout + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(in, out); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void emptyFrameRoundTrip() throws Exception { + //a schema-only Delta table (no data files, 0 rows); the reader must + //rebuild empty typed columns and discover the schema/names from the table. + ValueType[] schema = {ValueType.STRING, ValueType.FP64, ValueType.INT64}; + String[] names = {"s", "d", "k"}; + DataType[] dtypes = {StringType.STRING, DoubleType.DOUBLE, LongType.LONG}; + + Path dir = Files.createTempDirectory("sysds_delta_frame_empty_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeEmptyTable(tablePath, names, dtypes); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("rows", 0, out.getNumRows()); + assertEquals("cols", schema.length, out.getNumColumns()); + for( int c=0; c writer must reject + new FrameWriterDelta().writeFrameToHDFS(fb, tablePath, + fb.getNumRows() + 1, fb.getNumColumns()); + fail("expected an IOException for a frame/metadata dimension mismatch"); + } + catch(IOException ex) { + assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch")); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readFromInputStreamUnsupported() throws Exception { + //Delta is a directory-based table format; stream reads are not supported + try { + new FrameReaderDelta().readFrameFromInputStream(null, NO_SCHEMA, NO_NAMES, -1, -1); + fail("expected UnsupportedOperationException for a Delta input-stream read"); + } + catch(UnsupportedOperationException ex) { + //expected: must throw before touching the (null) stream + } + } + + @Test + public void parallelReadStringNullsMatchSerialMultiFile() throws Exception { + //string nulls across a multi-file table: the parallel direct path must + //reproduce the serial read cell-for-cell (assertFramesEqual uses + //Objects.equals, so nulls are compared faithfully). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_parnull_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + ValueType[] schema = {ValueType.STRING, ValueType.INT64}; + String[] names = {"s", "k"}; + int nrow = ROWS_MULTI_FILE; + FrameBlock in = alloc(schema, names, nrow); + for( int r=0; r s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files, + files > 1); + } + + private static void assertFramesEqual(FrameBlock expected, FrameBlock actual) { + assertEquals("rows", expected.getNumRows(), actual.getNumRows()); + assertEquals("cols", expected.getNumColumns(), actual.getNumColumns()); + int ncol = expected.getNumColumns(); + for( int c=0; c empty() { + return new CloseableIterator() { + @Override public boolean hasNext() { return false; } + @Override public FilteredColumnarBatch next() { throw new NoSuchElementException(); } + @Override public void close() {} + }; + } + + /** Writes a single date column (kernel stores dates as INT32 days) used to + * assert the frame reader rejects a non-mappable column type. */ + private static void writeDateColumn(String tablePath, int[] days) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType().add("d", DateType.DATE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override public StructType getSchema() { return schema; } + @Override public int getSize() { return days.length; } + @Override public ColumnVector getColumnVector(int ordinal) { return new DateVector(days); } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + /** Writes a short column and a byte column (kernel stores these as 16/8-bit + * integers) used to assert the frame reader coerces both to INT32. */ + private static void writeShortByteColumns(String tablePath, short[] shorts, byte[] bytes) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType() + .add("sh", ShortType.SHORT, false) + .add("by", ByteType.BYTE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override public StructType getSchema() { return schema; } + @Override public int getSize() { return shorts.length; } + @Override public ColumnVector getColumnVector(int ordinal) { + return (ordinal == 0) ? new ShortVector(shorts) : new ByteVector(bytes); + } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + private static CloseableIterator singleton(FilteredColumnarBatch fcb) { + return new CloseableIterator() { + private boolean _done = false; + @Override public boolean hasNext() { return !_done; } + @Override public FilteredColumnarBatch next() { + if( _done ) throw new NoSuchElementException(); + _done = true; + return fcb; + } + @Override public void close() {} + }; + } + + /** Column view exposing an int[] as a Delta date column. */ + private static class DateVector implements ColumnVector { + private final int[] _days; + DateVector(int[] days) { _days = days; } + @Override public DataType getDataType() { return DateType.DATE; } + @Override public int getSize() { return _days.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public int getInt(int rowId) { return _days[rowId]; } + @Override public void close() {} + } + + /** Column view exposing a short[] as a Delta short column. */ + private static class ShortVector implements ColumnVector { + private final short[] _vals; + ShortVector(short[] vals) { _vals = vals; } + @Override public DataType getDataType() { return ShortType.SHORT; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public short getShort(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } + + /** Column view exposing a byte[] as a Delta byte column. */ + private static class ByteVector implements ColumnVector { + private final byte[] _vals; + ByteVector(byte[] vals) { _vals = vals; } + @Override public DataType getDataType() { return ByteType.BYTE; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public byte getByte(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java new file mode 100644 index 00000000000..0a6bba5a163 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.delta; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; + +import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * End-to-end DML test of the native Delta frame read/write path. + * + *

As in the matrix variant, the write and the read run as two separate + * SystemDS executions so the read is a genuine disk read rather than an + * in-memory cache hit. We additionally assert via {@link CacheStatistics} that + * the write run wrote (delta + text reference) and the read run read (delta + + * text reference) from HDFS, so a short-circuited path would fail the test.

+ */ +public class FrameDeltaReadWriteTest extends AutomatedTestBase { + + private final static String TEST_DIR = "functions/io/delta/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameDeltaReadWriteTest.class.getSimpleName() + "/"; + private final static String WRITE_NAME = "FrameDeltaWrite"; + private final static String READ_NAME = "FrameDeltaReadCompare"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(WRITE_NAME, + new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] { "ref" })); + addTestConfiguration(READ_NAME, + new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] { "R" })); + } + + @Test + public void testDenseRoundTrip() { + runFrameDeltaRoundTrip(200, 12, 1.0); + } + + @Test + public void testSparseRoundTrip() { + runFrameDeltaRoundTrip(640, 8, 0.2); + } + + @Test + public void testMultiBatchRoundTrip() { + runFrameDeltaRoundTrip(9000, 4, 1.0); + } + + private void runFrameDeltaRoundTrip(int rows, int cols, double sparsity) { + try { + String HOME = SCRIPT_DIR + TEST_DIR; + + // ---- phase 1: write the frame as a Delta table + text reference ---- + getAndLoadTestConfiguration(WRITE_NAME); + String deltaPath = output("deltaTable"); + String refPath = output("ref"); + fullDMLScriptName = HOME + WRITE_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + String.valueOf(rows), String.valueOf(cols), String.valueOf(sparsity), + deltaPath, refPath }; + runTest(true, false, null, -1); + + //the write run must materialize two objects to disk: the frame Delta + //table under test + the matrix text reference. FrameWriterDelta genuinely + //hitting HDFS is what produces the frame-side write statistic. + long hdfsWrites = CacheStatistics.getHDFSWrites(); + assertTrue("expected >= 2 HDFS writes in the write run (delta frame + reference), got " + + hdfsWrites, hdfsWrites >= 2); + //and a real Delta table (transaction log) must have been created + assertTrue("missing Delta transaction log under " + deltaPath, + new File(deltaPath, "_delta_log").isDirectory()); + + // ---- phase 2: fresh execution reads the Delta frame and compares ---- + getAndLoadTestConfiguration(READ_NAME); + fullDMLScriptName = HOME + READ_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + deltaPath, refPath, output("R") }; + runTest(true, false, null, -1); + + long hdfsReads = CacheStatistics.getHDFSHits(); + assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got " + + hdfsReads, hdfsReads >= 2); + + HashMap R = readDMLMatrixFromOutputDir("R"); + double diff = R.getOrDefault(new CellIndex(1, 1), 0.0); + double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0); + double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0); + + assertEquals("reconstruction error", 0.0, diff, 1e-12); + assertEquals("discovered rows", rows, (int) nrow); + assertEquals("discovered cols", cols, (int) ncol); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml new file mode 100644 index 00000000000..cdf1f0794fc --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml @@ -0,0 +1,35 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Reader side of the native Delta frame round-trip test. Reads the Delta table +# as a frame (schema + dimensions discovered from the transaction log) and the +# text matrix reference, both genuine HDFS reads in a fresh process, then +# reports the elementwise reconstruction error and the discovered dimensions. + +Y = read($1, data_type="frame", format="delta") +Xref = read($2, format="text") + +M = as.matrix(Y) +R = matrix(0, rows=1, cols=3) +R[1,1] = sum(abs(Xref - M)) # 0 if FrameReaderDelta reconstructed the frame exactly +R[1,2] = nrow(Y) # discovered row count +R[1,3] = ncol(Y) # discovered column count +write(R, $3) diff --git a/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml new file mode 100644 index 00000000000..5e152dde013 --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Writer side of the native Delta frame round-trip test. Generates a matrix, +# converts it to a frame, and materializes it as a Delta table (under test). +# The same matrix is also written as a plain text reference. Running the +# read/compare in a SEPARATE process prevents SystemDS from short-circuiting +# the subsequent read against the in-memory frame, so FrameReaderDelta is +# actually exercised. + +X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3) +F = as.frame(X) +write(F, $4, format="delta") +write(X, $5, format="text")