[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[optimize][admin]Enhancing Transaction Buffer Stats and Introducing TransactionBufferInternalStats API #20330

Merged
merged 20 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
Expand All @@ -36,8 +37,10 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
Expand All @@ -47,6 +50,8 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SnapshotSystemTopicInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
Expand Down Expand Up @@ -170,9 +175,10 @@ protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBu
}

protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative,
boolean lowWaterMarks) {
boolean lowWaterMarks,
boolean segmentStats) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks));
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks, segmentStats));
}

protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
Expand Down Expand Up @@ -431,6 +437,69 @@ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
);
}

protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats(
boolean authoritative, boolean metadata) {
TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats();
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(topic -> {
TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType();
if (snapshotType == null) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
"Transaction buffer Snapshot for the topic does not exist"));
} else if (snapshotType == TransactionBuffer.SnapshotType.Segment) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
TopicName segmentTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
return getTxnSnapshotInternalStats(segmentTopic, metadata)
.thenApply(snapshotSystemTopicInternalStats -> {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
transactionBufferInternalStats.segmentInternalStats =
snapshotSystemTopicInternalStats;
return transactionBufferInternalStats;
}).thenCompose(ignore -> {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
TopicName indexTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
return getTxnSnapshotInternalStats(indexTopic, metadata)
.thenApply(indexStats -> {
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
return transactionBufferInternalStats;
});
});
} else if (snapshotType == TransactionBuffer.SnapshotType.Single) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
TopicName singleSnapshotTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
return getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
.thenApply(snapshotSystemTopicInternalStats -> {
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
snapshotSystemTopicInternalStats;
return transactionBufferInternalStats;
});
}
return FutureUtil.failedFuture(new RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+ snapshotType));
});
}

private CompletableFuture<SnapshotSystemTopicInternalStats> getTxnSnapshotInternalStats(TopicName topicName,
boolean metadata) {
final PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(new RestException(e));
}
NamespaceService ns = pulsar().getNamespaceService();
return ns.isServiceUnitOwnedAsync(topicName)
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
.thenCompose(isOwner -> admin.topics().getInternalStatsAsync(topicName.toString(), metadata)
.thenApply(persistentTopicInternalStats -> {
SnapshotSystemTopicInternalStats
snapshotSystemTopicInternalStats = new SnapshotSystemTopicInternalStats();
snapshotSystemTopicInternalStats.managedLedgerInternalStats = persistentTopicInternalStats;
snapshotSystemTopicInternalStats.managedLedgerName = topicName.getEncodedLocalName();
return snapshotSystemTopicInternalStats;
}));
}

protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("lowWaterMarks") @DefaultValue("false")
boolean lowWaterMarks) {
boolean lowWaterMarks,
@QueryParam("segmentStats") @DefaultValue("false")
boolean segmentStats) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative, lowWaterMarks)
internalGetTransactionBufferStats(authoritative, lowWaterMarks, segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down Expand Up @@ -332,6 +334,51 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo
}
}

@GET
@Path("/transactionBufferInternalStats/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Get transaction buffer internal stats.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not enable transaction"),
@ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
@ApiResponse(code = 405, message = "Transaction buffer don't use managedLedger!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void getTransactionBufferInternalStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferInternalStats(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction buffer internal stats {}",
clientAppId(), topicName, ex);
}
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
} else if (cause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
} else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, cause));
} else {
asyncResponse.resume(new RestException(cause));
}
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
return null;
});
} catch (Exception ex) {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
}

@POST
@Path("/transactionCoordinator/replicas")
@ApiResponses(value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType subType) {
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}

public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks) {
return this.transactionBuffer.getStats(lowWaterMarks);
public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks, boolean segmentStats) {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
return this.transactionBuffer.getStats(lowWaterMarks, segmentStats);
}

public TransactionPendingAckStats getTransactionPendingAckStats(String subName, boolean lowWaterMarks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;


public interface AbortedTxnProcessor {
Expand Down Expand Up @@ -66,9 +67,10 @@ public interface AbortedTxnProcessor {

/**
* Get the lastSnapshotTimestamps.
* @return the lastSnapshotTimestamps.
*
* @return a transactionBufferStats with the stats in the abortedTxnProcessor.
*/
long getLastSnapshotTimestamps();
TransactionBufferStats generateSnapshotStats(boolean segmentStats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add a new method for compatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbortedTxnProcessor.java has not been included in any release, so we don't need to consider compatibility for it.


CompletableFuture<Void> closeAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
@Beta
public interface TransactionBuffer {

/**
 * The kinds of snapshot a transaction buffer can report through {@code getSnapshotType()}.
 */
enum SnapshotType {
/** A single snapshot log is used. */
Single,
/** A snapshot segment topic is used together with a separate snapshot segment index topic. */
Segment,
}

liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
/**
* Return the metadata of a transaction in the buffer.
*
Expand Down Expand Up @@ -158,6 +163,17 @@ public interface TransactionBuffer {
*/
PositionImpl getMaxReadPosition();

/**
* Get the snapshot type.
*
* <p>The snapshot type can be either "Single" or "Segment". In "Single" mode, a single snapshot log is used
* to record the transaction buffer stats. In "Segment" mode, a snapshot segment topic is used to record
* the stats, and a separate snapshot segment index topic is used to index these stats.
*
* <p>An implementation may return {@code null} when it has no snapshot type to report, so callers
* should check for {@code null} before using the result.
*
* @return the snapshot type, or {@code null} if the implementation has none to report
*/
SnapshotType getSnapshotType();

/**
* Get transaction in buffer stats.
* @return the transaction in buffer stats.
Expand All @@ -168,7 +184,7 @@ public interface TransactionBuffer {
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
*/
TransactionBufferStats getStats(boolean lowWaterMarks);
TransactionBufferStats getStats(boolean lowWaterMarks, boolean segmentStats);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved

/**
* Wait TransactionBuffer Recovers completely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,18 @@ public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
}

/**
 * This implementation has no snapshot type to report.
 *
 * @return always {@code null}
 */
@Override
public SnapshotType getSnapshotType() {
return null;
}

/**
 * This implementation has no per-transaction stats to report.
 *
 * @param txnID ignored by this implementation
 * @return always {@code null}
 */
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}

@Override
public TransactionBufferStats getStats(boolean lowWaterMarks) {
public TransactionBufferStats getStats(boolean lowWaterMarks, boolean segmentStats) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -173,8 +174,11 @@ public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosit
}

@Override
public long getLastSnapshotTimestamps() {
return this.lastSnapshotTimestamps;
public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
transactionBufferStats.totalAbortedTransactions = aborts.size();
return transactionBufferStats;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SegmentStats;
import org.apache.pulsar.common.policies.data.SegmentsStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;

Expand Down Expand Up @@ -116,6 +119,8 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess

private volatile long lastSnapshotTimestamps;

private volatile long lastTakedSnapshotSegmentTimestamp;

/**
* The number of the aborted transaction IDs in a segment.
* This is calculated according to the configured memory size.
Expand Down Expand Up @@ -451,9 +456,25 @@ public CompletableFuture<Void> clearAbortedTxnSnapshot() {
persistentWorker::clearSnapshotSegmentAndIndexes);
}

@Override
public long getLastSnapshotTimestamps() {
return this.lastSnapshotTimestamps;
public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
    // Top-level counters of the buffer.
    TransactionBufferStats bufferStats = new TransactionBufferStats();
    bufferStats.totalAbortedTransactions = this.aborts.size();
    bufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;

    // Summary of the segments; always included.
    SegmentsStats summary = new SegmentsStats();
    summary.currentSegmentCapacity = this.snapshotSegmentCapacity;
    summary.lastTookSnapshotSegmentTimestamp = this.lastTakedSnapshotSegmentTimestamp;
    summary.unsealedAbortTxnIDSize = this.unsealedTxnIds.size();
    summary.segmentsSize = indexes.size();

    // The per-segment listing is built only when the caller asks for it.
    if (segmentStats) {
        List<SegmentStats> perSegment = new ArrayList<>();
        segmentIndex.forEach((position, txnID) -> {
            perSegment.add(new SegmentStats(txnID.toString(), position.toString()));
        });
        summary.segmentStats = perSegment;
    }

    bufferStats.segmentsStats = summary;
    return bufferStats;
}

@Override
Expand Down Expand Up @@ -705,6 +726,7 @@ private CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> segm
transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
return segmentWriter.writeAsync(buildKey(this.sequenceID.get()), transactionBufferSnapshotSegment);
}).thenCompose((messageId) -> {
lastTakedSnapshotSegmentTimestamp = System.currentTimeMillis();
//Build index for this segment
TransactionBufferSnapshotIndex index = new TransactionBufferSnapshotIndex();
index.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
Expand Down
Loading