[go: nahoru, domu]

Skip to content

Commit

Permalink
fix(spanner): fix negative values for max_in_use_sessions metrics (#1…
Browse files Browse the repository at this point in the history
…0449)

* fix(spanner): fix negative values for max_in_use_sessions metrics

* fix failing tests

* incorporate changes

* add comment
  • Loading branch information
rahul2393 committed Jun 28, 2024
1 parent fc4c910 commit a1e198a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 20 deletions.
35 changes: 19 additions & 16 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (sh *sessionHandle) destroy() {
p.trackedSessionHandles.Remove(tracked)
p.mu.Unlock()
}
s.destroy(false)
// Since a sessionHandle is always used by transactions, it is safe to destroy the session with wasInUse=true.
s.destroy(false, true)
}

func (sh *sessionHandle) updateLastUseTime() {
Expand Down Expand Up @@ -374,7 +375,7 @@ func (s *session) recycle() {
// s is rejected by its home session pool because it expired and the
// session pool currently has enough open sessions.
s.pool.mu.Unlock()
s.destroy(false)
s.destroy(false, true)
s.pool.mu.Lock()
}
s.pool.decNumInUseLocked(context.Background())
Expand All @@ -383,15 +384,15 @@ func (s *session) recycle() {

// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
func (s *session) destroy(isExpire, wasInUse bool) bool {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
return s.destroyWithContext(ctx, isExpire)
return s.destroyWithContext(ctx, isExpire, wasInUse)
}

func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool {
// Remove s from session pool.
if !s.pool.remove(s, isExpire) {
if !s.pool.remove(s, isExpire, wasInUse) {
return false
}
// Unregister s from healthcheck queue.
Expand Down Expand Up @@ -900,14 +901,14 @@ func (p *sessionPool) close(ctx context.Context) {
wg := sync.WaitGroup{}
for _, s := range allSessions {
wg.Add(1)
go deleteSession(ctx, s, &wg)
go closeSession(ctx, s, &wg)
}
wg.Wait()
}

func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
func closeSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
defer wg.Done()
s.destroyWithContext(ctx, false)
s.destroyWithContext(ctx, false, false)
}

// errInvalidSessionPool is the error for using an invalid session pool.
Expand Down Expand Up @@ -1022,7 +1023,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
if err := s.ping(); isSessionNotFoundError(err) {
// The session is already bad, continue to fetch/create a new one.
s.destroy(false)
s.destroy(false, true)
return false
}
p.hc.scheduledHC(s)
Expand Down Expand Up @@ -1133,7 +1134,7 @@ func (p *sessionPool) recycleLocked(s *session) bool {
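// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed. If wasInUse == true, the
// number of in-use sessions is decremented as well; pass false when the
// session is removed from the idle list.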
func (p *sessionPool) remove(s *session, isExpire bool) bool {
func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool {
p.mu.Lock()
defer p.mu.Unlock()
if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
Expand All @@ -1152,8 +1153,10 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
if s.invalidate() {
// Decrease the number of opened sessions.
p.numOpened--
// Decrease the number of sessions in use.
p.decNumInUseLocked(ctx)
// Decrease the number of sessions in use, only when not from idle list.
if wasInUse {
p.decNumInUseLocked(ctx)
}
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
// Broadcast that a session has been destroyed.
close(p.mayGetSession)
Expand Down Expand Up @@ -1456,12 +1459,12 @@ func (hc *healthChecker) healthCheck(s *session) {
defer hc.markDone(s)
if !s.pool.isValid() {
// Session pool is closed, perform a garbage collection.
s.destroy(false)
s.destroy(false, true)
return
}
if err := s.ping(); isSessionNotFoundError(err) {
// Ping failed, destroy the session.
s.destroy(false)
s.destroy(false, true)
}
}

Expand Down Expand Up @@ -1643,7 +1646,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uin
if s != nil {
deleted++
// destroy session as expire.
s.destroy(true)
s.destroy(true, false)
} else {
break
}
Expand Down
54 changes: 50 additions & 4 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) {

// Simulate session expiration.
for _, s := range ss {
s.destroy(true)
s.destroy(true, false)
}

// Wait until the maintainer has had a chance to replenish the pool.
Expand All @@ -1022,6 +1022,52 @@ func TestMinOpenedSessions(t *testing.T) {
}
}

// TestPositiveNumInUseSessions tests that num_in_use session should always be greater than 0.
func TestPositiveNumInUseSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
healthCheckSampleInterval: time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
defer sp.close(ctx)
// Take ten sessions from session pool and recycle them.
var shs []*sessionHandle
for i := 0; i < 10; i++ {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
shs = append(shs, sh)
}
for _, sh := range shs {
sh.recycle()
}
waitFor(t, func() error {
sp.mu.Lock()
if sp.idleList.Len() != 1 {
sp.mu.Unlock()
return errInvalidSessionPool
}
sp.mu.Unlock()
return nil
})
sp.mu.Lock()
defer sp.mu.Unlock()
if int64(sp.numInUse) < 0 {
t.Fatal("numInUse must be >= 0")
}
// There should be still one session left in the idle list.
if sp.idleList.Len() != 1 {
t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs)
}
}

// TestMaxBurst tests max burst constraint.
func TestMaxBurst(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -1145,11 +1191,11 @@ func TestSessionDestroy(t *testing.T) {
}
s := sh.session
sh.recycle()
if d := s.destroy(true); d || !s.isValid() {
// Session should be remaining because of min open sessions constraint.
if d := s.destroy(true, false); d || !s.isValid() {
// Session should be remaining because of min open session's constraint.
t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
}
if d := s.destroy(false); !d || s.isValid() {
if d := s.destroy(false, true); !d || s.isValid() {
// Session should be destroyed.
t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
}
Expand Down

0 comments on commit a1e198a

Please sign in to comment.