[go: nahoru, domu]

Skip to content

Commit

Permalink
support of ribbon added
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Mar 5, 2018
1 parent bc390a7 commit cf7f11c
Show file tree
Hide file tree
Showing 33 changed files with 532 additions and 207 deletions.
53 changes: 53 additions & 0 deletions feign-reactive-cloud/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  feign-reactive-cloud: adds Netflix Ribbon based load balancing on top of feign-reactive-core.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-reactive</artifactId>
    <version>0.2.0-SNAPSHOT</version>
  </parent>

  <artifactId>feign-reactive-cloud</artifactId>

  <properties>
    <!-- Single version shared by all com.netflix.ribbon artifacts below. -->
    <ribbon-version>2.1.1</ribbon-version>
  </properties>

  <dependencies>

    <!-- No version here: presumably managed by the feign-reactive parent (TODO confirm). -->
    <dependency>
      <groupId>io.github.openfeign</groupId>
      <artifactId>feign-reactive-core</artifactId>
    </dependency>

    <dependency>
      <groupId>com.netflix.ribbon</groupId>
      <artifactId>ribbon-core</artifactId>
      <version>${ribbon-version}</version>
    </dependency>

    <dependency>
      <groupId>com.netflix.ribbon</groupId>
      <artifactId>ribbon-loadbalancer</artifactId>
      <version>${ribbon-version}</version>
    </dependency>

    <!--
      The two RxJava artifacts are deliberately NOT optional: RibbonReactiveClient converts
      between rx.Observable and Reactive Streams publishers (rx.RxReactiveStreams) on every
      request, so consumers of this module need both on their runtime classpath. Optional
      dependencies are not propagated transitively by Maven.
    -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
    </dependency>

    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava-reactive-streams</artifactId>
    </dependency>

  </dependencies>

</project>
43 changes: 43 additions & 0 deletions feign-reactive-cloud/src/main/java/feign/CloudReactiveFeign.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package feign;

import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
import feign.reactive.client.ReactiveClient;
import feign.reactive.client.ReactiveClientFactory;
import feign.reactive.client.RibbonReactiveClient;

/**
* Allows to specify ribbon {@link LoadBalancerCommand}.
*
* @author Sergii Karpenko
*/
/**
 * {@link ReactiveFeign} variant whose builder wraps every per-method {@link ReactiveClient}
 * in a {@link RibbonReactiveClient}, so requests are executed through the configured ribbon
 * {@link LoadBalancerCommand}.
 *
 * @author Sergii Karpenko
 */
public class CloudReactiveFeign extends ReactiveFeign {

  private CloudReactiveFeign(
      ReactiveFeign.ParseHandlersByName targetToHandlersByName,
      InvocationHandlerFactory factory) {
    super(targetToHandlersByName, factory);
  }

  /**
   * Creates a new builder that supports load balancing configuration.
   *
   * @return a fresh {@link CloudReactiveFeign.Builder}
   */
  public static CloudReactiveFeign.Builder builder() {
    return new CloudReactiveFeign.Builder();
  }

  /**
   * {@link ReactiveFeign.Builder} extension that additionally accepts the
   * {@link LoadBalancerCommand} used to pick a server for each request.
   */
  public static class Builder extends ReactiveFeign.Builder {

    private LoadBalancerCommand<Object> loadBalancerCommand;

    /**
     * Sets the ribbon command through which all requests are submitted.
     *
     * @param loadBalancerCommand command used to select the target server
     * @return this builder
     */
    public Builder setLoadBalancerCommand(LoadBalancerCommand<Object> loadBalancerCommand) {
      this.loadBalancerCommand = loadBalancerCommand;
      return this;
    }

    /**
     * Decorates the client factory of the parent builder so that every created client is
     * wrapped in a {@link RibbonReactiveClient}.
     *
     * @return factory producing load balanced clients
     * @throws IllegalStateException if no {@link LoadBalancerCommand} was provided
     */
    @Override
    protected ReactiveClientFactory buildReactiveClientFactory() {
      // Fail while building instead of with a bare NullPointerException on the first request.
      final LoadBalancerCommand<Object> command = this.loadBalancerCommand;
      if (command == null) {
        throw new IllegalStateException(
            "LoadBalancerCommand wasn't provided in CloudReactiveFeign builder");
      }

      final ReactiveClientFactory delegateFactory = super.buildReactiveClientFactory();

      // The command is captured here, so later changes to this builder do not affect
      // factories that were already built.
      return methodMetadata -> {
        ReactiveClient delegate = delegateFactory.apply(methodMetadata);
        return new RibbonReactiveClient(methodMetadata, command, delegate);
      };
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package feign.reactive.client;

import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
import feign.MethodMetadata;
import feign.Request;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;

/**
* @author Sergii Karpenko
*/
/**
 * {@link ReactiveClient} decorator that submits every request through a ribbon
 * {@link LoadBalancerCommand}: the server chosen by the command replaces the host of the
 * request URL before the request is handed to the wrapped client.
 *
 * @author Sergii Karpenko
 */
public class RibbonReactiveClient implements ReactiveClient {

  private final LoadBalancerCommand<Object> loadBalancerCommand;
  private final ReactiveClient reactiveClient;
  private final Type returnPublisherType;

  /**
   * @param metadata metadata of the Feign method; its return type decides whether results
   *        are exposed as {@link Mono} or {@link Flux}
   * @param loadBalancerCommand command used to pick the target server for each request
   * @param reactiveClient client that actually executes the rewritten request
   */
  public RibbonReactiveClient(MethodMetadata metadata,
      LoadBalancerCommand<Object> loadBalancerCommand, ReactiveClient reactiveClient) {
    this.loadBalancerCommand = loadBalancerCommand;
    this.reactiveClient = reactiveClient;

    // A raw (non generic) return type is not a ParameterizedType; use it as is instead of
    // failing with a ClassCastException.
    Type returnType = metadata.returnType();
    this.returnPublisherType = returnType instanceof ParameterizedType
        ? ((ParameterizedType) returnType).getRawType()
        : returnType;
  }

  /**
   * Executes the request against the server selected by the load balancer command.
   *
   * @param request request whose URL host is the logical client (service) name
   * @return a {@link Mono} when the Feign method declares {@link Mono}, otherwise a {@link Flux}
   */
  @Override
  public Publisher<Object> executeRequest(Request request) {
    // Ribbon works with rx.Observable, so the wrapped publisher is bridged to an Observable
    // inside the command and the command result is bridged back to a Publisher.
    Publisher<Object> publisher = RxReactiveStreams.toPublisher(
        loadBalancerCommand.submit(server -> {
          Request lbRequest = loadBalanceRequest(request, server);
          return RxReactiveStreams.toObservable(reactiveClient.executeRequest(lbRequest));
        }));

    return returnPublisherType == Mono.class ? Mono.from(publisher) : Flux.from(publisher);
  }

  /**
   * Creates a copy of the request that targets the given server.
   *
   * <p>The host (and port, if any) of the request URL is replaced by
   * {@code server.getHostPort()}; scheme, user info, path, query and fragment are kept
   * unchanged. The replacement is done positionally on the authority component, so the
   * client name is never interpreted as a regular expression and can not accidentally match
   * another part of the URL.
   *
   * @param request original request
   * @param server server chosen by the load balancer
   * @return request with the same method, headers, body and charset but a rewritten URL
   * @throws IllegalArgumentException if the request URL is not a valid URI or has no authority
   */
  Request loadBalanceRequest(Request request, Server server) {
    String url = request.url();
    String authority = URI.create(url).getRawAuthority();
    if (authority == null) {
      throw new IllegalArgumentException(
          "Request URL has no authority to load balance: " + url);
    }

    // A scheme can not contain '/', so the first "//" always introduces the authority.
    int authorityStart = url.indexOf("//") + 2;
    int authorityEnd = authorityStart + authority.length();
    // Keep "user:password@" if present; lastIndexOf yields -1 (+1 = 0) when it is absent.
    String userInfoPrefix = authority.substring(0, authority.lastIndexOf('@') + 1);

    String lbUrl = url.substring(0, authorityStart)
        + userInfoPrefix
        + server.getHostPort()
        + url.substring(authorityEnd);

    return Request.create(
        request.method(), lbUrl, request.headers(), request.body(), request.charset());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package feign.reactive;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.loadbalancer.AbstractLoadBalancer;
import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
import feign.CloudReactiveFeign;
import feign.ReactiveFeign;
import feign.RequestLine;
import feign.jackson.JacksonEncoder;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.net.URL;

import static com.netflix.client.ClientFactory.getNamedLoadBalancer;
import static com.netflix.config.ConfigurationManager.getConfigInstance;
import static org.assertj.core.api.Assertions.assertThat;


/**
 * Checks that a client built with {@link CloudReactiveFeign} distributes requests over the
 * servers configured for a ribbon client name.
 */
public class CloudReactiveHttpClientTest {

  @Rule
  public final MockWebServer server1 = new MockWebServer();
  @Rule
  public final MockWebServer server2 = new MockWebServer();

  /** Ribbon client name; also used as host of the Feign target URL. */
  private static final String SERVICE_NAME =
      "LoadBalancingTargetTest-loadBalancingDefaultPolicyRoundRobin";

  /** Returns {@code localhost:<port>} for the given mock server URL. */
  static String hostAndPort(URL url) {
    return "localhost:" + url.getPort();
  }

  /**
   * Issues two requests and expects each of the two mock servers to receive exactly one of
   * them.
   */
  @Test
  public void loadBalancingDefaultPolicyRoundRobin() throws IOException, InterruptedException {
    String body = "success!";
    server1.enqueue(new MockResponse().setBody(body));
    server2.enqueue(new MockResponse().setBody(body));

    // Register both mock servers as the server list of the ribbon client.
    String serverListKey = SERVICE_NAME + ".ribbon.listOfServers";
    getConfigInstance().setProperty(serverListKey,
        hostAndPort(server1.url("").url()) + "," + hostAndPort(server2.url("").url()));

    // The load balancer command is set first, while the builder is still statically typed as
    // CloudReactiveFeign.Builder, which avoids casting the result of the inherited setters.
    CloudReactiveFeign.Builder cloudBuilder = CloudReactiveFeign
        .builder()
        .setLoadBalancerCommand(
            LoadBalancerCommand.builder()
                .withLoadBalancer(
                    AbstractLoadBalancer.class.cast(getNamedLoadBalancer(SERVICE_NAME)))
                .build());

    TestInterface client = cloudBuilder
        .webClient(WebClient.create())
        //encodes body and parameters
        .encoder(new JacksonEncoder(new ObjectMapper()))
        .target(TestInterface.class, "http://" + SERVICE_NAME);

    try {
      String result1 = client.get().block();
      String result2 = client.get().block();

      assertThat(result1)
          .isEqualTo(result2)
          .isEqualTo(body);

      assertThat(server1.takeRequest().getPath())
          .isEqualTo(server2.takeRequest().getPath())
          .isEqualTo("/");
      assertThat(server1.getRequestCount())
          .isEqualTo(server2.getRequestCount())
          .isEqualTo(1);
    } finally {
      // Do not leak the server list into other tests sharing the configuration instance.
      getConfigInstance().clearProperty(serverListKey);
    }
  }


  /** Minimal Feign API used by the test. */
  interface TestInterface {

    @RequestLine("GET /")
    Mono<String> get();
  }
}
18 changes: 18 additions & 0 deletions feign-reactive-core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  feign-reactive-core module. It declares no dependencies of its own; anything it needs is
  presumably inherited from the feign-reactive parent (TODO confirm against the parent pom).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-reactive</artifactId>
    <version>0.2.0-SNAPSHOT</version>
  </parent>

  <artifactId>feign-reactive-core</artifactId>

  <dependencies>
  </dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package feign;

import static feign.Util.checkNotNull;
import static feign.Util.isDefault;

import feign.InvocationHandlerFactory.MethodHandler;
import feign.codec.Decoder;
import feign.codec.Encoder;
import feign.codec.ErrorDecoder;
import feign.reactive.*;
import feign.reactive.BuildTemplateByResolvingArgs;
import feign.reactive.ReactiveDelegatingContract;
import feign.reactive.ReactiveOptions;
import feign.reactive.client.ReactiveClientFactory;
import feign.reactive.client.WebReactiveClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
Expand All @@ -18,13 +19,12 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static feign.Util.checkNotNull;
import static feign.Util.isDefault;

/**
* Allows Feign interfaces to return reactive {@link Mono} or {@link Flux}.
*
Expand All @@ -34,7 +34,7 @@ public class ReactiveFeign extends Feign {
private final ParseHandlersByName targetToHandlersByName;
private final InvocationHandlerFactory factory;

private ReactiveFeign(
protected ReactiveFeign(
final ParseHandlersByName targetToHandlersByName,
final InvocationHandlerFactory factory) {
this.targetToHandlersByName = targetToHandlersByName;
Expand Down Expand Up @@ -80,7 +80,7 @@ public <T> T newInstance(Target<T> target) {
/**
* ReactiveFeign builder.
*/
public static final class Builder extends Feign.Builder {
public static class Builder extends Feign.Builder {
private final List<RequestInterceptor> requestInterceptors =
new ArrayList<>();
private Contract contract =
Expand All @@ -92,6 +92,8 @@ public static final class Builder extends Feign.Builder {
new ReactiveInvocationHandler.Factory();
private boolean decode404;

private feign.reactive.Logger logger = new feign.reactive.Logger();

/** Unsupported operation. */
@Override
public Builder client(final Client client) {
Expand Down Expand Up @@ -300,30 +302,33 @@ public ReactiveFeign build() {
checkNotNull(this.webClient,
"WebClient instance wasn't provided in ReactiveFeign builder");

final ReactiveMethodHandler.Factory methodHandlerFactory =
new ReactiveMethodHandler.Factory(webClient,
requestInterceptors, new feign.reactive.Logger(), decode404);


final ParseHandlersByName handlersByName = new ParseHandlersByName(
contract, encoder, errorDecoder,
methodHandlerFactory);
contract, encoder,
new ReactiveMethodHandler.Factory(
buildReactiveClientFactory(),
requestInterceptors));
return new ReactiveFeign(handlersByName, invocationHandlerFactory);
}

protected ReactiveClientFactory buildReactiveClientFactory() {
return metadata -> new WebReactiveClient(metadata, webClient, errorDecoder, decode404, logger);
}

}

private static final class ParseHandlersByName {
static final class ParseHandlersByName {
private final Contract contract;
private final Encoder encoder;
private final ErrorDecoder errorDecoder;
private final ReactiveMethodHandler.Factory factory;

ParseHandlersByName(
final Contract contract,
final Encoder encoder,
final ErrorDecoder errorDecoder,
final ReactiveMethodHandler.Factory factory) {
this.contract = contract;
this.factory = factory;
this.errorDecoder = errorDecoder;
this.encoder = checkNotNull(encoder, "encoder must not be null");
}

Expand All @@ -346,8 +351,8 @@ Map<String, MethodHandler> apply(final Target key) {
buildTemplate = new BuildTemplateByResolvingArgs(md);
}

result.put(md.configKey(), factory.create(
key, md, buildTemplate, errorDecoder));
ReactiveMethodHandler methodHandler = factory.create(buildTemplate, key, md);
result.put(md.configKey(), methodHandler);
}

return result;
Expand Down
Loading

0 comments on commit cf7f11c

Please sign in to comment.