[go: nahoru, domu]

Skip to content

Commit

Permalink
support of hystrix added (in progress)
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko committed Mar 6, 2018
1 parent 39c6e45 commit cae258c
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 345 deletions.
19 changes: 18 additions & 1 deletion feign-reactive-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
<artifactId>feign-reactive-cloud</artifactId>

<properties>
<ribbon-version>2.1.1</ribbon-version>
</properties>

<dependencies>
Expand All @@ -23,6 +22,18 @@
<artifactId>feign-reactive-core</artifactId>
</dependency>

<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>${hystrix.version}</version>
</dependency>

<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
<version>${archaius.version}</version>
</dependency>

<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-core</artifactId>
Expand All @@ -45,7 +56,13 @@
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
Expand Down
82 changes: 44 additions & 38 deletions feign-reactive-cloud/src/main/java/feign/CloudReactiveFeign.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package feign;

import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
import feign.codec.Encoder;
import feign.codec.ErrorDecoder;
import feign.reactive.ReactiveMethodHandlerFactory;
import feign.reactive.client.ReactiveClient;
import feign.reactive.client.ReactiveClientFactory;
import feign.reactive.client.RibbonReactiveClient;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.function.Function;

/**
* Allows to specify ribbon {@link LoadBalancerCommand}.
* Allows to specify ribbon {@link LoadBalancerCommand}
* and HystrixObservableCommand.Setter.
*
* @author Sergii Karpenko
*/
Expand All @@ -19,19 +24,47 @@ private CloudReactiveFeign(ReactiveFeign.ParseHandlersByName targetToHandlersByN
super(targetToHandlersByName, factory);
}

public static CloudReactiveFeign.Builder builder() {
return new CloudReactiveFeign.Builder();
public static <T> Builder<T> builder() {
return new Builder<>();
}

public static class Builder extends ReactiveFeign.Builder {
public static class Builder<T> extends ReactiveFeign.Builder<T> {

private HystrixObservableCommand.Setter hystrixObservableCommandSetter;
private Function<Throwable, ? extends T> fallbackFactory;
private LoadBalancerCommand<Object> loadBalancerCommand;

public Builder setLoadBalancerCommand(LoadBalancerCommand<Object> loadBalancerCommand) {
/**
 * Sets the Hystrix command setter used by this builder.
 * <p>
 * {@link #buildReactiveMethodHandlerFactory()} wraps the method handler factory in a
 * {@code HystrixMethodHandler.Factory} only when this value is non-null; with a
 * {@code null} setter the unwrapped factory is returned.
 *
 * @param hystrixObservableCommandSetter setter passed to the Hystrix method handlers;
 *                                       may be {@code null}
 * @return this builder
 */
public Builder<T> setHystrixObservableCommandSetter(HystrixObservableCommand.Setter hystrixObservableCommandSetter) {
this.hystrixObservableCommandSetter = hystrixObservableCommandSetter;
return this;
}

/**
 * Registers a fixed fallback instance.
 * <p>
 * Convenience form of {@link #setFallbackFactory(Function)}: the supplied instance is
 * returned for every failure, regardless of the {@link Throwable} that caused it.
 *
 * @param fallback fallback instance to hand out for any failure
 * @return this builder
 */
public Builder<T> setFallback(T fallback) {
    // setFallbackFactory returns this builder, so its result can be returned directly.
    return setFallbackFactory(throwable -> fallback);
}

/**
 * Sets the function that produces a fallback from the failure that triggered it.
 * <p>
 * The function is handed to {@code HystrixMethodHandler.Factory} by
 * {@link #buildReactiveMethodHandlerFactory()}, which only happens when a Hystrix
 * command setter has been configured as well.
 *
 * @param fallbackFactory maps the failure {@link Throwable} to a fallback; may be {@code null}
 * @return this builder
 */
public Builder<T> setFallbackFactory(Function<Throwable, ? extends T> fallbackFactory) {
this.fallbackFactory = fallbackFactory;
return this;
}

/**
 * Sets the ribbon {@link LoadBalancerCommand} held by this builder.
 * <p>
 * NOTE(review): presumably consumed by {@code buildReactiveClientFactory()} to create
 * load-balanced clients; that method's body is not visible here, so confirm there.
 *
 * @param loadBalancerCommand load balancer command to store
 * @return this builder
 */
public Builder<T> setLoadBalancerCommand(LoadBalancerCommand<Object> loadBalancerCommand) {
this.loadBalancerCommand = loadBalancerCommand;
return this;
}

/**
 * Builds the method handler factory, decorating it with Hystrix when configured.
 * <p>
 * The parent's factory is always created first. If no Hystrix command setter was set
 * it is returned as is; otherwise it is wrapped in a {@code HystrixMethodHandler.Factory}
 * together with the (possibly {@code null}) fallback factory.
 *
 * @return the plain or Hystrix-decorated method handler factory
 */
@Override
protected ReactiveMethodHandlerFactory buildReactiveMethodHandlerFactory() {
    final ReactiveMethodHandlerFactory delegate = super.buildReactiveMethodHandlerFactory();

    if (hystrixObservableCommandSetter == null) {
        return delegate;
    }

    return new HystrixMethodHandler.Factory(
            delegate,
            (Function<Throwable, Object>) fallbackFactory,
            hystrixObservableCommandSetter);
}

@Override
protected ReactiveClientFactory buildReactiveClientFactory() {
ReactiveClientFactory reactiveClientFactory = super.buildReactiveClientFactory();
Expand All @@ -44,67 +77,40 @@ protected ReactiveClientFactory buildReactiveClientFactory() {
}

@Override
public Builder webClient(final WebClient webClient){
public Builder<T> webClient(final WebClient webClient){
super.webClient(webClient);
return this;
}

@Override
public Builder contract(final Contract contract) {
public Builder<T> contract(final Contract contract) {
super.contract(contract);
return this;
}

@Override
public Builder encoder(final Encoder encoder) {
public Builder<T> encoder(final Encoder encoder) {
super.encoder(encoder);
return this;
}

@Override
public Builder decode404() {
public Builder<T> decode404() {
super.decode404();
return this;
}

@Override
public Builder errorDecoder(final ErrorDecoder errorDecoder) {
public Builder<T> errorDecoder(final ErrorDecoder errorDecoder) {
super.errorDecoder(errorDecoder);
return this;
}

@Override
public Builder options(final Request.Options options) {
public Builder<T> options(final Request.Options options) {
super.options(options);
return this;
}


/**
* Adds a single request interceptor to the builder.
*
* @param requestInterceptor request interceptor to add
*
* @return this builder
*/
public Builder requestInterceptor(
final RequestInterceptor requestInterceptor) {
super.requestInterceptor(requestInterceptor);
return this;
}

/**
* Sets the full set of request interceptors for the builder, overwriting
* any previous interceptors.
*
* @param requestInterceptors set of request interceptors
*
* @return this builder
*/
public Builder requestInterceptors(
final Iterable<RequestInterceptor> requestInterceptors) {
super.requestInterceptors(requestInterceptors);
return this;
}
}
}
97 changes: 97 additions & 0 deletions feign-reactive-cloud/src/main/java/feign/HystrixMethodHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package feign;

import com.netflix.hystrix.HystrixObservableCommand;
import feign.reactive.ReactiveMethodHandlerFactory;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

import static feign.Util.checkNotNull;

/**
* @author Sergii Karpenko
*/
public class HystrixMethodHandler implements ReactiveMethodHandler {

private final MethodMetadata methodMetadata;
private final Type returnPublisherType;
private final ReactiveMethodHandler methodHandler;
private final Function<Throwable, Object> fallbackFactory;
private HystrixObservableCommand.Setter hystrixObservableCommandSetter;

private HystrixMethodHandler(
MethodMetadata methodMetadata,
ReactiveMethodHandler methodHandler,
@Nullable
Function<Throwable, Object> fallbackFactory,
HystrixObservableCommand.Setter hystrixObservableCommandSetter) {
this.methodMetadata = checkNotNull(methodMetadata, "methodMetadata must be not null");
returnPublisherType = ((ParameterizedType) methodMetadata.returnType()).getRawType();
this.methodHandler = checkNotNull(methodHandler, "methodHandler must be not null");
this.fallbackFactory = fallbackFactory;
this.hystrixObservableCommandSetter = checkNotNull(hystrixObservableCommandSetter, "hystrixObservableCommandSetter must be not null");
}

@Override
@SuppressWarnings("unchecked")
public Publisher invoke(final Object[] argv) {

Observable<Object> observable = new HystrixObservableCommand<Object>(hystrixObservableCommandSetter){
@Override
protected Observable<Object> construct() {
return RxReactiveStreams.toObservable((Publisher)methodHandler.invoke(argv));
}

@Override
protected Observable<Object> resumeWithFallback() {
if(fallbackFactory != null){
Object fallback = fallbackFactory.apply(getExecutionException());
Object fallbackValue = getFallbackValue(fallback, argv);
return Observable.just(fallbackValue);
} else {
return super.resumeWithFallback();
}
}
}.toObservable();

Publisher<Object> publisher = RxReactiveStreams.toPublisher(observable);

return returnPublisherType == Mono.class ? Mono.from(publisher) : Flux.from(publisher);
}

protected Object getFallbackValue(Object fallback, final Object[] argv){
//TODO implement
return null;
}

static class Factory implements ReactiveMethodHandlerFactory {
private final ReactiveMethodHandlerFactory methodHandlerFactory;
private final Function<Throwable, Object> fallbackFactory;
private final HystrixObservableCommand.Setter hystrixObservableCommandSetter;

Factory(ReactiveMethodHandlerFactory methodHandlerFactory,
@Nullable
Function<Throwable, Object> fallbackFactory,
HystrixObservableCommand.Setter hystrixObservableCommandSetter) {
this.methodHandlerFactory = checkNotNull(methodHandlerFactory, "methodHandlerFactory must not be null");
this.fallbackFactory = fallbackFactory;
this.hystrixObservableCommandSetter = checkNotNull(hystrixObservableCommandSetter, "hystrixObservableCommandSetter must not be null");
}

@Override
public ReactiveMethodHandler create(final Target target, final MethodMetadata metadata) {
return new HystrixMethodHandler(
metadata,
methodHandlerFactory.create(target, metadata),
fallbackFactory,
hystrixObservableCommandSetter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package feign.reactive.client;

import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
import feign.MethodMetadata;
import feign.Request;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;

import java.lang.reflect.ParameterizedType;
Expand All @@ -23,7 +26,9 @@ public class RibbonReactiveClient implements ReactiveClient{
private final Type returnPublisherType;

public RibbonReactiveClient(MethodMetadata metadata,
LoadBalancerCommand<Object> loadBalancerCommand, ReactiveClient reactiveClient) {
@Nullable
LoadBalancerCommand<Object> loadBalancerCommand,
ReactiveClient reactiveClient) {
this.loadBalancerCommand = loadBalancerCommand;
this.reactiveClient = reactiveClient;

Expand All @@ -32,15 +37,21 @@ public RibbonReactiveClient(MethodMetadata metadata,

@Override
public Publisher<Object> executeRequest(Request request) {
Publisher<Object> publisher = RxReactiveStreams.toPublisher(
loadBalancerCommand.submit(server -> {

Request lbRequest = loadBalanceRequest(request, server);
if(loadBalancerCommand != null){
Observable<Object> observable = loadBalancerCommand.submit(server -> {

return RxReactiveStreams.toObservable(reactiveClient.executeRequest(lbRequest));
}));
Request lbRequest = loadBalanceRequest(request, server);

return returnPublisherType == Mono.class ? Mono.from(publisher) : Flux.from(publisher);
return RxReactiveStreams.toObservable(reactiveClient.executeRequest(lbRequest));
});

Publisher<Object> publisher = RxReactiveStreams.toPublisher(observable);

return returnPublisherType == Mono.class ? Mono.from(publisher) : Flux.from(publisher);
} else {
return reactiveClient.executeRequest(request);
}
}

Request loadBalanceRequest(Request request, Server server) {
Expand Down
Loading

0 comments on commit cae258c

Please sign in to comment.