diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java new file mode 100644 index 000000000..6d3b377f0 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; +import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; +import io.serverlessworkflow.fluent.spec.spi.CallGrpcTaskFluent; + +public class FuncCallGrpcTaskBuilder extends TaskBaseBuilder + implements CallGrpcTaskFluent, + FuncTaskTransformations, + ConditionalTaskBuilder { + + FuncCallGrpcTaskBuilder() { + final CallGRPC callGRPC = new CallGRPC(); + callGRPC.setWith(new GRPCArguments()); + super.setTask(callGRPC); + } + + @Override + public FuncCallGrpcTaskBuilder self() { + return this; + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java index 176bc5690..8de4ff865 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java @@ -104,6 +104,12 @@ public FuncDoTaskBuilder openapi( return this; } + @Override + public FuncDoTaskBuilder grpc(String name, Consumer itemsConfigurer) { + this.listBuilder().grpc(name, itemsConfigurer); + return this; + } + @Override public FuncDoTaskBuilder workflow(String name, Consumer itemsConfigurer) { this.listBuilder().workflow(name, itemsConfigurer); diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 257a1f96b..07b32c5f8 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.CallGRPC; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.CallOpenAPI; import io.serverlessworkflow.api.types.CallTask; @@ -168,6 +169,23 @@ public FuncTaskItemListBuilder openapi( return this.addTaskItem(new TaskItem(name, task)); } + @Override + public FuncTaskItemListBuilder grpc( + String name, Consumer itemsConfigurer) { + name = this.defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_GRPC); + + final FuncCallGrpcTaskBuilder grpcTaskBuilder = new FuncCallGrpcTaskBuilder(); + itemsConfigurer.accept(grpcTaskBuilder); + + final CallGRPC callGRPC = grpcTaskBuilder.build(); + final CallTask callTask = new CallTask(); + callTask.setCallGRPC(callGRPC); + final Task task = new Task(); + task.setCallTask(callTask); + + return this.addTaskItem(new TaskItem(name, task)); + } + @Override public FuncTaskItemListBuilder workflow( String name, Consumer itemsConfigurer) { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java new file mode 100644 index 000000000..a3e552f50 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; +import java.util.function.Consumer; + +@FunctionalInterface +public interface FuncCallGrpcConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java new file mode 100644 index 000000000..daa93ad6f --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer; +import io.serverlessworkflow.fluent.spec.spi.CallGrpcTaskFluent; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class FuncCallGrpcStep extends Step { + + private final List>> steps = new ArrayList<>(); + + private String name; + + public FuncCallGrpcStep(String name) { + this.name = name; + } + + public FuncCallGrpcStep() {} + + public void setName(String name) { + this.name = name; + } + + public FuncCallGrpcStep proto(String uri) { + steps.add(b -> b.proto(uri)); + return this; + } + + public FuncCallGrpcStep proto(String uri, AuthenticationConfigurer authenticationConfigurer) { + steps.add(b -> b.proto(uri, authenticationConfigurer)); + return this; + } + + public FuncCallGrpcStep service(String name, String host) { + steps.add(b -> b.service(name, host)); + return this; + } + + public FuncCallGrpcStep service(String name, String host, int port) { + steps.add(b -> b.service(name, host, port)); + return this; + } + + public FuncCallGrpcStep method(String method) { + steps.add(b -> b.method(method)); + return this; + } + + public FuncCallGrpcStep argument(String name, Object value) { + steps.add(b -> b.argument(name, value)); + return this; + } + + public FuncCallGrpcStep arguments(java.util.Map arguments) { + steps.add(b -> b.arguments(arguments)); + return this; + } + + public FuncCallGrpcStep authentication(AuthenticationConfigurer authenticationConfigurer) { + steps.add(b -> b.authentication(authenticationConfigurer)); + return this; + } + + @Override + protected void configure(FuncTaskItemListBuilder list, Consumer post) { + list.grpc( + name, + builder -> { + for (Consumer> c : steps) { + c.accept(builder); + } + post.accept(builder); + }); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index bcf3f20fb..3d592f15e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -33,6 +33,7 @@ import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import io.serverlessworkflow.fluent.func.FuncTryTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncCallGrpcConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallHttpConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallOpenAPIConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; @@ -1947,7 +1948,78 @@ public static FuncTaskConfigurer call(String name, FuncCallOpenAPIConfigurer con } /** - * Create a new OpenAPI specification to be used with {@link #call(FuncCallOpenAPIStep)}. + * gRPC call using a fluent {@link FuncCallGrpcStep}. + * + *

This overload creates an unnamed gRPC call task. + * + *

{@code
+   * tasks(FuncDSL.call(FuncDSL.grpc()
+   *     .proto("proto/greeter.proto")
+   *     .service("Greeter", "localhost")
+   *     .method("SayHello")
+   *     .argument("name", "World")));
+   * }
+ * + * @param spec fluent gRPC spec built via {@link #grpc()} + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(FuncCallGrpcStep spec) { + return call(null, spec); + } + + /** + * gRPC call using a fluent {@link FuncCallGrpcStep} with an explicit task name. + * + *
{@code
+   * tasks(
+   * FuncDSL.call(
+   * "greet",
+   * FuncDSL.grpc()
+   *     .proto("proto/greeter.proto")
+   *     .service("Greeter", "localhost")
+   *     .method("SayHello")
+   *     .argument("name", "World"))
+   * );
+   * }
+ * + * @param name task name, or {@code null} for an anonymous task + * @param spec fluent gRPC spec built via {@link #grpc()} + * @return a {@link FuncTaskConfigurer} that adds a named gRPC call task + */ + public static FuncTaskConfigurer call(String name, FuncCallGrpcStep spec) { + Objects.requireNonNull(spec, "spec"); + spec.setName(name); + return spec; + } + + /** + * Low-level gRPC call entrypoint using a {@link FuncCallGrpcConfigurer}. + * + *

This overload creates an unnamed gRPC call task. + * + * @param configurer configurer that mutates the underlying gRPC call builder + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(FuncCallGrpcConfigurer configurer) { + return call(null, configurer); + } + + /** + * Low-level gRPC call entrypoint using a {@link FuncCallGrpcConfigurer}. + * + *

This overload allows assigning an explicit task name. + * + * @param name task name, or {@code null} for an anonymous task + * @param configurer configurer that mutates the underlying gRPC call builder + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(String name, FuncCallGrpcConfigurer configurer) { + Objects.requireNonNull(configurer, "configurer"); + return list -> list.grpc(name, configurer); + } + + /** + * Create a new gRPC specification to be used with {@link #call(FuncCallGrpcStep)}. * *

Typical usage: * @@ -2001,6 +2073,37 @@ public static FuncCallHttpStep http(String name) { return new FuncCallHttpStep(name); } + /** + * Create a new, empty gRPC specification to be used with {@link #call(FuncCallGrpcStep)}. + * + *

Typical usage: + * + *

{@code
+   * FuncDSL.call(
+   *     grpc()
+   *         .proto("proto/greeter.proto")
+   *         .service("Greeter", "localhost")
+   *         .method("SayHello")
+   *         .argument("name", "World"))
+   * );
+   * }
+ * + * @return a new {@link FuncCallGrpcStep} + */ + public static FuncCallGrpcStep grpc() { + return new FuncCallGrpcStep(); + } + + /** + * Named variant of {@link #grpc()}. + * + * @param name task name to be used when the spec is attached via {@link #call(FuncCallGrpcStep)} + * @return a new named {@link FuncCallGrpcStep} + */ + public static FuncCallGrpcStep grpc(String name) { + return new FuncCallGrpcStep(name); + } + /** * Create a new HTTP specification preconfigured with an endpoint expression and authentication. * diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java new file mode 100644 index 000000000..977d0d834 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.spi; + +import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; +import java.util.function.Consumer; + +public interface CallGrpcFluent, LIST> { + + LIST grpc(String name, Consumer itemsConfigurer); + + default LIST grpc(Consumer itemsConfigurer) { + return this.grpc(null, itemsConfigurer); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java index 3fd13eb5e..3efcae1c1 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func.spi; +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallHttpTaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallOpenAPITaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; @@ -52,6 +53,7 @@ public interface FuncDoFluent> CallFnFluent, CallHttpFluent, CallOpenAPIFluent, + CallGrpcFluent, WorkflowFluent { default SELF subflow(String name, Consumer itemsConfigurer) { diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java index c7ea88670..5f1b3348e 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java @@ -18,6 +18,7 @@ import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.fluent.spec.configurers.CallGrpcConfigurer; import io.serverlessworkflow.fluent.spec.spi.DoFluent; import java.util.List; import java.util.function.Consumer; @@ -168,6 +169,20 @@ public TaskItemListBuilder grpc(String name, Consumer items return addTaskItem(new TaskItem(name, task)); } + public TaskItemListBuilder grpc(String name, CallGrpcConfigurer configurer) { + name = defaultNameAndRequireConfig(name, configurer, TYPE_GRPC); + + final CallGrpcTaskBuilder callGRPCBuilder = new CallGrpcTaskBuilder(); + configurer.accept(callGRPCBuilder); + + final CallTask callTask = new CallTask(); + callTask.setCallGRPC(callGRPCBuilder.build()); + final Task task = new Task(); + task.setCallTask(callTask); + + return addTaskItem(new TaskItem(name, task)); + } + @Override public TaskItemListBuilder workflow(String name, Consumer itemsConfigurer) { name = defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_WORKFLOW); diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 49ad1d893..4dfd17429 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -58,6 +58,12 @@ io.serverlessworkflow serverlessworkflow-impl-grpc + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-func + ${project.version} + test + org.glassfish.jersey.core jersey-client diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java index bbde7cf0d..a9c20b96b 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java @@ -15,32 +15,42 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorBiDiStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcBiDirectionalStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorBiDiStreamingHandler()) @@ -51,8 +61,10 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + app.close(); } @Test @@ -77,4 +89,46 @@ void grpcContributors() throws IOException { Assertions.assertThat(collection).hasSize(5); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsBidiStreamSources") + void testContributorsBidiStreamDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + "file://" + + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .join(); + + Collection collection = model.asCollection(); + + assertThat(collection).hasSize(5); + } + + private static Stream contributorsBidiStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath("workflows-samples/grpc/contributors-bidi-stream-call.yaml"), + contributorsBidiStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsBidiStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("BiDirectionalStreaming", "localhost", 5011) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java index f22c34d5a..6370f1e33 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -15,31 +15,41 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcClientStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorClientStreamingHandler()) @@ -50,8 +60,10 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + app.close(); } @Test @@ -70,4 +82,37 @@ void grpcPerson() throws IOException { Assertions.assertThat(list).isNotEmpty(); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsClientStreamSources") + void testContributorsClientStreamDsl(String sourceName, Workflow workflow) { + List> list = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asCollection().stream() + .map(m -> m.asMap().orElseThrow()) + .toList(); + + assertThat(list).isNotEmpty(); + } + + private static Stream contributorsClientStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"), + contributorsClientStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsClientStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("ClientStreaming", "localhost", 5011) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java index 5f18ca670..42d724e3a 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java @@ -15,32 +15,42 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorServerStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcServerStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorServerStreamingHandler()) @@ -51,8 +61,10 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + app.close(); } @Test @@ -77,4 +89,47 @@ void grpcContributors() throws IOException { Assertions.assertThat(collection).hasSize(5); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsServerStreamSources") + void testContributorsServerStreamDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + "file://" + + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .join(); + + Collection collection = model.asCollection(); + + assertThat(collection).hasSize(5); + } + + private static Stream contributorsServerStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-server-stream-call.yaml"), + contributorsServerStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsServerStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("ServerStreaming", "localhost", 5011) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java index 594b481e5..402f2e548 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java @@ -15,30 +15,41 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorUnaryArgsExprHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Map; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcUnaryArgsExprTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorUnaryArgsExprHandler()) @@ -49,8 +60,10 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + app.close(); } @Test @@ -72,4 +85,40 @@ void grpcPerson() throws IOException { Assertions.assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsUnaryArgsExprSources") + void testContributorsUnaryArgsExprDsl(String sourceName, Workflow workflow) { + Map output = + app.workflowDefinition(workflow) + .instance(Map.of("github", "bootable[origin]")) + .start() + .thenApply( + model -> (Map) JsonUtils.toJavaValue(JsonUtils.modelToJson(model))) + .join(); + + assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); + } + + private static Stream contributorsUnaryArgsExprSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-unary-args-expr-call.yaml"), + contributorsUnaryArgsExprWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsUnaryArgsExprWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("UnaryArgsExpr", "localhost", 5011) + .method("CreateContributor") + .argument("github", "${ .github }")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java index bce72a72e..9e691fb38 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java @@ -15,30 +15,41 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.test.grpc.handlers.PersonUnaryHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Map; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcUnaryTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonUnaryHandler()).build(); server.start(); @@ -46,8 +57,10 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + app.close(); } @Test @@ -74,4 +87,45 @@ void grpcPerson() throws IOException { Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); } + + @ParameterizedTest(name = "{0}") + @MethodSource("getPersonCallSources") + void testGetPersonCallDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + "file://" + + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto") + .getFile(); + + Map output = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .thenApply( + model -> (Map) JsonUtils.toJavaValue(JsonUtils.modelToJson(model))) + .join(); + + assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } + + private static Stream getPersonCallSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath("workflows-samples/grpc/get-person-call.yaml"), + getPersonCallWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow getPersonCallWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/person.proto") + .service("Person", "localhost", 5011) + .method("GetPerson")))) + .build(); + } }