[go: nahoru, domu]

Skip to content

Commit

Permalink
1. add actor tick schedule and refactor some code of akka
Browse files Browse the repository at this point in the history
  • Loading branch information
Joezeo committed Jul 22, 2022
1 parent 8b57dfe commit 18fa9a4
Show file tree
Hide file tree
Showing 19 changed files with 443 additions and 25 deletions.
16 changes: 16 additions & 0 deletions common/src/main/java/com/toocol/common/akka/ActorAddTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.toocol.common.akka;

import akka.actor.ActorRef;
import lombok.AllArgsConstructor;

/**
 * Message that carries the reference of an actor which should be registered as a tick receiver.
 *
 * @author :JoeZane (joezane.cn@gmail.com)
 * @date: 2022/7/23 0:31
 * @version: 0.0.1
 */
public class ActorAddTicker {

    /** Reference of the actor to register. */
    public final ActorRef ticker;

    /**
     * Creates the registration message.
     *
     * @param ticker reference of the actor to register
     */
    public ActorAddTicker(ActorRef ticker) {
        this.ticker = ticker;
    }

}
9 changes: 9 additions & 0 deletions common/src/main/java/com/toocol/common/akka/ActorCreate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.toocol.common.akka;

/**
 * Singleton signal message used to ask an actor to create what it is responsible for.
 *
 * @author ZhaoZhe (joezane.cn@gmail.com)
 * @date 2022/7/21 20:27
 */
public enum ActorCreate {
    /** The only instance; send it as {@code ActorCreate.of}. */
    of
}
21 changes: 18 additions & 3 deletions common/src/main/java/com/toocol/common/akka/ActorInitializer.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package com.toocol.common.akka;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.routing.SmallestMailboxPool;
import com.toocol.common.events.AsyncEventDispatcher;
import com.toocol.common.functional.OnceCheck;
import com.toocol.common.vessel.AbstractVessel;
import com.toocol.common.vessel.DefaultVessel;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -32,12 +37,22 @@ public void initialize() {
return;
}

vessel.actorTickScheduler = vessel.actorSystem.actorOf(Props.create(ActorTickScheduler.class), "actor-tick-scheduler");
log.info("Create the actor tick scheduler success.");

vessel.asyncEventDispatcherRef = vessel.actorSystem.actorOf(
new SmallestMailboxPool(Runtime.getRuntime().availableProcessors())
.props(Props.create(AsyncEventDispatcher.class)),
new SmallestMailboxPool(AsyncEventDispatcher.ACTOR_COUNT_OF_DISPATCHER).props(Props.create(AsyncEventDispatcher.class)),
"async-event-system"
);
log.info("Create the async event system.");
log.info("Create the async event system success.");

ThreadFactory factory = new DefaultThreadFactory("actor-tick", false);
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(8), factory);
vessel.actorSystem.scheduler()
.schedule(FiniteDuration.apply(10, TimeUnit.SECONDS), FiniteDuration.apply(1, TimeUnit.SECONDS),
vessel.actorTickScheduler, ActorTick.of, ExecutionContext.fromExecutor(executorService), ActorRef.noSender());
log.info("Initialize the actor tick scheduler success.");
}

@Override
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/com/toocol/common/akka/ActorTick.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.toocol.common.akka;

/**
 * Singleton signal message that represents one tick delivered to an actor.
 *
 * @author :JoeZane (joezane.cn@gmail.com)
 * @date: 2022/7/22 23:07
 * @version: 0.0.1
 */
public enum ActorTick {
    /** The only instance; send it as {@code ActorTick.of}. */
    of
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.toocol.common.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.japi.pf.ReceiveBuilder;

import java.util.ArrayList;
import java.util.List;

/**
 * Actor that fans a received {@link ActorTick} out to every registered ticker.
 *
 * <p>An actor is registered by sending an {@link ActorAddTicker} to this scheduler. Every
 * registered actor is death-watched, so once it stops it is removed from the list instead of
 * being sent ticks forever.
 *
 * @author :JoeZane (joezane.cn@gmail.com)
 * @date: 2022/7/23 0:22
 * @version: 0.0.1
 */
public class ActorTickScheduler extends AbstractActor {

    /** Actors that currently receive a tick whenever this scheduler is ticked. */
    private final List<ActorRef> tickList = new ArrayList<>();

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ActorAddTicker.class, actorAddTicker -> addTicker(actorAddTicker.ticker))
                .match(ActorTick.class, tick -> tick())
                // Sent by Akka for every watched ticker that has stopped.
                .match(Terminated.class, terminated -> removeTicker(terminated.getActor()))
                .build();
    }

    /**
     * Registers an actor as a tick receiver and starts watching it for termination.
     * A {@code null} reference or an already registered actor is ignored, so a ticker never
     * receives more than one message per tick.
     *
     * @param ticker the actor that should receive {@link ActorTick} messages
     */
    protected void addTicker(ActorRef ticker) {
        if (ticker == null || tickList.contains(ticker)) {
            return;
        }
        tickList.add(ticker);
        // Without the watch a stopped ticker would stay in the list and every tick
        // sent to it would end up in dead letters.
        getContext().watch(ticker);
    }

    /**
     * Drops a stopped actor from the tick list.
     *
     * @param ticker the actor reported as terminated
     */
    private void removeTicker(ActorRef ticker) {
        tickList.remove(ticker);
    }

    /** Forwards one {@link ActorTick} to every registered ticker, without a sender. */
    private void tick() {
        tickList.forEach(actorRef -> actorRef.tell(ActorTick.of, ActorRef.noSender()));
    }

}
15 changes: 15 additions & 0 deletions common/src/main/java/com/toocol/common/akka/IActorTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.toocol.common.akka;

/**
 * Contract for performing some tasks periodically (per second).
 * Only actors inside the akka system are supported.
 *
 * @author :JoeZane (joezane.cn@gmail.com)
 * @date: 2022/7/22 22:51
 * @version: 0.0.1
 */
public interface IActorTicker {

    /**
     * Runs the periodic work for a single tick.
     */
    void tick();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.toocol.common.akka.CreateActor;
import com.google.common.collect.ImmutableMap;
import com.toocol.common.akka.ActorAddTicker;
import com.toocol.common.akka.ActorCreate;
import com.toocol.common.utils.ClassScanner;
import com.toocol.common.vessel.AbstractVessel;
import com.toocol.common.vessel.DefaultVessel;

import java.util.*;

Expand All @@ -15,37 +19,44 @@
* @version: 0.0.1
*/
public class AsyncEventDispatcher extends AbstractActor {
public static final int ACTOR_COUNT_OF_DISPATCHER = 20;

protected static final Map<Class<? extends AsyncEvent>, List<ActorRef>> listenerMap = new HashMap<>();
private static Map<Class<? extends AsyncEvent>, List<ActorRef>> listenerMap;

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(CreateActor.class, create -> createAsyncListeners())
.match(ActorCreate.class, create -> createAsyncListeners())
.match(AsyncEvent.class, this::dispatch)
.build();
}

private void createAsyncListeners() {
Map<Class<? extends AsyncEvent>, List<ActorRef>> map = new HashMap<>();
DefaultVessel vessel = AbstractVessel.get().as();
new ClassScanner("com.toocol", clazz -> Optional.ofNullable(clazz.getSuperclass()).map(clz -> clz.equals(AsyncEventListener.class)).orElse(false))
.scan()
.forEach(listenerClass -> {
AsyncEventListen annotation = listenerClass.getAnnotation(AsyncEventListen.class);
if (annotation == null) {
return;
}
listenerMap.compute(annotation.listen(), (k, v) -> {
map.compute(annotation.value(), (k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(getContext().actorOf(Props.create(AsyncEventListener.class)));
ActorRef ref = getContext().actorOf(Props.create(listenerClass), listenerClass.getSimpleName());
v.add(ref);
vessel.actorTickScheduler.tell(new ActorAddTicker(ref), ActorRef.noSender());
return v;
});
});
listenerMap = ImmutableMap.copyOf(map);
}

/**
 * Sends the event to every listener registered for the event's exact runtime class.
 *
 * <p>The static {@code listenerMap} is only assigned once the listeners have been created, so it
 * can still be {@code null} when an event arrives. Such an event is reported through
 * {@code unhandled} instead of failing this actor with a NullPointerException.
 *
 * @param event the asynchronous event to deliver
 */
private void dispatch(AsyncEvent event) {
    // Read the static field once so the null check and the lookup use the same map.
    Map<Class<? extends AsyncEvent>, List<ActorRef>> listeners = listenerMap;
    if (listeners == null) {
        unhandled(event);
        return;
    }
    List<ActorRef> eventListeners = listeners.get(event.getClass());
    if (eventListeners == null) {
        // No listener is registered for this event type.
        return;
    }
    for (ActorRef listener : eventListeners) {
        listener.tell(event.as(), ActorRef.noSender());
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncEventListen {

Class<? extends AsyncEvent> listen();
Class<? extends AsyncEvent> value();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.toocol.common.akka.ActorTick;
import com.toocol.common.akka.IActorTicker;

/**
* @author ZhaoZhe (joezane.cn@gmail.com)
* @date 2022/7/21 19:56
*/
public abstract class AsyncEventListener<T extends AsyncEvent> extends AbstractActor implements IEventListener<T> {
public abstract class AsyncEventListener<T extends AsyncEvent> extends AbstractActor implements IEventListener<T>, IActorTicker {

private final Class<T> clazz;

Expand All @@ -19,6 +21,12 @@ protected AsyncEventListener(Class<T> clazz) {
public Receive createReceive() {
    // Events of the listened type go to handler(...).
    final ReceiveBuilder eventReceive = ReceiveBuilder.create().match(clazz, this::handler);
    // A tick message triggers tick().
    final ReceiveBuilder withTick = eventReceive.match(ActorTick.class, ignoredTick -> tick());
    return withTick.build();
}

/**
 * Called when this actor receives an {@link ActorTick} message.
 * Does nothing by default; a subclass can override it to run periodic work.
 */
@Override
public void tick() {
// Intentionally empty: default no-op.

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.toocol.common.events;

import akka.actor.ActorRef;
import com.toocol.common.akka.CreateActor;
import com.toocol.common.akka.ActorCreate;
import com.toocol.common.functional.OnceCheck;
import com.toocol.common.vessel.AbstractVessel;
import com.toocol.common.vessel.DefaultVessel;
Expand All @@ -28,7 +28,7 @@ public void initialize() {

DefaultVessel vessel = AbstractVessel.get().as();
if (vessel.asyncEventDispatcherRef != null) {
vessel.asyncEventDispatcherRef.tell(CreateActor.of(), ActorRef.noSender());
vessel.asyncEventDispatcherRef.tell(ActorCreate.of, ActorRef.noSender());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
* @date: 2022/7/21 0:35
* @version: 0.0.1
*/
public abstract class SyncEvent extends AbstractEvent{
public abstract class SyncEvent extends AbstractEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.lang.annotation.Target;

/**
* Each synchronization listener needs to add this annotation.
* Each synchronization listener needs to add this annotation.
*
* @author :JoeZane (joezane.cn@gmail.com)
* @date: 2022/7/22 1:04
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* @author ZhaoZhe (joezane.cn@gmail.com)
* @date 2022/7/21 19:55
*/
public abstract class SyncEventListener<T extends SyncEvent> implements IEventListener<T>{
public abstract class SyncEventListener<T extends SyncEvent> implements IEventListener<T> {

public final Class<T> clazz;

Expand Down
Loading

0 comments on commit 18fa9a4

Please sign in to comment.