Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
Expand Down Expand Up @@ -203,9 +204,14 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept
.createFrameReader(iimd.getFileFormat(), getFileFormatProperties())
.readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols());

if(iimd.getFileFormat() == FileFormat.CSV)
//Delta and CSV discover dimensions (and Delta also schema) at read time, so
//refresh the cached metadata to reflect the materialized frame block.
if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics());
if(iimd.getFileFormat() == FileFormat.DELTA)
_schema = data.getSchema();
}

// sanity check correct output
if(data == null)
Expand Down Expand Up @@ -293,6 +299,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro

FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop);
writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns());

if(DMLScript.STATISTICS)
CacheStatistics.incrementHDFSWrites();
}

@Override
Expand Down
309 changes: 309 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
/**
* Single-threaded native Delta Lake reader for frames, built on the Spark-free
* Delta Kernel library. It opens the latest snapshot of a Delta table, reads
* its parquet data files through the kernel's default engine (honoring deletion
* vectors), and materializes the columns into a {@link FrameBlock} whose schema
* and column names are derived from the Delta table schema.
*
* <p>Data is extracted column-at-a-time into primitive arrays (no per-cell
* boxing or {@code FrameBlock.set} dispatch) and the frame is constructed
* directly from typed column {@link Array}s. Supported column types map to
* SystemDS value types: double, float, long, int, short, byte, boolean, and
* string. Neither the schema nor the dimensions need to be supplied; they are
* discovered from the table.</p>
*/
public class FrameReaderDelta extends FrameReader {

//per-column read codes (how to pull a value out of the Delta column vector);
//package visible so the parallel reader can reuse the same dispatch.
static final int R_DOUBLE = 0, R_FLOAT = 1, R_LONG = 2, R_INT = 3,
R_SHORT = 4, R_BYTE = 5, R_BOOLEAN = 6, R_STRING = 7;

@Override
public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException
{
Engine engine = DeltaKernelUtils.createEngine();
String tablePath = DeltaKernelUtils.qualify(fname);

//per-batch, per-column extracted arrays (boxing free)
ArrayList<Object[]> batchCols = new ArrayList<>();
ArrayList<Integer> batchSizes = new ArrayList<>();
int[] nrowH = new int[1];
ValueType[][] vtH = new ValueType[1][];
String[][] nameH = new String[1][];
int[][] readCodeH = new int[1][];

DeltaKernelUtils.scan(engine, tablePath, sch -> {
int ncol = sch.length();
int[] readCode = new int[ncol];
ValueType[] vt = new ValueType[ncol];
String[] cnames = new String[ncol];
for( int c=0; c<ncol; c++ ) {
DataType dt = sch.at(c).getDataType();
readCode[c] = readCode(dt, sch.at(c).getName());
vt[c] = valueType(readCode[c]);
cnames[c] = sch.at(c).getName();
}
vtH[0] = vt;
nameH[0] = cnames;
readCodeH[0] = readCode;
return (cols, size, selected) -> {
int n = DeltaKernelUtils.countSelected(size, selected);
Object[] extracted = new Object[ncol];
for( int c=0; c<ncol; c++ )
extracted[c] = extractColumn(cols[c], size, selected, n, readCode[c]);
batchCols.add(extracted);
batchSizes.add(n);
nrowH[0] += n;
};
});

ValueType[] vt = vtH[0];
String[] names2 = nameH[0];
int ncol = vt.length;
int nrow = nrowH[0];

//empty table: the typed column arrays cannot be zero-length, so return a
//schema-only frame with the discovered schema/names and zero rows.
if( nrow == 0 )
return new FrameBlock(vt, names2, 0);

//concatenate the per-batch column arrays into one typed array per column
Array<?>[] columns = new Array<?>[ncol];
for( int c=0; c<ncol; c++ )
columns[c] = buildColumn(vt[c], nrow, batchCols, batchSizes, c);

FrameBlock ret = new FrameBlock(columns);
ret.setColumnNames(names2);
return ret;
}

static Object extractColumn(ColumnVector col, int size, boolean[] selected, int n, int readCode) {
switch( readCode ) {
case R_DOUBLE: {
double[] a = new double[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getDouble(r);
lr++;
}
return a;
}
case R_FLOAT: {
float[] a = new float[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getFloat(r);
lr++;
}
return a;
}
case R_LONG: {
long[] a = new long[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getLong(r);
lr++;
}
return a;
}
case R_INT: {
int[] a = new int[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getInt(r);
lr++;
}
return a;
}
case R_SHORT: {
int[] a = new int[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getShort(r);
lr++;
}
return a;
}
case R_BYTE: {
int[] a = new int[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getByte(r);
lr++;
}
return a;
}
case R_BOOLEAN: {
boolean[] a = new boolean[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
if( !col.isNullAt(r) ) a[lr] = col.getBoolean(r);
lr++;
}
return a;
}
default: { // R_STRING
String[] a = new String[n];
int lr = 0;
for( int r=0; r<size; r++ ) {
if( selected != null && !selected[r] ) continue;
a[lr] = col.isNullAt(r) ? null : col.getString(r);
lr++;
}
return a;
}
}
}

static Array<?> buildColumn(ValueType vt, int nrow, ArrayList<Object[]> batchCols,
ArrayList<Integer> batchSizes, int c)
{
switch( vt ) {
case FP64: {
double[] all = new double[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
case FP32: {
float[] all = new float[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
case INT64: {
long[] all = new long[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
case INT32: {
int[] all = new int[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
case BOOLEAN: {
boolean[] all = new boolean[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
default: { // STRING
String[] all = new String[nrow];
int off = 0;
for( int b=0; b<batchCols.size(); b++ ) {
int n = batchSizes.get(b);
System.arraycopy(batchCols.get(b)[c], 0, all, off, n);
off += n;
}
return ArrayFactory.create(all);
}
}
}

static int readCode(DataType dt, String name) {
if( dt instanceof StringType ) return R_STRING;
if( dt instanceof DoubleType ) return R_DOUBLE;
if( dt instanceof FloatType ) return R_FLOAT;
if( dt instanceof LongType ) return R_LONG;
if( dt instanceof IntegerType ) return R_INT;
if( dt instanceof ShortType ) return R_SHORT;
if( dt instanceof ByteType ) return R_BYTE;
if( dt instanceof BooleanType ) return R_BOOLEAN;
throw new DMLRuntimeException("Unsupported non-mappable Delta column '" + name
+ "' of type " + dt + " for frame read.");
}

static ValueType valueType(int readCode) {
switch( readCode ) {
case R_DOUBLE: return ValueType.FP64;
case R_FLOAT: return ValueType.FP32;
case R_LONG: return ValueType.INT64;
case R_INT:
case R_SHORT:
case R_BYTE: return ValueType.INT32;
case R_BOOLEAN: return ValueType.BOOLEAN;
default: return ValueType.STRING;
}
}

@Override
public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException
{
throw new UnsupportedOperationException(
"Reading a Delta table from an input stream is not supported; Delta is a directory-based table format.");
}
}
Loading
Loading