[go: nahoru, domu]

Skip to content

Commit

Permalink
Replace Executor with Reloader
Browse files Browse the repository at this point in the history
  • Loading branch information
nqv committed Jul 17, 2021
1 parent 11fdd18 commit f6da914
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
11 changes: 6 additions & 5 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ type LoadingCache interface {
// LoaderFunc retrieves the value corresponding to given Key.
type LoaderFunc func(Key) (Value, error)

// Executor specifies how cache loader is run to refresh value for the Key.
// By default, it is run in a new go routine.
type Executor interface {
// Execute runs the fn asynchronously.
Execute(fn func())
// Reloader specifies how cache loader is run to refresh value for the given Key.
// If Reloader is not set, cache values are refreshed in a new go routine.
type Reloader interface {
// Reload should reload the value asynchronously.
// Application must call setFunc to set new value or error.
Reload(key Key, oldValue Value, setFunc func(Value, error))
// Close shuts down all running tasks. Currently, the error returned is not being used.
Close() error
}
42 changes: 29 additions & 13 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type localCache struct {
onInsertion Func
onRemoval Func

loader LoaderFunc
exec Executor
stats StatsCounter
loader LoaderFunc
reloader Reloader
stats StatsCounter

// cap is the cache capacity.
cap int
Expand Down Expand Up @@ -225,9 +225,9 @@ func (c *localCache) processEntries() {
}
c.postReadCleanup()
case eventClose:
if c.exec != nil {
if c.reloader != nil {
// Stop all refresh tasks.
c.exec.Close()
c.reloader.Close()
}
c.removeAll()
return
Expand Down Expand Up @@ -317,15 +317,12 @@ func (c *localCache) load(k Key) (Value, error) {

// refreshAsync reloads value in a go routine or using custom executor if defined.
func (c *localCache) refreshAsync(en *entry) bool {
if c.loader == nil {
panic("cache loader function must be set")
}
if en.setLoading(true) {
// Only do refresh if it isn't running.
if c.exec == nil {
if c.reloader == nil {
go c.refresh(en)
} else {
c.exec.Execute(func() { c.refresh(en) })
c.reload(en)
}
return true
}
Expand Down Expand Up @@ -353,6 +350,25 @@ func (c *localCache) refresh(en *entry) {
}
}

// reload uses user-defined reloader to reloads value.
func (c *localCache) reload(en *entry) {
start := currentTime()
setFn := func(newValue Value, err error) {
defer en.setLoading(false)
now := currentTime()
loadTime := now.Sub(start)
if err == nil {
en.setValue(newValue)
c.setEntryWriteTime(en, now)
c.sendEvent(eventWrite, en)
c.stats.RecordLoadSuccess(loadTime)
} else {
c.stats.RecordLoadError(loadTime)
}
}
c.reloader.Reload(en.key, en.getValue(), setFn)
}

// postReadCleanup is run after entry access/delete event.
// This function must only be called from processEntries goroutine.
func (c *localCache) postReadCleanup() {
Expand Down Expand Up @@ -547,12 +563,12 @@ func WithPolicy(name string) Option {
}
}

// WithExecutor returns an option which sets executor for cache loader.
// WithReloader returns an option which sets reloader for a loading cache.
// By default, each asynchronous reload is run in a go routine.
// This option is only applicable for LoadingCache.
func WithExecutor(executor Executor) Option {
func WithReloader(reloader Reloader) Option {
return func(c *localCache) {
c.exec = executor
c.reloader = reloader
}
}

Expand Down
16 changes: 10 additions & 6 deletions local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestRefreshAterWrite(t *testing.T) {
mockTime := newMockTime()
currentTime = mockTime.now
c := NewLoadingCache(loader, WithExpireAfterAccess(4*time.Second), WithRefreshAfterWrite(2*time.Second),
WithExecutor(syncExecutor{}), withInsertionListener(insFunc))
WithReloader(&syncReloader{loader}), withInsertionListener(insFunc))
defer c.Close()

wg.Add(3)
Expand Down Expand Up @@ -391,7 +391,8 @@ func TestLoadingAsyncReload(t *testing.T) {
}
mockTime := newMockTime()
currentTime = mockTime.now
c := NewLoadingCache(loader, WithExpireAfterWrite(5*time.Millisecond), WithExecutor(syncExecutor{}))
c := NewLoadingCache(loader, WithExpireAfterWrite(5*time.Millisecond),
WithReloader(&syncReloader{loader}))
val = "a"
v, err := c.Get(1)
if err != nil || v != val {
Expand Down Expand Up @@ -505,12 +506,15 @@ func (t *mockTime) now() time.Time {
return t.value
}

type syncExecutor struct{}
type syncReloader struct {
loaderFn LoaderFunc
}

func (syncExecutor) Execute(f func()) {
f()
func (s *syncReloader) Reload(k Key, v Value, setFn func(Value, error)) {
v, err := s.loaderFn(k)
setFn(v, err)
}

func (syncExecutor) Close() error {
func (s *syncReloader) Close() error {
return nil
}

0 comments on commit f6da914

Please sign in to comment.