diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 7151d87211c..219f954cc52 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -23,6 +23,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
@@ -203,9 +204,14 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept
.createFrameReader(iimd.getFileFormat(), getFileFormatProperties())
.readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols());
- if(iimd.getFileFormat() == FileFormat.CSV)
+ //Delta and CSV discover dimensions (and Delta also schema) at read time, so
+ //refresh the cached metadata to reflect the materialized frame block.
+ if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics());
+ if(iimd.getFileFormat() == FileFormat.DELTA)
+ _schema = data.getSchema();
+ }
// sanity check correct output
if(data == null)
@@ -293,6 +299,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro
FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop);
writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns());
+
+ if(DMLScript.STATISTICS)
+ CacheStatistics.incrementHDFSWrites();
}
@Override
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java
new file mode 100644
index 00000000000..9d5df380552
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+/**
+ * Single-threaded native Delta Lake reader for frames, built on the Spark-free
+ * Delta Kernel library. It opens the latest snapshot of a Delta table, reads
+ * its parquet data files through the kernel's default engine (honoring deletion
+ * vectors), and materializes the columns into a {@link FrameBlock} whose schema
+ * and column names are derived from the Delta table schema.
+ *
+ *
Data is extracted column-at-a-time into primitive arrays (no per-cell
+ * boxing or {@code FrameBlock.set} dispatch) and the frame is constructed
+ * directly from typed column {@link Array}s. Supported column types map to
+ * SystemDS value types: double, float, long, int, short, byte, boolean, and
+ * string. Neither the schema nor the dimensions need to be supplied; they are
+ * discovered from the table.
+ */
+public class FrameReaderDelta extends FrameReader {
+
+ //per-column read codes (how to pull a value out of the Delta column vector);
+ //package visible so the parallel reader can reuse the same dispatch.
+ static final int R_DOUBLE = 0, R_FLOAT = 1, R_LONG = 2, R_INT = 3,
+ R_SHORT = 4, R_BYTE = 5, R_BOOLEAN = 6, R_STRING = 7;
+
+ @Override
+ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
+ throws IOException, DMLRuntimeException
+ {
+ Engine engine = DeltaKernelUtils.createEngine();
+ String tablePath = DeltaKernelUtils.qualify(fname);
+
+ //per-batch, per-column extracted arrays (boxing free)
+ ArrayList batchCols = new ArrayList<>();
+ ArrayList batchSizes = new ArrayList<>();
+ int[] nrowH = new int[1];
+ ValueType[][] vtH = new ValueType[1][];
+ String[][] nameH = new String[1][];
+ int[][] readCodeH = new int[1][];
+
+ DeltaKernelUtils.scan(engine, tablePath, sch -> {
+ int ncol = sch.length();
+ int[] readCode = new int[ncol];
+ ValueType[] vt = new ValueType[ncol];
+ String[] cnames = new String[ncol];
+ for( int c=0; c {
+ int n = DeltaKernelUtils.countSelected(size, selected);
+ Object[] extracted = new Object[ncol];
+ for( int c=0; c[] columns = new Array>[ncol];
+ for( int c=0; c buildColumn(ValueType vt, int nrow, ArrayList batchCols,
+ ArrayList batchSizes, int c)
+ {
+ switch( vt ) {
+ case FP64: {
+ double[] all = new double[nrow];
+ int off = 0;
+ for( int b=0; bIt mirrors {@link ReaderDeltaParallel} (the matrix variant) but produces
+ * typed column {@link Array}s instead of a dense {@code double[]}. As with the
+ * matrix reader, the expensive part of a Delta read is the per-file parquet
+ * decode, so parallelizing across data files is the natural speedup. A table
+ * backed by a single data file cannot be split this way, so the reader
+ * transparently falls back to the sequential {@link FrameReaderDelta}.
+ */
+public class FrameReaderDeltaParallel extends FrameReaderDelta {
+
+ private final int _numThreads;
+
+ public FrameReaderDeltaParallel() {
+ _numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
+ }
+
+ @Override
+ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
+ throws IOException, DMLRuntimeException
+ {
+ Engine engine = DeltaKernelUtils.createEngine();
+ String tablePath = DeltaKernelUtils.qualify(fname);
+ DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath);
+
+ final int nfiles = handle.scanFiles.size();
+ //nothing to gain from parallelism for single-file (or empty) tables
+ if( _numThreads <= 1 || nfiles <= 1 )
+ return super.readFrameFromHDFS(fname, schema, names, rlen, clen);
+
+ //derive per-column read codes, value types and names once from the schema
+ final int ncol = handle.schema.length();
+ final int[] readCodes = new int[ncol];
+ final ValueType[] vt = new ValueType[ncol];
+ final String[] cnames = new String[ncol];
+ for( int c=0; c pre-size
+ //one typed array per column and let each thread decode directly into its
+ //row offset (no intermediate buffers, no serial concatenation).
+ if( useDirectPath(handle) ) {
+ long total = 0;
+ for( long r : handle.numRecords )
+ total += r;
+ if( total > 0 && total <= Integer.MAX_VALUE )
+ return readDirect(fname, handle, ncol, readCodes, vt, cnames, (int) total);
+ }
+
+ return readBuffered(fname, handle, ncol, readCodes, vt, cnames);
+ }
+
+ /**
+ * Whether the metadata-driven direct-write fast path can be used for this
+ * table (exact per-file row counts and no deletion vectors). Visible for
+ * testing: the buffered fallback is otherwise only reachable for tables
+ * lacking row statistics or carrying deletion vectors, which the SystemDS
+ * Delta writer never produces.
+ *
+ * @param handle the opened scan handle
+ * @return true if the direct path is applicable
+ */
+ protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) {
+ return handle.hasExactRowCounts();
+ }
+
+ /**
+ * Fast path: each thread decodes one data file straight into the final typed
+ * column arrays at a metadata-derived row offset. Single allocation per
+ * column, fully parallel.
+ */
+ private FrameBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle,
+ int ncol, int[] readCodes, ValueType[] vt, String[] cnames, int nrow) throws IOException
+ {
+ final int nfiles = handle.scanFiles.size();
+ final int[] rowOffset = new int[nfiles];
+ int acc = 0;
+ for( int i=0; i> tasks = new ArrayList<>(nfiles);
+ for( int i=0; i {
+ int[] cur = new int[] {base};
+ Engine eng = DeltaKernelUtils.createEngine();
+ DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow,
+ (cols, size, selected) -> {
+ for( int c=0; c f : pool.invokeAll(tasks) )
+ f.get();
+ }
+ catch(Exception ex) {
+ throw new IOException("Failed parallel read of Delta table: " + fname, ex);
+ }
+ finally {
+ pool.shutdown();
+ }
+
+ Array>[] columns = new Array>[ncol];
+ for( int c=0; c[] fileCols = new ArrayList[nfiles];
+ @SuppressWarnings("unchecked")
+ final ArrayList[] fileSizes = new ArrayList[nfiles];
+ final ExecutorService pool = CommonThreadPool.get(_numThreads);
+ try {
+ ArrayList> tasks = new ArrayList<>(nfiles);
+ for( int i=0; i {
+ ArrayList bcs = new ArrayList<>();
+ ArrayList bss = new ArrayList<>();
+ Engine eng = DeltaKernelUtils.createEngine();
+ DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow,
+ (cols, size, selected) -> {
+ int n = DeltaKernelUtils.countSelected(size, selected);
+ Object[] extracted = new Object[ncol];
+ for( int c=0; c f : pool.invokeAll(tasks) )
+ f.get();
+ }
+ catch(Exception ex) {
+ throw new IOException("Failed parallel read of Delta table: " + fname, ex);
+ }
+ finally {
+ pool.shutdown();
+ }
+
+ //flatten the per-file batches in file order and concatenate per column
+ ArrayList batchCols = new ArrayList<>();
+ ArrayList batchSizes = new ArrayList<>();
+ int nrow = 0;
+ for( int i=0; i[] columns = new Array>[ncol];
+ for( int c=0; c createColumn(ValueType vt, Object full) {
+ switch( vt ) {
+ case FP64: return ArrayFactory.create((double[]) full);
+ case FP32: return ArrayFactory.create((float[]) full);
+ case INT64: return ArrayFactory.create((long[]) full);
+ case INT32: return ArrayFactory.create((int[]) full);
+ case BOOLEAN: return ArrayFactory.create((boolean[]) full);
+ default: return ArrayFactory.create((String[]) full); // STRING
+ }
+ }
+
+ /**
+ * Decode the live (selected, after deletion vector) rows of one column batch
+ * directly into a pre-sized typed array starting at absolute row {@code destOff}.
+ * Null numeric cells keep the array default (0); string nulls are stored as null.
+ */
+ private static void extractColumnInto(ColumnVector col, int size, boolean[] selected,
+ int readCode, Object dest, int destOff)
+ {
+ switch( readCode ) {
+ case R_DOUBLE: {
+ double[] a = (double[]) dest;
+ int lr = destOff;
+ for( int r=0; r[] cols = new Array>[ncol];
+ boolean[] nullable = new boolean[ncol];
+ for( int c=0; c {
+ private final Array>[] _cols;
+ private final boolean[] _nullable;
+ private final StructType _schema;
+ private final int _nrow;
+ private final int _ncol;
+ private int _pos = 0;
+
+ FrameBatchIterator(Array>[] cols, boolean[] nullable, StructType schema, int nrow, int ncol) {
+ _cols = cols;
+ _nullable = nullable;
+ _schema = schema;
+ _nrow = nrow;
+ _ncol = ncol;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _pos < _nrow;
+ }
+
+ @Override
+ public FilteredColumnarBatch next() {
+ if( !hasNext() )
+ throw new NoSuchElementException();
+ int size = Math.min(BATCH_ROWS, _nrow - _pos);
+ ColumnarBatch batch = new FrameColumnarBatch(_cols, _nullable, _schema, _pos, size, _ncol);
+ _pos += size;
+ return new FilteredColumnarBatch(batch, Optional.empty());
+ }
+
+ @Override
+ public void close() {
+ //nothing to release
+ }
+ }
+
+ /** Read-only view of a row range of the frame columns as a Delta Kernel columnar batch. */
+ private static class FrameColumnarBatch implements ColumnarBatch {
+ private final Array>[] _cols;
+ private final boolean[] _nullable;
+ private final StructType _schema;
+ private final int _rowStart;
+ private final int _size;
+ private final int _ncol;
+
+ FrameColumnarBatch(Array>[] cols, boolean[] nullable, StructType schema, int rowStart, int size, int ncol) {
+ _cols = cols;
+ _nullable = nullable;
+ _schema = schema;
+ _rowStart = rowStart;
+ _size = size;
+ _ncol = ncol;
+ }
+
+ @Override
+ public StructType getSchema() {
+ return _schema;
+ }
+
+ @Override
+ public ColumnVector getColumnVector(int ordinal) {
+ if( ordinal < 0 || ordinal >= _ncol )
+ throw new IndexOutOfBoundsException("column ordinal " + ordinal);
+ return new FrameColumnVector(_cols[ordinal], _nullable[ordinal],
+ _schema.at(ordinal).getDataType(), _rowStart, _size);
+ }
+
+ @Override
+ public int getSize() {
+ return _size;
+ }
+ }
+
+ /**
+ * Read-only typed column view over one column {@link Array} row range. Numeric
+ * values are read through {@link Array#getAsDouble(int)} to avoid boxing, and
+ * non-nullable columns short-circuit {@code isNullAt} so the kernel never pays
+ * for a redundant boxed fetch.
+ */
+ private static class FrameColumnVector implements ColumnVector {
+ private final Array> _col;
+ private final boolean _nullable;
+ private final DataType _type;
+ private final int _rowStart;
+ private final int _size;
+
+ FrameColumnVector(Array> col, boolean nullable, DataType type, int rowStart, int size) {
+ _col = col;
+ _nullable = nullable;
+ _type = type;
+ _rowStart = rowStart;
+ _size = size;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return _type;
+ }
+
+ @Override
+ public int getSize() {
+ return _size;
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return _nullable && _col.get(_rowStart + rowId) == null;
+ }
+
+ @Override
+ public String getString(int rowId) {
+ Object v = _col.get(_rowStart + rowId);
+ return (v == null) ? null : v.toString();
+ }
+
+ @Override
+ public boolean getBoolean(int rowId) {
+ return _col.getAsDouble(_rowStart + rowId) != 0;
+ }
+
+ @Override
+ public double getDouble(int rowId) {
+ return _col.getAsDouble(_rowStart + rowId);
+ }
+
+ @Override
+ public float getFloat(int rowId) {
+ return (float) _col.getAsDouble(_rowStart + rowId);
+ }
+
+ @Override
+ public long getLong(int rowId) {
+ //exact for INT64 (getAsDouble would lose precision beyond 2^53)
+ return ((Number) _col.get(_rowStart + rowId)).longValue();
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ return (int) _col.getAsDouble(_rowStart + rowId);
+ }
+
+ @Override
+ public void close() {
+ //nothing to release
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java
index 3fb3968c96f..ff38eb395dd 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java
@@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties
return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock();
case PROTO:
return new FrameWriterProto();
+ case DELTA:
+ return new FrameWriterDelta();
default:
throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString());
}
diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java
new file mode 100644
index 00000000000..1c1d33a80d6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.CompilerConfig;
+import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.DeltaKernelUtils;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderDelta;
+import org.apache.sysds.runtime.io.FrameReaderDeltaParallel;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.io.FrameWriterDelta;
+import org.junit.Test;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+/**
+ * Direct (no DML) round-trip tests for the native Delta Kernel based frame
+ * reader/writer. Each test writes a FrameBlock to a fresh local Delta table
+ * directory and reads it back, asserting the discovered schema, column names,
+ * dimensions, and per-cell values match. Several tests additionally assert that
+ * the parallel reader ({@link FrameReaderDeltaParallel}) agrees with the serial
+ * reader cell-for-cell across a multi-file table (both its direct and buffered
+ * paths).
+ */
+public class DeltaFrameReadWriteTest {
+
+ //nonsense schema/dims handed to the reader to confirm it discovers everything
+ private static final ValueType[] NO_SCHEMA = new ValueType[] {ValueType.STRING};
+ private static final String[] NO_NAMES = new String[] {"x"};
+
+ //small target file size + enough random rows so the writer rolls multiple
+ //data files, exercising the per-file parallel read path rather than the
+ //single-file serial fallback.
+ private static final long SMALL_TARGET_FILE_SIZE = 512L * 1024;
+ private static final int ROWS_MULTI_FILE = 150_000;
+
+ private static FrameBlock writeThenRead(FrameBlock in) throws Exception {
+ Path dir = Files.createTempDirectory("sysds_delta_frame_");
+ String tablePath = new File(dir.toFile(), "table").getAbsolutePath();
+ try {
+ new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns());
+ //pass nonsense schema/dims: the reader must discover everything from the table
+ return new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1);
+ }
+ finally {
+ FileUtils.deleteQuietly(dir.toFile());
+ }
+ }
+
+ private static FrameBlock alloc(ValueType[] schema, String[] names, int nrow) {
+ FrameBlock fb = new FrameBlock(schema, names);
+ fb.ensureAllocatedColumns(nrow);
+ return fb;
+ }
+
+ @Test
+ public void roundTripMixedTypes() throws Exception {
+ ValueType[] schema = {ValueType.STRING, ValueType.INT64, ValueType.FP64,
+ ValueType.BOOLEAN, ValueType.INT32, ValueType.FP32};
+ String[] names = {"name", "id", "score", "active", "count", "ratio"};
+ int nrow = 5;
+ FrameBlock in = alloc(schema, names, nrow);
+ for( int r=0; r readBuffered()
+ FrameBlock buffered = new FrameReaderDeltaParallel() {
+ @Override protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { return false; }
+ }.readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1);
+
+ assertFramesEqual(serial, buffered);
+ }
+ finally {
+ ConfigurationManager.clearLocalConfigs();
+ FileUtils.deleteQuietly(dir.toFile());
+ }
+ }
+
+ @Test
+ public void factoryRoutesDeltaToParallelWhenEnabled() {
+ //the factory must pick the parallel frame reader iff parallel CP read is enabled
+ CompilerConfig cc = ConfigurationManager.getCompilerConfig();
+ try {
+ cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true);
+ ConfigurationManager.setLocalConfig(cc);
+ FrameReader par = FrameReaderFactory.createFrameReader(FileFormat.DELTA);
+ assertTrue("expected FrameReaderDeltaParallel when parallel read enabled",
+ par instanceof FrameReaderDeltaParallel);
+
+ cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false);
+ ConfigurationManager.setLocalConfig(cc);
+ FrameReader ser = FrameReaderFactory.createFrameReader(FileFormat.DELTA);
+ assertTrue("expected serial FrameReaderDelta when parallel read disabled",
+ ser instanceof FrameReaderDelta && !(ser instanceof FrameReaderDeltaParallel));
+ }
+ finally {
+ ConfigurationManager.clearLocalConfigs();
+ }
+ }
+
+ @Test
+ public void readerBatchSizeConfigRoundTrips() throws Exception {
+ //a non-default reader batch size must not change the result (more, smaller
+ //batches exercise the per-batch extract/concatenate loop more often).
+ DMLConfig conf = new DMLConfig();
+ conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128");
+ ConfigurationManager.setLocalConfig(conf);
+ Path dir = Files.createTempDirectory("sysds_delta_frame_bs_");
+ String tablePath = new File(dir.toFile(), "table").getAbsolutePath();
+ try {
+ assertEquals("config getter reflects the override",
+ 128, ConfigurationManager.getDeltaReaderBatchSize());
+
+ FrameBlock in = genMixedFrame(5000, 31);
+ new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns());
+ FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1);
+ assertFramesEqual(in, out);
+ }
+ finally {
+ ConfigurationManager.clearLocalConfigs();
+ FileUtils.deleteQuietly(dir.toFile());
+ }
+ }
+
+ @Test
+ public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception {
+ //a smaller configured target file size must make the writer roll more
+ //data files for the same frame (the lever the parallel reader relies on).
+ DMLConfig conf = new DMLConfig();
+ conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE));
+ ConfigurationManager.setLocalConfig(conf);
+ Path dir = Files.createTempDirectory("sysds_delta_frame_cfg_");
+ String tablePath = new File(dir.toFile(), "table").getAbsolutePath();
+ try {
+ assertEquals("config getter reflects the override",
+ SMALL_TARGET_FILE_SIZE, ConfigurationManager.getDeltaWriterTargetFileSize());
+
+ FrameBlock in = genMixedFrame(ROWS_MULTI_FILE, 41);
+ new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns());
+ assertMultiFile(tablePath);
+
+ //data still round-trips correctly with the custom layout
+ FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1);
+ assertFramesEqual(in, out);
+ }
+ finally {
+ ConfigurationManager.clearLocalConfigs();
+ FileUtils.deleteQuietly(dir.toFile());
+ }
+ }
+
+ @Test
+ public void emptyFrameRoundTrip() throws Exception {
+ //a schema-only Delta table (no data files, 0 rows); the reader must
+ //rebuild empty typed columns and discover the schema/names from the table.
+ ValueType[] schema = {ValueType.STRING, ValueType.FP64, ValueType.INT64};
+ String[] names = {"s", "d", "k"};
+ DataType[] dtypes = {StringType.STRING, DoubleType.DOUBLE, LongType.LONG};
+
+ Path dir = Files.createTempDirectory("sysds_delta_frame_empty_");
+ String tablePath = new File(dir.toFile(), "table").getAbsolutePath();
+ try {
+ writeEmptyTable(tablePath, names, dtypes);
+ FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1);
+ assertEquals("rows", 0, out.getNumRows());
+ assertEquals("cols", schema.length, out.getNumColumns());
+ for( int c=0; c writer must reject
+ new FrameWriterDelta().writeFrameToHDFS(fb, tablePath,
+ fb.getNumRows() + 1, fb.getNumColumns());
+ fail("expected an IOException for a frame/metadata dimension mismatch");
+ }
+ catch(IOException ex) {
+ assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(),
+ ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch"));
+ }
+ finally {
+ FileUtils.deleteQuietly(dir.toFile());
+ }
+ }
+
+ @Test
+ public void readFromInputStreamUnsupported() throws Exception {
+ //Delta is a directory-based table format; stream reads are not supported
+ try {
+ new FrameReaderDelta().readFrameFromInputStream(null, NO_SCHEMA, NO_NAMES, -1, -1);
+ fail("expected UnsupportedOperationException for a Delta input-stream read");
+ }
+ catch(UnsupportedOperationException ex) {
+ //expected: must throw before touching the (null) stream
+ }
+ }
+
+ @Test
+ public void parallelReadStringNullsMatchSerialMultiFile() throws Exception {
+ //string nulls across a multi-file table: the parallel direct path must
+ //reproduce the serial read cell-for-cell (assertFramesEqual uses
+ //Objects.equals, so nulls are compared faithfully).
+ DMLConfig conf = new DMLConfig();
+ conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE));
+ ConfigurationManager.setLocalConfig(conf);
+ Path dir = Files.createTempDirectory("sysds_delta_frame_parnull_");
+ String tablePath = new File(dir.toFile(), "table").getAbsolutePath();
+ try {
+ ValueType[] schema = {ValueType.STRING, ValueType.INT64};
+ String[] names = {"s", "k"};
+ int nrow = ROWS_MULTI_FILE;
+ FrameBlock in = alloc(schema, names, nrow);
+ for( int r=0; r s = Files.walk(new File(tablePath).toPath()) ) {
+ files = s.filter(p -> p.toString().endsWith(".parquet")).count();
+ }
+ assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files,
+ files > 1);
+ }
+
+ private static void assertFramesEqual(FrameBlock expected, FrameBlock actual) {
+ assertEquals("rows", expected.getNumRows(), actual.getNumRows());
+ assertEquals("cols", expected.getNumColumns(), actual.getNumColumns());
+ int ncol = expected.getNumColumns();
+ for( int c=0; c empty() {
+ return new CloseableIterator() {
+ @Override public boolean hasNext() { return false; }
+ @Override public FilteredColumnarBatch next() { throw new NoSuchElementException(); }
+ @Override public void close() {}
+ };
+ }
+
+ /** Writes a single date column (kernel stores dates as INT32 days) used to
+ * assert the frame reader rejects a non-mappable column type. */
+ private static void writeDateColumn(String tablePath, int[] days) throws Exception {
+ Engine engine = DeltaKernelUtils.createEngine();
+ final StructType schema = new StructType().add("d", DateType.DATE, false);
+ ColumnarBatch batch = new ColumnarBatch() {
+ @Override public StructType getSchema() { return schema; }
+ @Override public int getSize() { return days.length; }
+ @Override public ColumnVector getColumnVector(int ordinal) { return new DateVector(days); }
+ };
+ FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty());
+ DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb));
+ }
+
+ /** Writes a short column and a byte column (kernel stores these as 16/8-bit
+ * integers) used to assert the frame reader coerces both to INT32. */
+ private static void writeShortByteColumns(String tablePath, short[] shorts, byte[] bytes) throws Exception {
+ Engine engine = DeltaKernelUtils.createEngine();
+ final StructType schema = new StructType()
+ .add("sh", ShortType.SHORT, false)
+ .add("by", ByteType.BYTE, false);
+ ColumnarBatch batch = new ColumnarBatch() {
+ @Override public StructType getSchema() { return schema; }
+ @Override public int getSize() { return shorts.length; }
+ @Override public ColumnVector getColumnVector(int ordinal) {
+ return (ordinal == 0) ? new ShortVector(shorts) : new ByteVector(bytes);
+ }
+ };
+ FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty());
+ DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb));
+ }
+
+ private static CloseableIterator singleton(FilteredColumnarBatch fcb) {
+ return new CloseableIterator() {
+ private boolean _done = false;
+ @Override public boolean hasNext() { return !_done; }
+ @Override public FilteredColumnarBatch next() {
+ if( _done ) throw new NoSuchElementException();
+ _done = true;
+ return fcb;
+ }
+ @Override public void close() {}
+ };
+ }
+
+ /** Column view exposing an int[] as a Delta date column. */
+ private static class DateVector implements ColumnVector {
+ private final int[] _days;
+ DateVector(int[] days) { _days = days; }
+ @Override public DataType getDataType() { return DateType.DATE; }
+ @Override public int getSize() { return _days.length; }
+ @Override public boolean isNullAt(int rowId) { return false; }
+ @Override public int getInt(int rowId) { return _days[rowId]; }
+ @Override public void close() {}
+ }
+
+ /** Column view exposing a short[] as a Delta short column. */
+ private static class ShortVector implements ColumnVector {
+ private final short[] _vals;
+ ShortVector(short[] vals) { _vals = vals; }
+ @Override public DataType getDataType() { return ShortType.SHORT; }
+ @Override public int getSize() { return _vals.length; }
+ @Override public boolean isNullAt(int rowId) { return false; }
+ @Override public short getShort(int rowId) { return _vals[rowId]; }
+ @Override public void close() {}
+ }
+
+ /** Column view exposing a byte[] as a Delta byte column. */
+ private static class ByteVector implements ColumnVector {
+ private final byte[] _vals;
+ ByteVector(byte[] vals) { _vals = vals; }
+ @Override public DataType getDataType() { return ByteType.BYTE; }
+ @Override public int getSize() { return _vals.length; }
+ @Override public boolean isNullAt(int rowId) { return false; }
+ @Override public byte getByte(int rowId) { return _vals[rowId]; }
+ @Override public void close() {}
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java
new file mode 100644
index 00000000000..0a6bba5a163
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+/**
+ * End-to-end DML test of the native Delta frame read/write path.
+ *
+ * As in the matrix variant, the write and the read run as two separate
+ * SystemDS executions so the read is a genuine disk read rather than an
+ * in-memory cache hit. We additionally assert via {@link CacheStatistics} that
+ * the write run wrote (delta + text reference) and the read run read (delta +
+ * text reference) from HDFS, so a short-circuited path would fail the test.
+ */
+public class FrameDeltaReadWriteTest extends AutomatedTestBase {
+
+ private final static String TEST_DIR = "functions/io/delta/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + FrameDeltaReadWriteTest.class.getSimpleName() + "/";
+ private final static String WRITE_NAME = "FrameDeltaWrite";
+ private final static String READ_NAME = "FrameDeltaReadCompare";
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(WRITE_NAME,
+ new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] { "ref" }));
+ addTestConfiguration(READ_NAME,
+ new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] { "R" }));
+ }
+
+ @Test
+ public void testDenseRoundTrip() {
+ runFrameDeltaRoundTrip(200, 12, 1.0);
+ }
+
+ @Test
+ public void testSparseRoundTrip() {
+ runFrameDeltaRoundTrip(640, 8, 0.2);
+ }
+
+ @Test
+ public void testMultiBatchRoundTrip() {
+ runFrameDeltaRoundTrip(9000, 4, 1.0);
+ }
+
+ private void runFrameDeltaRoundTrip(int rows, int cols, double sparsity) {
+ try {
+ String HOME = SCRIPT_DIR + TEST_DIR;
+
+ // ---- phase 1: write the frame as a Delta table + text reference ----
+ getAndLoadTestConfiguration(WRITE_NAME);
+ String deltaPath = output("deltaTable");
+ String refPath = output("ref");
+ fullDMLScriptName = HOME + WRITE_NAME + ".dml";
+ programArgs = new String[] { "-stats", "-args",
+ String.valueOf(rows), String.valueOf(cols), String.valueOf(sparsity),
+ deltaPath, refPath };
+ runTest(true, false, null, -1);
+
+ //the write run must materialize two objects to disk: the frame Delta
+ //table under test + the matrix text reference. FrameWriterDelta genuinely
+ //hitting HDFS is what produces the frame-side write statistic.
+ long hdfsWrites = CacheStatistics.getHDFSWrites();
+ assertTrue("expected >= 2 HDFS writes in the write run (delta frame + reference), got "
+ + hdfsWrites, hdfsWrites >= 2);
+ //and a real Delta table (transaction log) must have been created
+ assertTrue("missing Delta transaction log under " + deltaPath,
+ new File(deltaPath, "_delta_log").isDirectory());
+
+ // ---- phase 2: fresh execution reads the Delta frame and compares ----
+ getAndLoadTestConfiguration(READ_NAME);
+ fullDMLScriptName = HOME + READ_NAME + ".dml";
+ programArgs = new String[] { "-stats", "-args",
+ deltaPath, refPath, output("R") };
+ runTest(true, false, null, -1);
+
+ long hdfsReads = CacheStatistics.getHDFSHits();
+ assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got "
+ + hdfsReads, hdfsReads >= 2);
+
+ HashMap R = readDMLMatrixFromOutputDir("R");
+ double diff = R.getOrDefault(new CellIndex(1, 1), 0.0);
+ double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0);
+ double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0);
+
+ assertEquals("reconstruction error", 0.0, diff, 1e-12);
+ assertEquals("discovered rows", rows, (int) nrow);
+ assertEquals("discovered cols", cols, (int) ncol);
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
diff --git a/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml
new file mode 100644
index 00000000000..cdf1f0794fc
--- /dev/null
+++ b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Reader side of the native Delta frame round-trip test. Reads the Delta table
+# as a frame (schema + dimensions discovered from the transaction log) and the
+# text matrix reference, both genuine HDFS reads in a fresh process, then
+# reports the elementwise reconstruction error and the discovered dimensions.
+
+Y = read($1, data_type="frame", format="delta")
+Xref = read($2, format="text")
+
+M = as.matrix(Y)
+R = matrix(0, rows=1, cols=3)
+R[1,1] = sum(abs(Xref - M)) # 0 if FrameReaderDelta reconstructed the frame exactly
+R[1,2] = nrow(Y) # discovered row count
+R[1,3] = ncol(Y) # discovered column count
+write(R, $3)
diff --git a/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml
new file mode 100644
index 00000000000..5e152dde013
--- /dev/null
+++ b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Writer side of the native Delta frame round-trip test. Generates a matrix,
+# converts it to a frame, and materializes it as a Delta table (under test).
+# The same matrix is also written as a plain text reference. Running the
+# read/compare in a SEPARATE process prevents SystemDS from short-circuiting
+# the subsequent read against the in-memory frame, so FrameReaderDelta is
+# actually exercised.
+
+X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3)
+F = as.frame(X)
+write(F, $4, format="delta")
+write(X, $5, format="text")