Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/javaTests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
"org.apache.sysds.test.applications.**",
"**.test.usertest.**",
"**.component.c**.**",
"**.component.e**.**,**.component.f**.**,**.component.m**.**,**.component.o**.**",
"**.component.e**.**,**.component.f**.**,**.component.i**.**,**.component.m**.**,**.component.o**.**",
"**.component.p**.**,**.component.r**.**,**.component.s**.**,**.component.t**.**,**.component.u**.**",
"**.functions.a**.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**",
"**.functions.blocks.**,**.functions.data.rand.**,",
Expand Down
44 changes: 44 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
<jackson.version>2.15.4</jackson.version>
<scala.version>2.12.18</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<delta-kernel.version>3.3.2</delta-kernel.version>
<parquet.version>1.13.1</parquet.version>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss z</maven.build.timestamp.format>
<project.build.outputTimestamp>1</project.build.outputTimestamp>
<enableGPU>false</enableGPU>
Expand Down Expand Up @@ -968,6 +970,48 @@
</profiles>

<dependencies>
<!-- Delta Lake native (Spark-free) read/write via Delta Kernel -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<!-- Delta Kernel needs parquet >= 1.13 (the version Spark 3.5.x ships);
declared directly via ${parquet.version} so Maven's nearest-wins
mediation does not downgrade it to an older transitive version. The
jackson trio Delta Kernel relies on is pinned globally via
${jackson.version}. -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<!-- Delta's Spark connector, test-only: an independent (reference) Delta engine
used purely to cross-check that tables our Delta Kernel writer produces are
readable by Spark/Delta, and that tables Spark/Delta writes (including
deletion vectors and multiple commits) are readable by our reader. Delta
3.3.x targets Spark 3.5.x, matching ${spark.version}. -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>${delta-kernel.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jcuda</groupId>
<artifactId>jcuda</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/apache/sysds/common/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,14 +878,15 @@ public enum FileFormat {
HDF5, // Hierarchical Data Format (HDF)
COG, // Cloud-optimized GeoTIFF
PARQUET, // parquet format for columnar data storage
DELTA, // Delta Lake table (transaction log + parquet), read/written via Delta Kernel
UNKNOWN;

public boolean isIJV() {
return this == TEXT || this == MM;
}

public boolean isTextFormat() {
return this != BINARY && this != COMPRESSED;
return this != BINARY && this != COMPRESSED && this != DELTA;
}

public static boolean isTextFormat(String fmt) {
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/apache/sysds/conf/ConfigurationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ public static int getFederatedTimeout(){
return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT);
}

/** @return rows per parquet read batch for the native Delta reader */
public static int getDeltaReaderBatchSize() {
return getDMLConfig().getIntValue(DMLConfig.DELTA_READER_BATCH_SIZE);
}

/** @return matrix rows materialized per columnar batch for the native Delta writer */
public static int getDeltaWriterBatchSize() {
return getDMLConfig().getIntValue(DMLConfig.DELTA_WRITER_BATCH_SIZE);
}

/** @return target data-file size (bytes) for the native Delta writer */
public static long getDeltaWriterTargetFileSize() {
return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE));
}

public static boolean isFederatedSSL(){
return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION);
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class DMLConfig
public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io";
public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding";
public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch
public static final String DELTA_WRITER_BATCH_SIZE = "sysds.io.delta.writer.batchsize"; // int: matrix rows materialized per columnar batch handed to the engine
public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput)
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged";
public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks";
Expand Down Expand Up @@ -158,6 +161,9 @@ public class DMLConfig
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
_defaultVals.put(IO_COMPRESSION_CODEC, "none");
_defaultVals.put(DELTA_READER_BATCH_SIZE, "4096"); // rows per parquet read batch (Delta Kernel default 1024)
_defaultVals.put(DELTA_WRITER_BATCH_SIZE, "4096"); // matrix rows materialized per columnar batch handed to the engine
_defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB target data-file size (Delta Kernel default 128MB) -> more files -> more parallel-read throughput
_defaultVals.put(PARALLEL_TOKENIZE, "false");
_defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64");
_defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false");
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/apache/sysds/parser/DMLTranslator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,8 @@ public void constructHops(StatementBlock sb) {
case CSV:
case LIBSVM:
case HDF5:
// write output in textcell format
case DELTA:
// columnar/text formats: no block layout (blocksize -1)
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
break;
case BINARY:
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/org/apache/sysds/parser/DataExpression.java
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ else if( getVarParam(READNNZPARAM) != null ) {

boolean isCOG = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.COG.toString()));

// Delta tables are self-describing (schema + dimensions discovered from the
// transaction log at read time), so dimensions are optional like CSV.
boolean isDelta = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.DELTA.toString()));

dataTypeString = (getVarParam(DATATYPEPARAM) == null) ? null : getVarParam(DATATYPEPARAM).toString();

if ( dataTypeString == null || dataTypeString.equalsIgnoreCase(Statement.MATRIX_DATA_TYPE)
Expand All @@ -1202,8 +1206,8 @@ else if( getVarParam(READNNZPARAM) != null ) {
// initialize size of target data identifier to UNKNOWN
getOutput().setDimensions(-1, -1);

if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && ConfigurationManager.getCompilerConfig()
.getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm format / jmlc api
if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && !isDelta && ConfigurationManager.getCompilerConfig()
.getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm/delta format / jmlc api
&& (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) {
raiseValidateError("Missing or incomplete dimension information in read statement: "
+ mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
Expand All @@ -1215,7 +1219,7 @@ && getVarParam(READCOLPARAM) instanceof ConstIdentifier)
// these are strings that are long values
Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString());
Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString());
if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager
if ( !isCSV && !isDelta && (dim1 < 0 || dim2 < 0) && ConfigurationManager
.getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) {
raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
Expand Down Expand Up @@ -1333,7 +1337,8 @@ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name()))
}

//validate read filename
if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString())
|| checkFormatType(FileFormat.DELTA)) //delta: columnar, no block layout
getOutput().setBlocksize(-1);
else if (checkFormatType(FileFormat.BINARY, FileFormat.COMPRESSED, FileFormat.UNKNOWN)) {
if( getVarParam(ROWBLOCKCOUNTPARAM)!=null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ protected MatrixBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcep
DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(),
rlen, clen, blen, mc.getNonZeros(), getFileFormatProperties());

if(iimd.getFileFormat() == FileFormat.CSV) {
if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
//dimensions/nnz are discovered at read time for these self-describing formats
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(newData.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(newData.getDataCharacteristics());
}
Expand Down
Loading
Loading