[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grpc gcs statistics support (#1158) #1190

Merged
merged 4 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus.StatisticsType;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.common.base.Stopwatch;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -239,6 +239,43 @@ private void updateGcsIOSpecificStatistics(int statusCode) {
}
}

private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) {
// using code.proto as reference
// https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
switch (grpcStatusCode.getCode()) {
case OK:
return 200;
case CANCELLED:
return 499;
case INVALID_ARGUMENT:
case FAILED_PRECONDITION:
case OUT_OF_RANGE:
return 400;
case DEADLINE_EXCEEDED:
return 504;
case NOT_FOUND:
return 404;
case ALREADY_EXISTS:
case ABORTED:
return 409;
case PERMISSION_DENIED:
return 403;
case RESOURCE_EXHAUSTED:
return 429;
case UNIMPLEMENTED:
return 501;
case UNAVAILABLE:
return 503;
case UNAUTHENTICATED:
return 401;
case UNKNOWN:
case INTERNAL:
case DATA_LOSS:
default:
return 500;
}
}

/**
* Updating the required gcs specific statistics based on HttpResponseException.
*
Expand All @@ -263,23 +300,23 @@ private void subscriberOnGoogleJsonResponseException(
/**
* Updating the required gcs specific statistics based on HttpResponse.
*
* @param response contains statusCode based on which metrics are updated
* @param responseStatus responseStatus status code from HTTP response
*/
@Subscribe
private void subscriberOnHttpResponse(@Nonnull HttpResponse response) {
updateGcsIOSpecificStatistics(response.getStatusCode());
private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) {
updateGcsIOSpecificStatistics(responseStatus);
}

/**
* Updating the GCS_TOTAL_REQUEST_COUNT
*
* @param request
*/
@Subscribe
private void subscriberOnHttpRequest(@Nonnull HttpRequest request) {
private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) {
incrementGcsTotalRequestCount();
}

@Subscribe
private void subscriberOnGrpcStatus(@Nonnull Status status) {
updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status));
}

/**
* Updating the EXCEPTION_COUNT
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class GhfsStorageStatistics extends StorageStatisticsFromIOStatistics {
/** IOStatistics Instance */
private final IOStatistics ioStatistics;


/** Create the Storage Statistics instance from the IOStatistics */
public GhfsStorageStatistics(IOStatistics ioStatistics) {
super(NAME, "Ghfs", ioStatistics);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT;
import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.util.Iterator;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class GoogleCloudStorageStatisticsTest {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private GhfsGlobalStorageStatistics subscriber = new GhfsGlobalStorageStatistics();

@Before
public void setUp() throws Exception {

GoogleCloudStorageEventBus.register(subscriber);
}

@After
public void cleanup() throws Exception {

GoogleCloudStorageEventBus.unregister(subscriber);
}

private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
Iterator<LongStatistic> statsIterator = expectedStats.getLongStatistics();
boolean metricsVerified = true;
while (statsIterator.hasNext()) {
LongStatistic stats = statsIterator.next();
Long value = subscriber.getLong(stats.getName());
if (stats.getValue() != value) {
logger.atWarning().log(
"Metric values not matching. for: %s, expected: %d, got: %d",
stats.getName(), stats.getValue(), value);
metricsVerified = false;
break;
}
}
assertThat(metricsVerified).isTrue();
}

@Test
public void gcs_requestCounter() throws Exception {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_rateLimitCounter() {
// verify for http event i.e. via Apiary
GoogleCloudStorageEventBus.postOnHttpResponseStatus(429);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_CLIENT_RATE_LIMIT_COUNT, 1);
verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.RESOURCE_EXHAUSTED);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_clientSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(404);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.CANCELLED);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_serverSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(503);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_SERVER_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.INTERNAL);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_ExceptionCounter() {
GoogleCloudStorageEventBus.postOnException();
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(EXCEPTION_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/** This is a gRPC interceptor to capture the statistics related to calls made to gcs backend. */
@VisibleForTesting
public class GoogleCloudStorageClientGrpcStatisticsInterceptor implements ClientInterceptor {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
try {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
} finally {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
try {
GoogleCloudStorageEventBus.onGrpcStatus(status);
} finally {
super.onClose(status, trailers);
}
}
},
headers);
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ private static Storage createStorage(
if (storageOptions.isTraceLogEnabled()) {
list.add(new GoogleCloudStorageClientGrpcTracingInterceptor());
}
list.add(new GoogleCloudStorageClientGrpcStatisticsInterceptor());
return ImmutableList.copyOf(list);
})
.setCredentials(credentials != null ? credentials : NoCredentials.getInstance())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.util;

import com.google.common.annotations.VisibleForTesting;

/** This an Event which is published in EvenBus queue whenever a gcs request is created/executed. */
@VisibleForTesting
public class GcsRequestExecutionEvent {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package com.google.cloud.hadoop.util;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.common.eventbus.EventBus;
import io.grpc.Status;
import java.io.IOException;

/** Event Bus class */
Expand All @@ -45,6 +44,16 @@ public static void register(Object obj) {
eventBus.register(obj);
}

/**
* Method to unregister an obj to event bus
*
* @param obj to unregister from event bus
* @throws IllegalArgumentException if the object was not previously registered.
*/
public static void unregister(Object obj) {
eventBus.unregister(obj);
}

/**
* Posting GoogleJsonResponseException to invoke corresponding Subscriber method.
*
Expand All @@ -66,19 +75,19 @@ public static void postOnHttpResponseException(HttpResponseException response) {
/**
* Posting HttpResponse to invoke corresponding Subscriber method.
*
* @param response contains statusCode based on which metrics are updated in Subscriber method
* @param responseStatus response status code
*/
public static void postOnHttpResponse(HttpResponse response) {
eventBus.post(response);
public static void postOnHttpResponseStatus(int responseStatus) {
eventBus.post(responseStatus);
}

/**
* Posting HttpRequest to invoke corresponding Subscriber method.
* Posting Gcs request execution event i.e. request to gcs is being initiated.
*
* @param request based on which metrics are updated in Subscriber method
* @param event dummy event to map to request execution type.
*/
public static void postOnHttpRequest(HttpRequest request) {
eventBus.post(request);
public static void onGcsRequest(GcsRequestExecutionEvent event) {
eventBus.post(event);
}

/**
Expand All @@ -96,4 +105,12 @@ public static void postOnException() {
public static void postOnStatisticsType() {
eventBus.post(StatisticsType.DIRECTORIES_DELETED);
}
/**
* Posting grpc Status to invoke the corresponding Subscriber method.
*
* @param status status object of grpc response
*/
public static void onGrpcStatus(Status status) {
eventBus.post(status);
}
}
Loading