diff --git a/README.md b/README.md index 0f948786165..8248a8a7e78 100644 --- a/README.md +++ b/README.md @@ -45,3 +45,226 @@ To build from source visit [SystemDS Install from source](https://apache.github. [![Python Test](https://github.com/apache/systemds/actions/workflows/python.yml/badge.svg?branch=main)](https://github.com/apache/systemds/actions/workflows/python.yml) [![Total PyPI downloads](https://static.pepy.tech/personalized-badge/systemds?units=abbreviation&period=total&left_color=grey&right_color=blue&left_text=Total%20PyPI%20Downloads)](https://pepy.tech/project/systemds) [![Monthly PyPI downloads](https://static.pepy.tech/personalized-badge/systemds?units=abbreviation&left_color=grey&right_color=blue&left_text=Monthly%20PyPI%20Downloads)](https://pepy.tech/project/systemds) + + +## Term Project Plan: OOC Operator Extensions + + +### Project Summary + +This term project extends Apache SystemDS Out-of-Core (OOC) execution support for two operators: + +1. `cov(A, B)` and `cov(A, B, W)` covariance operations. +2. `TSMMOOCInstruction`, the specialized transposed self matrix multiplication instruction used for both `t(X) %*% X` and `X %*% t(X)`. + +The project originally had a six-week implementation plan, but the schedule is now compressed to four active weeks. The revised scope emphasizes test-first development, includes both TSMM directions, removes Spark benchmarking from scope, includes performance tests in the PR, and opens the draft PR early enough for external feedback. + +### Team Roles + +| Role | Main Responsibility | +| --- | --- | +| Person A | Covariance OOC implementation lead | +| Person B | TSMM OOC implementation lead | +| Person C | Testing, benchmarking, integration, and PR coordination lead | + +All team members should review each other's code weekly. Each major task should begin with a failing correctness test and should be developed in small commits. 
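To illustrate the test-first rule for the covariance task, the following standalone sketch checks a block-wise covariance against a two-pass reference. It is an illustration under assumptions, not SystemDS code: `CovSketch`, `Partial`, `ofBlock`, and `merge` are hypothetical names, and the sample covariance with an `n - 1` denominator is assumed. The real implementation is expected to rely on `CmCovObject` and the `COV` function object instead.

```java
import java.util.Random;

final class CovSketch {
    // Hypothetical per-block partial: row count, both means, and the
    // co-moment sum((a - meanA) * (b - meanB)) over the block.
    static final class Partial {
        final long n;
        final double meanA;
        final double meanB;
        final double c2;

        Partial(long n, double meanA, double meanB, double c2) {
            this.n = n;
            this.meanA = meanA;
            this.meanB = meanB;
            this.c2 = c2;
        }
    }

    // Two-pass partial for the rows in [from, to).
    static Partial ofBlock(double[] a, double[] b, int from, int to) {
        long n = to - from;
        double sumA = 0, sumB = 0;
        for (int i = from; i < to; i++) {
            sumA += a[i];
            sumB += b[i];
        }
        double meanA = sumA / n, meanB = sumB / n, c2 = 0;
        for (int i = from; i < to; i++)
            c2 += (a[i] - meanA) * (b[i] - meanB);
        return new Partial(n, meanA, meanB, c2);
    }

    // Pairwise merge of two partials, so blocks can be reduced one by one.
    static Partial merge(Partial x, Partial y) {
        long n = x.n + y.n;
        double dA = y.meanA - x.meanA;
        double dB = y.meanB - x.meanB;
        double meanA = x.meanA + dA * y.n / n;
        double meanB = x.meanB + dB * y.n / n;
        double c2 = x.c2 + y.c2 + dA * dB * ((double) x.n * y.n / n);
        return new Partial(n, meanA, meanB, c2);
    }

    // Block-wise covariance: one partial per block, merged in arrival order.
    static double blockwiseCov(double[] a, double[] b, int blockSize) {
        if (a.length != b.length || a.length < 2 || blockSize < 1)
            throw new IllegalArgumentException("need equal lengths >= 2 and blockSize >= 1");
        Partial agg = null;
        for (int from = 0; from < a.length; from += blockSize) {
            Partial p = ofBlock(a, b, from, Math.min(a.length, from + blockSize));
            agg = (agg == null) ? p : merge(agg, p);
        }
        return agg.c2 / (agg.n - 1);
    }

    // Reference result computed over the whole input at once.
    static double referenceCov(double[] a, double[] b) {
        Partial p = ofBlock(a, b, 0, a.length);
        return p.c2 / (p.n - 1);
    }

    public static void main(String[] args) {
        Random rnd = new Random(7);
        double[] a = new double[10_000];
        double[] b = new double[10_000];
        for (int i = 0; i < a.length; i++) {
            a[i] = rnd.nextDouble();
            b[i] = 0.5 * a[i] + rnd.nextDouble();
        }
        double ref = referenceCov(a, b);
        double blk = blockwiseCov(a, b, 1000);
        System.out.println("reference=" + ref + " blockwise=" + blk);
        if (Math.abs(ref - blk) > 1e-9)
            throw new AssertionError("block-wise result deviates from reference");
    }
}
```

In the actual test classes the reference side would be the CP result and the block-wise side the OOC result; the comparison with a small tolerance stays the same.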
+ + +### Four-Week Deliverables by Person + +| Week | Person A: Covariance Lead | Person B: TSMM Lead | Person C: Testing, Benchmarking, and PR Coordination | +| --- | --- | --- | --- | +| Week 1 | Write failing OOC tests for `cov(A, B)`; study CP/Spark covariance and OOC central moment references; start `CovarianceOOCInstruction.java`; wire initial parser support. | Write failing multi-tile tests for both `t(X) %*% X` and `X %*% t(X)`; study `TSMMOOCInstruction`, `MMultOOCInstruction`, and `Tsmm2SPInstruction`; document LEFT/RIGHT output indexing. | Verify setup and code style; add first OOC covariance DML script; define CP-vs-OOC benchmark matrix sizes, sparsities, and block sizes; verify all new tests fail for the expected reason. | +| Week 2 | Complete unweighted covariance; implement weighted `cov(A, B, W)`; add stream joins, `CmCovObject` reduction, dimension checks, and block-size validation. | Remove the single-output-block limitation; produce diagonal and off-diagonal output tiles; implement LEFT and RIGHT TSMM indexing prototype. | Add weighted covariance DML/test files; compare covariance OOC against CP; create performance test skeleton in the existing Java performance test area. | +| Week 3 | Stabilize covariance; add dense/sparse, weighted/unweighted, and error-handling tests; verify covariance OOC heavy hitters; review TSMM implementation. | Complete LEFT and RIGHT multi-block TSMM; verify output metadata and symmetric off-diagonal tiles; review covariance implementation. | Extend `TransposeSelfMMTest`; run focused correctness tests; implement CP-vs-OOC performance tests; collect preliminary numbers; open draft PR and request early review. | +| Week 4 | Address covariance feedback; finalize tests; remove debug code; check comments and style. | Address TSMM feedback; finalize helper methods, memory behavior, and output metadata handling. 
| Run final correctness tests and warm-up benchmarks; update PR description with final tables, tests run, limitations, and final submission notes. | + +### Relevant Source Files + +#### Covariance + +| Purpose | File | +| --- | --- | +| CP covariance reference | `src/main/java/org/apache/sysds/runtime/instructions/cp/CovarianceCPInstruction.java` | +| Spark covariance reference for algorithmic behavior only | `src/main/java/org/apache/sysds/runtime/instructions/spark/CovarianceSPInstruction.java` | +| OOC central moment reference | `src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java` | +| OOC parser | `src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java` | +| OOC instruction type enum | `src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java` | + +#### TSMM + +| Purpose | File | +| --- | --- | +| Current OOC TSMM implementation | `src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java` | +| OOC matrix multiplication reference | `src/main/java/org/apache/sysds/runtime/instructions/ooc/MMultOOCInstruction.java` | +| Multi-block TSMM reference for algorithmic behavior only | `src/main/java/org/apache/sysds/runtime/instructions/spark/Tsmm2SPInstruction.java` | +| Current OOC TSMM test | `src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java` | +| Current OOC TSMM DML script | `src/test/scripts/functions/ooc/TSMM.dml` | + +### Expected Deliverables + +- OOC support for unweighted covariance: `cov(A, B)`. +- OOC support for weighted covariance: `cov(A, B, W)`. +- OOC TSMM support for `t(X) %*% X` with arbitrary output dimensions. +- OOC TSMM support for `X %*% t(X)` with arbitrary output dimensions. +- Correctness tests comparing OOC output against CP output. +- Dense and sparse test cases. +- Error handling tests for invalid dimensions or unsupported cases. +- Performance test code included in the PR. 
+- Benchmark results included in the PR description. +- A draft PR opened by the end of Week 3 for early review. +- Final cleaned PR before the deadline. + +### Four-Week Project Plan + +#### Week 1: Test-First Setup and Initial Covariance Implementation + +**Goal:** Establish failing correctness tests first, then implement the smallest useful covariance OOC path. + +| Person | Weekly Deliverables | +| --- | --- | +| Person A: Covariance Lead | Study `CovarianceCPInstruction.java`, `CovarianceSPInstruction.java`, and `CentralMomentOOCInstruction.java`; create failing OOC tests for `cov(A, B)`; implement initial `CovarianceOOCInstruction.java`; wire covariance into `OOCInstructionParser.java`. | +| Person B: TSMM Lead | Study `TSMMOOCInstruction.java`, `MMultOOCInstruction.java`, and `Tsmm2SPInstruction.java`; create failing tests for multi-tile `t(X) %*% X`; create failing tests for multi-tile `X %*% t(X)`; document output tile indexing for both directions. | +| Person C: Testing, Benchmarking, Integration | Verify Java, Maven, and IDE style setup; add OOC covariance DML script; define CP-vs-OOC benchmark dimensions, sparsities, and block sizes; confirm all new tests fail for expected missing-support reasons. | + +##### Week 1 Required Output + +- Failing unweighted covariance OOC tests. +- Failing LEFT TSMM multi-tile tests for `t(X) %*% X`. +- Failing RIGHT TSMM multi-tile tests for `X %*% t(X)`. +- Initial covariance parser wiring started or completed. +- Benchmark plan focused only on CP vs OOC. + +#### Week 2: Complete Covariance and Build TSMM Multi-Block Prototype + +**Goal:** Finish covariance correctness and produce a reviewable TSMM prototype for both directions. + +| Person | Weekly Deliverables | +| --- | --- | +| Person A: Covariance Lead | Complete unweighted `cov(A, B)`; implement weighted `cov(A, B, W)`; join `A`, `B`, and `W` streams by block index; reduce `CmCovObject` partials; add dimension and block-size validation. 
| +| Person B: TSMM Lead | Replace the single-output-block restriction in `TSMMOOCInstruction.java`; generate diagonal partial output tiles; generate off-diagonal partial output tiles; handle both LEFT and RIGHT TSMM indexing. | +| Person C: Testing, Benchmarking, Integration | Add `CovarianceWeights.dml` and `CovarianceWeightsTest.java`; compare covariance OOC results against CP; add initial performance test skeleton under the existing Java performance test area. | + +##### Week 2 Required Output + +- Dense and sparse unweighted covariance tests passing. +- Dense and sparse weighted covariance tests passing or close to passing. +- TSMM prototype emits multi-tile outputs for both directions. +- Performance test code skeleton exists and is committed. + +#### Week 3: TSMM Correctness, Benchmarks, and Early Draft PR + +**Goal:** Make both operators reviewable, collect first benchmark numbers, and open the draft PR early enough for feedback. + +| Person | Weekly Deliverables | +| --- | --- | +| Person A: Covariance Lead | Stabilize covariance implementation; add error handling tests; verify OOC heavy hitters include covariance; review Person B's TSMM code. | +| Person B: TSMM Lead | Complete multi-block TSMM for `t(X) %*% X`; complete multi-block TSMM for `X %*% t(X)`; verify output matrix characteristics; verify symmetric off-diagonal tiles; review Person A's covariance code. | +| Person C: Testing, Benchmarking, Integration | Extend `TransposeSelfMMTest.java`; run focused correctness tests; implement CP-vs-OOC performance tests; collect preliminary benchmark numbers; open draft PR and request early review. | + +##### Week 3 Required Output + +- Covariance implementation complete. +- TSMM LEFT and RIGHT correctness tests passing for main dense and sparse cases. +- Initial benchmark results collected. +- Draft PR opened before the end of the week. +- PR description includes tests run, benchmark setup, preliminary numbers, and known limitations. 
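One possible shape for the benchmark setup reported in the PR description is sketched below: unmeasured warm-up runs followed by measured runs whose average is reported. This is a hedged, standalone illustration. `WarmupBenchSketch`, `averageMillis`, and `tsmmLeft` are hypothetical names, the naive `tsmmLeft` is only a placeholder workload and not the SystemDS kernel, and the counts of 2 warm-up runs and 3 measured runs follow the measurement plan in `dev/ooc-benchmark-plan.md`.

```java
final class WarmupBenchSketch {
    // Runs the task `warmups` times without timing, then `runs` times with
    // timing, and returns the average measured runtime in milliseconds.
    static double averageMillis(Runnable task, int warmups, int runs) {
        if (warmups < 0 || runs < 1)
            throw new IllegalArgumentException("need warmups >= 0 and runs >= 1");
        for (int i = 0; i < warmups; i++)
            task.run();
        long totalNanos = 0;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            totalNanos += System.nanoTime() - start;
        }
        return totalNanos / (double) runs / 1e6;
    }

    // Placeholder workload (assumption): naive dense t(X) %*% X that fills
    // the upper triangle and mirrors it into the lower triangle.
    static double[][] tsmmLeft(double[][] x) {
        int cols = x[0].length;
        double[][] out = new double[cols][cols];
        for (double[] row : x)
            for (int i = 0; i < cols; i++)
                for (int j = i; j < cols; j++)
                    out[i][j] += row[i] * row[j];
        for (int i = 0; i < cols; i++)
            for (int j = 0; j < i; j++)
                out[i][j] = out[j][i];
        return out;
    }

    public static void main(String[] args) {
        double[][] x = new double[2000][50];
        for (int i = 0; i < x.length; i++)
            for (int j = 0; j < x[i].length; j++)
                x[i][j] = (i * 31 + j * 17) % 13 / 13.0;

        // The sink keeps the result observable so the JIT cannot drop the work.
        double[] sink = new double[1];
        double avg = averageMillis(() -> sink[0] += tsmmLeft(x)[0][0], 2, 3);
        System.out.printf("warmups=2 runs=3 avg_ms=%.3f checksum=%.3f%n", avg, sink[0]);
    }
}
```

For the real comparison, the `Runnable` would wrap one CP run or one OOC run of the same DML script, and correctness against CP should be verified before the numbers are interpreted.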
+ +#### Week 4: Review Feedback, Final Benchmarks, and Submission Cleanup + +**Goal:** Use review feedback, finalize benchmarks, clean the PR, and prepare final submission. + +| Person | Weekly Deliverables | +| --- | --- | +| Person A: Covariance Lead | Address covariance review comments; finalize covariance tests; remove debug code; check code style and comments. | +| Person B: TSMM Lead | Address TSMM review comments; finalize TSMM tests; simplify helper methods; check memory behavior and output metadata handling. | +| Person C: Testing, Benchmarking, Integration | Run final focused tests; run final benchmark measurements with warm-up runs; update PR description with final benchmark tables; coordinate final PR cleanup. | + +##### Week 4 Required Output + +- Review feedback addressed where possible. +- Focused correctness tests passing. +- Performance tests included in the PR. +- Final benchmark results included in PR description. +- PR ready for final grading/submission. + +### Weekly Review Checklist + +| Week | Required Review Output | +| --- | --- | +| Week 1 | Failing tests exist and fail for expected missing OOC support. | +| Week 2 | Covariance is mostly complete; TSMM multi-block prototype is reviewable. | +| Week 3 | Both operators are correctness-testable; draft PR is open for review. | +| Week 4 | Review feedback is addressed; final tests and benchmarks are documented. 
| + +### Suggested Git Workflow + +Use one feature branch with separate commits for covariance, TSMM, tests, and benchmarks: + +```bash +git switch -c term-ooc-covariance-tsmm +``` + +Recommended commit structure: + +```text +test: add failing OOC covariance tests +feat: add OOC covariance instruction +test: add weighted OOC covariance tests +test: add multi-block OOC TSMM tests +feat: extend OOC TSMM to left and right multi-block outputs +bench: add OOC covariance and TSMM performance tests +docs: add benchmark results to PR notes +``` + +### Suggested Test Commands + +```bash +mvn -Dtest=org.apache.sysds.test.functions.ooc.CovarianceTest test +mvn -Dtest=org.apache.sysds.test.functions.ooc.CovarianceWeightsTest test +mvn -Dtest=org.apache.sysds.test.functions.ooc.TransposeSelfMMTest test +``` + +Run the broader OOC test package if time allows (the pattern is quoted so that the shell does not try to expand `*`): + +```bash +mvn -Dtest='org.apache.sysds.test.functions.ooc.*' test +``` + +### Benchmark Requirements + +Benchmarks should compare CP vs OOC only. Spark benchmarking is intentionally removed from scope. + +Benchmark variables: + +- covariance weighted vs unweighted +- dense vs sparse inputs +- TSMM LEFT: `t(X) %*% X` +- TSMM RIGHT: `X %*% t(X)` +- single-tile output vs multi-tile output +- block sizes such as 500, 1000, and 2000 if feasible +- JVM warm-up runs before measured runs + +Benchmark output should include: + +- matrix dimensions +- sparsity +- block size +- execution mode +- warm-up count +- measured runtime +- short interpretation of results + +### Success Criteria + +- `cov(A, B)` works in OOC mode. +- `cov(A, B, W)` works in OOC mode. +- `t(X) %*% X` works in OOC mode when the output has more than one tile. +- `X %*% t(X)` works in OOC mode when the output has more than one tile. +- Correctness tests compare OOC output against CP output. +- Tests cover dense and sparse inputs. +- Benchmarks include warm-up runs. +- Performance test code is included in the PR. +- Benchmark results are included in the PR description. 
+- Draft PR is opened by the end of Week 3 for feedback. +- Final PR follows Apache SystemDS style conventions. diff --git a/dev/ooc-benchmark-plan.md b/dev/ooc-benchmark-plan.md new file mode 100644 index 00000000000..bd87c24c064 --- /dev/null +++ b/dev/ooc-benchmark-plan.md @@ -0,0 +1,43 @@ +# OOC Covariance and TSMM Benchmark Plan + +## Scope + +Benchmarks compare local CP execution against local OOC execution only. + +## Operators + +- cov(A, B) +- cov(A, B, W) +- t(X) %*% X +- X %*% t(X) + +## Benchmark Variables + +- Dense and sparse inputs +- Weighted and unweighted covariance +- TSMM LEFT: t(X) %*% X +- TSMM RIGHT: X %*% t(X) +- Single-tile and multi-tile TSMM outputs +- Block sizes: 500, 1000, and 2000 where feasible +- Warm-up runs before measured runs + +## Initial Matrix Plan + +| Operator | Case | Matrix size | Sparsity | Block size | Comparison | +|---|---:|---:|---:|---:|---| +| cov(A,B) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC | +| cov(A,B) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC | +| cov(A,B,W) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC | +| cov(A,B,W) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC | +| t(X)%*%X | LEFT single-tile | 10000 x 100 | 0.9 | 1000 | CP vs OOC | +| t(X)%*%X | LEFT multi-tile | 10000 x 3000 | 0.2 | 1000 | CP vs OOC | +| X%*%t(X) | RIGHT single-tile | 100 x 10000 | 0.9 | 1000 | CP vs OOC | +| X%*%t(X) | RIGHT multi-tile | 3000 x 100 | 0.2 | 1000 | CP vs OOC | + +## Measurement Plan + +- 1 or 2 warm-up runs +- 3 measured runs +- Report average runtime +- Report matrix dimensions, sparsity, block size, execution mode, and operator +- Verify correctness against CP output before interpreting runtime diff --git a/hello.dml b/hello.dml new file mode 100644 index 00000000000..7df869a15e7 --- /dev/null +++ b/hello.dml @@ -0,0 +1 @@ +print("Hello, World!") diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index ae41639687b..10f5030e637 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -32,6 +32,7 @@ import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.DataGenOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.CovarianceOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ParameterizedBuiltinOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; @@ -102,6 +103,8 @@ else if(parts.length == 4) return TeeOOCInstruction.parseInstruction(str); case CentralMoment: return CentralMomentOOCInstruction.parseInstruction(str); + case Covariance: + return CovarianceOOCInstruction.parseInstruction(str); case Ctable: return CtableOOCInstruction.parseInstruction(str); case ParameterizedBuiltin: diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java index 1c73b636341..f642ec3503a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java @@ -35,78 +35,77 @@ public class CentralMomentOOCInstruction extends AggregateUnaryOOCInstruction { - private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out, - String opcode, String str) { - super(OOCType.CM, cm, in1, in2, in3, out, opcode, str); - } - - public static CentralMomentOOCInstruction parseInstruction(String str) { - CentralMomentCPInstruction cpInst = 
CentralMomentCPInstruction.parseInstruction(str); - return parseInstruction(cpInst); - } - - public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) { - return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3, - inst.output, inst.getOpcode(), inst.getInstructionString()); - } - - @Override - public void processInstruction(ExecutionContext ec) { - String output_name = output.getName(); - - /* - * The "order" of the central moment in the instruction can - * be set to INVALID when the exact value is unknown at - * compilation time. We first need to determine the exact - * order and update the CMOperator, if needed. - */ - - MatrixObject matObj = ec.getMatrixObject(input1.getName()); - OOCStream qIn = matObj.getStreamHandle(); - - CPOperand scalarInput = (input3 == null ? input2 : input3); - ScalarObject order = ec.getScalarInput(scalarInput); - - CMOperator cm_op = ((CMOperator) _optr); - if(cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID) - cm_op = cm_op.setCMAggOp((int) order.getLongValue()); - - CMOperator finalCm_op = cm_op; - - OOCStream cmObjs = createWritableStream(); - - if(input3 == null) { - mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful - } - else { - // Here we use a hash join approach - // Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream - MatrixObject wtObj = ec.getMatrixObject(input2.getName()); - - DataCharacteristics dc = ec.getDataCharacteristics(input1.getName()); - DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName()); - - if (dc.getBlocksize() != dcW.getBlocksize()) - throw new DMLRuntimeException("Different block sizes are not yet supported"); - - OOCStream wIn = wtObj.getStreamHandle(); - - joinOOC(qIn, wIn, cmObjs, - (tmp, weights) -> - 
((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()), - IndexedMatrixValue::getIndexes); - } - - try { - CmCovObject agg = cmObjs.dequeue(); - CmCovObject next; - - while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) - agg = (CmCovObject) finalCm_op.fn.execute(agg, next); - - ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op))); - } catch (Exception ex) { - throw new DMLRuntimeException(ex); - } - } + private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out, + String opcode, String str) { + super(OOCType.CM, cm, in1, in2, in3, out, opcode, str); + } + + public static CentralMomentOOCInstruction parseInstruction(String str) { + CentralMomentCPInstruction cpInst = CentralMomentCPInstruction.parseInstruction(str); + return parseInstruction(cpInst); + } + + public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) { + return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3, + inst.output, inst.getOpcode(), inst.getInstructionString()); + } + + @Override + public void processInstruction(ExecutionContext ec) { + String output_name = output.getName(); + + /* + * The "order" of the central moment in the instruction can + * be set to INVALID when the exact value is unknown at + * compilation time. We first need to determine the exact + * order and update the CMOperator, if needed. + */ + + MatrixObject matObj = ec.getMatrixObject(input1.getName()); + OOCStream qIn = matObj.getStreamHandle(); + + CPOperand scalarInput = (input3 == null ? 
input2 : input3); + ScalarObject order = ec.getScalarInput(scalarInput); + + CMOperator cm_op = ((CMOperator) _optr); + if (cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID) + cm_op = cm_op.setCMAggOp((int) order.getLongValue()); + + CMOperator finalCm_op = cm_op; + + OOCStream cmObjs = createWritableStream(); + + if (input3 == null) { + mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful + } else { + // Here we use a hash join approach + // Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream + MatrixObject wtObj = ec.getMatrixObject(input2.getName()); + + DataCharacteristics dc = ec.getDataCharacteristics(input1.getName()); + DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName()); + + if (dc.getBlocksize() != dcW.getBlocksize()) + throw new DMLRuntimeException("Different block sizes are not yet supported"); + + OOCStream wIn = wtObj.getStreamHandle(); + + joinOOC(qIn, wIn, cmObjs, + (tmp, weights) -> + ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()), + IndexedMatrixValue::getIndexes); + } + + try { + CmCovObject agg = cmObjs.dequeue(); + CmCovObject next; + + while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) + agg = (CmCovObject) finalCm_op.fn.execute(agg, next); + + ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op))); + } catch (Exception ex) { + throw new DMLRuntimeException(ex); + } + } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java new file mode 100644 index 00000000000..cee08b36f89 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java @@ -0,0 +1,125 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.instructions.ooc; + +import java.util.List; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.common.Types.DataType; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.functionobjects.COV; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.CmCovObject; +import org.apache.sysds.runtime.instructions.cp.DoubleObject; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.COVOperator; +import org.apache.sysds.runtime.meta.DataCharacteristics; +public class CovarianceOOCInstruction extends ComputationOOCInstruction { + + private CovarianceOOCInstruction(COVOperator cov, CPOperand 
in1, CPOperand in2, CPOperand in3, CPOperand out, + String opcode, String str) { + super(OOCType.COV, cov, in1, in2, in3, out, opcode, str); + } + + public static CovarianceOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + String opcode = parts[0]; + + if(!opcode.equalsIgnoreCase(Opcodes.COV.toString())) + throw new DMLRuntimeException("CovarianceOOCInstruction.parseInstruction():: Unknown opcode " + opcode); + + // the OOC instruction string matches the Spark format, + + COVOperator cov = new COVOperator(COV.getCOMFnObject()); + if(parts.length == 4) { // this is the case for unweighted cov.A.B.out + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand out = new CPOperand(parts[3]); + return new CovarianceOOCInstruction(cov, in1, in2, null, out, opcode, str); + } + else if(parts.length == 5) {// this is the case for weighted cov.A.B.W.out + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand in3 = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[4]); + return new CovarianceOOCInstruction(cov, in1, in2, in3, out, opcode, str); + } + else { + throw new DMLRuntimeException("Invalid number of arguments in Instruction: " + str); + } + } + + @Override + public void processInstruction(ExecutionContext ec) { + COVOperator cov_op = (COVOperator) _optr; + + MatrixObject mo1 = ec.getMatrixObject(input1.getName()); + MatrixObject mo2 = ec.getMatrixObject(input2.getName()); + + OOCStream q1 = mo1.getStreamHandle(); + OOCStream q2 = mo2.getStreamHandle(); + + OOCStream covObjs = createWritableStream(); + + if(input3 == null) { + // unweighted covariance join the two tile streams by block index + joinOOC(q1, q2, covObjs, + (a, b) -> ((MatrixBlock) a.getValue()).covOperations(cov_op, (MatrixBlock) b.getValue()), + IndexedMatrixValue::getIndexes); + } + else { + // weighted covariance additionally join the 
weights tile stream + MatrixObject mo3 = ec.getMatrixObject(input3.getName()); + + DataCharacteristics dc1 = ec.getDataCharacteristics(input1.getName()); + DataCharacteristics dc2 = ec.getDataCharacteristics(input2.getName()); + DataCharacteristics dc3 = ec.getDataCharacteristics(input3.getName()); + if(dc1.getBlocksize() != dc2.getBlocksize() || dc1.getBlocksize() != dc3.getBlocksize()) + throw new DMLRuntimeException("Different block sizes are not yet supported"); + + OOCStream q3 = mo3.getStreamHandle(); + + joinOOC(List.of(q1, q2, q3), covObjs, + tiles -> ((MatrixBlock) tiles.get(0).getValue()).covOperations(cov_op, + (MatrixBlock) tiles.get(1).getValue(), (MatrixBlock) tiles.get(2).getValue()), + IndexedMatrixValue::getIndexes); + } + + try { + CmCovObject agg = covObjs.dequeue(); + CmCovObject next; + + while((next = covObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) + agg = (CmCovObject) cov_op.fn.execute(agg, next); + + ec.setScalarOutput(output.getName(), new DoubleObject(agg.getRequiredResult(cov_op))); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } +} + diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index 679e7187e5e..9b5bdd7f4f5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -80,7 +80,7 @@ public abstract class OOCInstruction extends Instruction { public enum OOCType { Reblock, Tee, Binary, Ternary, Unary, AggregateUnary, AggregateBinary, AggregateTernary, MAPMM, MMTSJ, - MAPMMCHAIN, Reorg, CM, Ctable, MatrixIndexing, ParameterizedBuiltin, Rand, Append, Quaternary + MAPMMCHAIN, Reorg, CM, COV, Ctable, MatrixIndexing, ParameterizedBuiltin, Rand, Append, Quaternary } protected final OOCInstruction.OOCType _ooctype; diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java index e0207940409..cf52293334c 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java @@ -19,18 +19,22 @@ package org.apache.sysds.runtime.instructions.ooc; +import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.apache.sysds.common.Opcodes; import org.apache.sysds.lops.MMTSJ; import org.apache.sysds.lops.MMTSJ.MMTSJType; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.functionobjects.Multiply; import org.apache.sysds.runtime.functionobjects.Plus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; @@ -48,7 +52,7 @@ public static TSMMOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); InstructionUtils.checkNumFields(parts, 3); String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); // the large matrix (streamed), columns <= blocksize + CPOperand in1 = new CPOperand(parts[1]); // the large matrix (streamed) CPOperand out = new CPOperand(parts[2]); MMTSJ.MMTSJType 
mmtsjType = MMTSJ.MMTSJType.valueOf(parts[3]); @@ -59,39 +63,77 @@ public static TSMMOOCInstruction parseInstruction(String str) { } @Override - public void processInstruction( ExecutionContext ec ) { + public void processInstruction(ExecutionContext ec) { MatrixObject min = ec.getMatrixObject(input1); - int nRows = (int) min.getDataCharacteristics().getRows(); - int nCols = (int) min.getDataCharacteristics().getCols(); - int bLen = min.getDataCharacteristics().getBlocksize(); - - OOCStream qIn = min.getStreamHandle(); + int numRowBlocks = Math.toIntExact(min.getDataCharacteristics().getNumRowBlocks()); + int numColBlocks = Math.toIntExact(min.getDataCharacteristics().getNumColBlocks()); + int blocksPerJoinGroup = _type.isLeft() ? numColBlocks : numRowBlocks; + int partialsPerOutput = _type.isLeft() ? numRowBlocks : numColBlocks; + + OOCStreamable inputStreamable = min.getStreamable(); + boolean createdCache = !inputStreamable.hasStreamCache(); + CachingStream inputCache = createdCache ? 
new CachingStream(min.getStreamHandle()) + : inputStreamable.getStreamCache(); + + OOCStream<List<IndexedMatrixValue>> groupedPartials = createWritableStream(); + OOCStream partials = createWritableStream(); + OOCStream out = createWritableStream(); + ec.getMatrixObject(output).setStreamHandle(out); + + joinManyOOC(inputCache.getReadStream(), inputCache.getReadStream(), groupedPartials, + this::createPartialOutputTiles, this::getJoinIndex, this::getJoinIndex, + blocksPerJoinGroup, blocksPerJoinGroup); + expandOOC(groupedPartials, partials, values -> values); BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); + CompletableFuture outFuture = groupedReduceOOC(partials, out, (left, right) -> { + MatrixBlock result = ((MatrixBlock) left.getValue()).binaryOperationsInPlace(plus, right.getValue()); + left.setValue(result); + return left; + }, partialsPerOutput); + + outFuture.whenComplete((result, error) -> { + if(createdCache) + inputCache.scheduleDeletion(); + }); + } + + private long getJoinIndex(IndexedMatrixValue value) { + return _type.isLeft() ? value.getIndexes().getRowIndex() : value.getIndexes().getColumnIndex(); + } + + private long getOutputIndex(IndexedMatrixValue value) { + return _type.isLeft() ? 
value.getIndexes().getColumnIndex() : value.getIndexes().getRowIndex(); + } + + private List<IndexedMatrixValue> createPartialOutputTiles(IndexedMatrixValue left, IndexedMatrixValue right) { + long leftIndex = getOutputIndex(left); + long rightIndex = getOutputIndex(right); + if(leftIndex > rightIndex) + return List.of(); + + MatrixBlock leftBlock = (MatrixBlock) left.getValue(); + MatrixBlock rightBlock = (MatrixBlock) right.getValue(); + if(leftIndex == rightIndex) { + MatrixBlock diagonal = leftBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _type); + return List.of(new IndexedMatrixValue(new MatrixIndexes(leftIndex, rightIndex), diagonal)); + } - //validation check TODO extend compiler to not create OOC otherwise - if( (_type.isLeft() && nCols > bLen) - || (_type.isRight() && nRows > bLen) ) - { - throw new UnsupportedOperationException(); + MatrixBlock partial; + if(_type.isLeft()) { + MatrixBlock leftTranspose = LibMatrixReorg.transpose(leftBlock); + partial = leftTranspose.aggregateBinaryOperations(leftTranspose, rightBlock, new MatrixBlock(), + (AggregateBinaryOperator) _optr); } - - //int dim = _type.isLeft() ? 
nCols : nRows; - MatrixBlock resultBlock = null; - - OOCStream tmpStream = createWritableStream(); - - mapOOC(qIn, tmpStream, - tmp -> ((MatrixBlock) tmp.getValue()) - .transposeSelfMatrixMultOperations(new MatrixBlock(), _type)); - - MatrixBlock tmp; - while ((tmp = tmpStream.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { - if (resultBlock == null) - resultBlock = tmp; - else - resultBlock.binaryOperationsInPlace(plus, tmp); + else { + MatrixBlock rightTranspose = LibMatrixReorg.transpose(rightBlock); + partial = leftBlock.aggregateBinaryOperations(leftBlock, rightTranspose, new MatrixBlock(), + (AggregateBinaryOperator) _optr); } - ec.setMatrixOutput(output.getName(), resultBlock); + MatrixBlock mirror = LibMatrixReorg.transpose(partial); + return List.of( + new IndexedMatrixValue(new MatrixIndexes(leftIndex, rightIndex), partial), + new IndexedMatrixValue(new MatrixIndexes(rightIndex, leftIndex), mirror)); } } diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java new file mode 100644 index 00000000000..833ba68b28f --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import java.io.IOException; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +public class CovarianceTest extends AutomatedTestBase { + private final static String TEST_NAME = "Covariance"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CovarianceTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static String INPUT_A = "A"; + private final static String INPUT_B = "B"; + private final static String OUTPUT_CP = "R_CP"; + private final static String OUTPUT_OOC = "R_OOC"; + + private final static int rows = 1871; + private final static int cols = 1; + private final static int blocksize = 1000; + private final static int maxVal = 7; + + private final static double denseSparsity = 0.65; + private final static double sparseSparsity = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { OUTPUT_CP, OUTPUT_OOC })); + } + + @Test + public void testCovarianceDenseOOC() { + runCovarianceOOCCompareTest(false); + } + + @Test + public void testCovarianceSparseOOC() { + runCovarianceOOCCompareTest(true); + } + + private void runCovarianceOOCCompareTest(boolean sparse) { + Types.ExecMode platformOld = 
setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + double sparsity = sparse ? sparseSparsity : denseSparsity; + + double[][] A = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 7); + double[][] B = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 823); + + MatrixBlock ABlock = DataConverter.convertToMatrixBlock(A); + MatrixBlock BBlock = DataConverter.convertToMatrixBlock(B); + + writeBinaryMatrix(INPUT_A, ABlock, rows, cols, blocksize); + writeBinaryMatrix(INPUT_B, BBlock, rows, cols, blocksize); + + // Reference run: normal single-node CP execution. + programArgs = new String[] { + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_CP) + }; + runTest(true, false, null, -1); + + // OOC run: this is expected to fail in Week 1 until covariance OOC support is implemented. + programArgs = new String[] { + "-explain", "-stats", "-ooc", + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_OOC) + }; + runTest(true, false, null, -1); + + MatrixBlock cpResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_CP), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + MatrixBlock oocResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_OOC), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + TestUtils.compareMatrices(cpResult, oocResult, eps); + } + catch(IOException ex) { + throw new RuntimeException(ex); + } + finally { + resetExecMode(platformOld); + } + } + + private void writeBinaryMatrix(String name, MatrixBlock mb, int rows, int cols, int blocksize) throws IOException { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + writer.writeMatrixToHDFS(mb, input(name), rows, cols, blocksize, mb.getNonZeros()); + + HDFSTool.writeMetaDataFile(input(name + ".mtd"), + Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, blocksize, mb.getNonZeros()), + Types.FileFormat.BINARY); + } +} 
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java new file mode 100644 index 00000000000..28bdfcc56a6 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.ooc; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.instructions.Instruction; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixValue; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; + +public class CovarianceWeightsTest extends AutomatedTestBase { + private final static String TEST_NAME1 = "CovarianceWeights"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CovarianceWeightsTest.class.getSimpleName() + "/"; + private final static double eps = 1e-8; + private static final String INPUT_NAME_A = "X"; + private static final String INPUT_NAME_B = "Y"; + private static final String INPUT_NAME_W = "W"; + private static final String OUTPUT_NAME = "res"; + + private final static int rows = 1871; + private final static int maxVal = 7; + private final static double sparsity1 = 0.65; + private final static double sparsity2 = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); + addTestConfiguration(TEST_NAME1, config); + } + + @Test + public void testCovarianceWeightsDense() { + runCovarianceWeightsTest(false); + } + + @Test + public void testCovarianceWeightsSparse() { + runCovarianceWeightsTest(true); + } + + private void runCovarianceWeightsTest(boolean sparse) { 
+ Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME1); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME_A), + input(INPUT_NAME_B), input(INPUT_NAME_W), output(OUTPUT_NAME)}; + + // 1. Generate the data in-memory as MatrixBlock objects (1-d column vectors) + double[][] A_data = getRandomMatrix(rows, 1, 1, maxVal, sparse ? sparsity2 : sparsity1, 7); + double[][] B_data = getRandomMatrix(rows, 1, 1, maxVal, sparse ? sparsity2 : sparsity1, 3); + double[][] W_data = getRandomMatrix(rows, 1, 0, 1, 1.0, 7); + + // 2. Convert the double arrays to MatrixBlock objects + MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); + MatrixBlock B_mb = DataConverter.convertToMatrixBlock(B_data); + MatrixBlock W_mb = DataConverter.convertToMatrixBlock(W_data); + + // 3. Create a binary matrix writer + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + + // 4. Write the inputs to binary SequenceFiles incl. 
metadata + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME_A), rows, 1, 1000, A_mb.getNonZeros()); + writer.writeMatrixToHDFS(B_mb, input(INPUT_NAME_B), rows, 1, 1000, B_mb.getNonZeros()); + writer.writeMatrixToHDFS(W_mb, input(INPUT_NAME_W), rows, 1, 1000, W_mb.getNonZeros()); + HDFSTool.writeMetaDataFile(input(INPUT_NAME_A + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, 1, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY); + HDFSTool.writeMetaDataFile(input(INPUT_NAME_B + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, 1, 1000, B_mb.getNonZeros()), Types.FileFormat.BINARY); + HDFSTool.writeMetaDataFile(input(INPUT_NAME_W + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, 1, 1000, W_mb.getNonZeros()), Types.FileFormat.BINARY); + + runTest(true, false, null, -1); + + //check Covariance OOC + Assert.assertTrue("OOC wasn't used for Covariance", + heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.COV)); + + //compare results + + // rerun without ooc flag + programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME_A), input(INPUT_NAME_B), + input(INPUT_NAME_W), output(OUTPUT_NAME + "_target")}; + runTest(true, false, null, -1); + + // compare matrices + HashMap<MatrixValue.CellIndex, Double> ret1 = readDMLMatrixFromOutputDir(OUTPUT_NAME); + HashMap<MatrixValue.CellIndex, Double> ret2 = readDMLMatrixFromOutputDir(OUTPUT_NAME + "_target"); + TestUtils.compareMatrices(ret1, ret2, eps, "Ret-1", "Ret-2"); + } + catch(IOException e) { + throw new RuntimeException(e); + } + finally { + resetExecMode(platformOld); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java index ed61038a716..219b4fb66ae 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -21,7 +21,7 @@ import org.apache.sysds.common.Opcodes; import 
org.apache.sysds.common.Types; -import org.apache.sysds.lops.MMTSJ; +import org.apache.sysds.lops.MMTSJ.MMTSJType; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.io.MatrixWriterFactory; @@ -38,74 +38,83 @@ import java.io.IOException; public class TransposeSelfMMTest extends AutomatedTestBase { - private final static String TEST_NAME1 = "TSMM"; + private static final String TEST_NAME_LEFT = "TSMM"; + private static final String TEST_NAME_RIGHT = "TSMMRight"; private final static String TEST_DIR = "functions/ooc/"; private final static String TEST_CLASS_DIR = TEST_DIR + TransposeSelfMMTest.class.getSimpleName() + "/"; private final static double eps = 1e-8; private static final String INPUT_NAME = "X"; - private static final String OUTPUT_NAME = "res"; + private static final String OUTPUT_NAME_CP = "res_cp"; + private static final String OUTPUT_NAME_OOC = "res_ooc"; - private final static int rows = 2143; - private final static int cols = 123; + private static final int SINGLE_TILE_ROWS = 2143; + private static final int SINGLE_TILE_COLS = 123; + private static final int SINGLE_TILE_BLOCK_SIZE = 1000; + private static final int MULTI_TILE_ROWS = 1003; + private static final int MULTI_TILE_COLS = 1007; + private static final int MULTI_TILE_BLOCK_SIZE = 1000; private final static double sparsity1 = 0.7; private final static double sparsity2 = 0.1; - private final int k = 1; @Override public void setUp() { TestUtils.clearAssertionInformation(); - TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); - addTestConfiguration(TEST_NAME1, config); + addTestConfiguration(TEST_NAME_LEFT, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME_LEFT)); + addTestConfiguration(TEST_NAME_RIGHT, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME_RIGHT)); } @Test - public void testTsmmDense() { - runTSMMTest(cols, false); + public void testTsmmLeftDenseSingleTile() { + 
runTSMMTest(MMTSJType.LEFT, SINGLE_TILE_ROWS, SINGLE_TILE_COLS, SINGLE_TILE_BLOCK_SIZE, false); } - + + @Test + public void testTsmmLeftSparseSingleTile() { + runTSMMTest(MMTSJType.LEFT, SINGLE_TILE_ROWS, SINGLE_TILE_COLS, SINGLE_TILE_BLOCK_SIZE, true); + } + + @Test + public void testTsmmLeftDenseMultiTile() { + runTSMMTest(MMTSJType.LEFT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, false); + } + @Test - public void testTsmmSparse() { - runTSMMTest(cols, false); + public void testTsmmRightDenseMultiTile() { + runTSMMTest(MMTSJType.RIGHT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, false); } - private void runTSMMTest(int cols, boolean sparse ) - { + private void runTSMMTest(MMTSJType type, int rows, int cols, int blockSize, boolean sparse) { Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); - try - { - getAndLoadTestConfiguration(TEST_NAME1); + try { + String testName = type.isLeft() ? TEST_NAME_LEFT : TEST_NAME_RIGHT; + getAndLoadTestConfiguration(testName); String HOME = SCRIPT_DIR + TEST_DIR; - fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; - programArgs = new String[]{"-explain", "-stats", "-ooc", - "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + fullDMLScriptName = HOME + testName + ".dml"; - // 1. Generate the data in-memory as MatrixBlock objects double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10); - - // 2. Convert the double arrays to MatrixBlock objects MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); - - // 3. Create a binary matrix writer MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); - - // 4. 
Write matrix A to a binary SequenceFile - writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros()); + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, blockSize, A_mb.getNonZeros()); HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64, - new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY); + new MatrixCharacteristics(rows, cols, blockSize, A_mb.getNonZeros()), Types.FileFormat.BINARY); + programArgs = new String[] {"-stats", "-args", input(INPUT_NAME), output(OUTPUT_NAME_CP)}; + runTest(true, false, null, -1); + + programArgs = new String[] {"-explain", "-stats", "-ooc", + "-args", input(INPUT_NAME), output(OUTPUT_NAME_OOC)}; runTest(true, false, null, -1); - //check tsmm OOC Assert.assertTrue("OOC wasn't used for TSMM", heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.TSMM)); - - //compare results - MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME), - Types.FileFormat.BINARY, cols, cols, 1000, cols*cols); - MatrixBlock ret2 = new MatrixBlock(rows, rows, false); - A_mb.transposeSelfMatrixMultOperations(ret2, MMTSJ.MMTSJType.LEFT, k); - TestUtils.compareMatrices(ret1, ret2, eps); + + int outputDim = type.isLeft() ? 
cols : rows; + MatrixBlock actual = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME_OOC), + Types.FileFormat.BINARY, outputDim, outputDim, blockSize); + MatrixBlock expected = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME_CP), + Types.FileFormat.BINARY, outputDim, outputDim, blockSize); + TestUtils.compareMatrices(actual, expected, eps); } catch (IOException e) { throw new RuntimeException(e); diff --git a/src/test/scripts/functions/ooc/Covariance.dml b/src/test/scripts/functions/ooc/Covariance.dml new file mode 100644 index 00000000000..da051a86245 --- /dev/null +++ b/src/test/scripts/functions/ooc/Covariance.dml @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +A = read($1); +B = read($2); + +s = cov(A, B); +res = as.matrix(s); + +write(res, $3, format="binary"); + diff --git a/src/test/scripts/functions/ooc/CovarianceWeights.dml b/src/test/scripts/functions/ooc/CovarianceWeights.dml new file mode 100644 index 00000000000..8d24a3f17a2 --- /dev/null +++ b/src/test/scripts/functions/ooc/CovarianceWeights.dml @@ -0,0 +1,27 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- +A = read($1); +B = read($2); +W = read($3); +s = cov(A, B, W); +m = as.matrix(s); + +write(m, $4, format="text"); diff --git a/src/test/scripts/functions/ooc/TSMMRight.dml b/src/test/scripts/functions/ooc/TSMMRight.dml new file mode 100644 index 00000000000..37fd5d46b9d --- /dev/null +++ b/src/test/scripts/functions/ooc/TSMMRight.dml @@ -0,0 +1,28 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Read input matrix from command line args +X = read($1); + +# Operation under test +res = X %*% t(X); + +write(res, $2, format="binary")
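
The multi-tile scheme in `TSMMOOCInstruction` for the LEFT case (`t(X) %*% X`) can be sketched outside SystemDS as follows. This is a minimal illustration on plain `double[][]` arrays, not the instruction's actual code: the class `TiledTsmmSketch` and all of its method names are hypothetical, and the streaming join, caching, and `MatrixBlock` operations are replaced by simple loops. It shows the three ideas the diff relies on: tiles are grouped by row-block index, only column-block pairs with `bi <= bj` are computed, and each off-diagonal partial is mirrored into the transposed position before the partials are summed over all row blocks.

```java
// Hypothetical standalone sketch; it does not use any SystemDS classes.
class TiledTsmmSketch {

    /** Reference result: t(X) %*% X computed directly, cell by cell. */
    static double[][] naiveLeftTsmm(double[][] x) {
        int n = x.length, m = x[0].length;
        double[][] out = new double[m][m];
        for (int r = 0; r < n; r++)
            for (int i = 0; i < m; i++)
                for (int j = 0; j < m; j++)
                    out[i][j] += x[r][i] * x[r][j];
        return out;
    }

    /** Tiled t(X) %*% X with block size blen, mirroring off-diagonal tiles. */
    static double[][] tiledLeftTsmm(double[][] x, int blen) {
        int n = x.length, m = x[0].length;
        int numRowBlocks = (n + blen - 1) / blen;
        int numColBlocks = (m + blen - 1) / blen;
        double[][] out = new double[m][m];
        // Each row-block index k plays the role of one join group.
        for (int k = 0; k < numRowBlocks; k++) {
            int r0 = k * blen, r1 = Math.min(n, r0 + blen);
            for (int bi = 0; bi < numColBlocks; bi++) {
                int i0 = bi * blen, i1 = Math.min(m, i0 + blen);
                // Pairs with bi > bj are skipped; the mirror step covers them.
                for (int bj = bi; bj < numColBlocks; bj++) {
                    int j0 = bj * blen, j1 = Math.min(m, j0 + blen);
                    for (int i = i0; i < i1; i++) {
                        for (int j = j0; j < j1; j++) {
                            double s = 0;
                            for (int r = r0; r < r1; r++)
                                s += x[r][i] * x[r][j];
                            out[i][j] += s;      // partial for output tile (bi, bj)
                            if (bi != bj)
                                out[j][i] += s;  // mirrored partial for tile (bj, bi)
                        }
                    }
                }
            }
        }
        return out;
    }

    /** Small deterministic integer-valued matrix, so products are exact in double. */
    static double[][] sampleMatrix(int rows, int cols) {
        double[][] x = new double[rows][cols];
        for (int r = 0; r < rows; r++)
            for (int c = 0; c < cols; c++)
                x[r][c] = (r * 7 + c * 3) % 5;
        return x;
    }

    static double maxAbsDiff(double[][] a, double[][] b) {
        double max = 0;
        for (int i = 0; i < a.length; i++)
            for (int j = 0; j < a[i].length; j++)
                max = Math.max(max, Math.abs(a[i][j] - b[i][j]));
        return max;
    }

    public static void main(String[] args) {
        // 5 x 7 input with block size 3: 2 row blocks and 3 column blocks.
        double[][] x = sampleMatrix(5, 7);
        double diff = maxAbsDiff(naiveLeftTsmm(x), tiledLeftTsmm(x, 3));
        System.out.println("max abs difference: " + diff);
    }
}
```

Under the same assumptions, the RIGHT case (`X %*% t(X)`) would swap the roles of rows and columns: tiles are grouped by column-block index and output tiles are indexed by pairs of row-block indexes.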