From a1e198a9b18bd2f92c3438e4f609412047f8ccf4 Mon Sep 17 00:00:00 2001
From: rahul2393
Date: Fri, 28 Jun 2024 12:44:10 +0530
Subject: [PATCH] fix(spanner): fix negative values for max_in_use_sessions metrics (#10449)

* fix(spanner): fix negative values for max_in_use_sessions metrics

* fix failing tests

* incorporate changes

* add comment
---
 spanner/session.go      | 35 ++++++++++++++------------
 spanner/session_test.go | 54 ++++++++++++++++++++++++++++++++++++++---
 2 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/spanner/session.go b/spanner/session.go
index b6b8996790fc..727952724d4a 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -197,7 +197,8 @@ func (sh *sessionHandle) destroy() {
 		p.trackedSessionHandles.Remove(tracked)
 		p.mu.Unlock()
 	}
-	s.destroy(false)
+	// The session behind a sessionHandle is checked out (counted in numInUse), so destroy it with wasInUse=true.
+	s.destroy(false, true)
 }
 
 func (sh *sessionHandle) updateLastUseTime() {
@@ -374,7 +375,7 @@ func (s *session) recycle() {
 		// s is rejected by its home session pool because it expired and the
 		// session pool currently has enough open sessions.
 		s.pool.mu.Unlock()
-		s.destroy(false)
+		s.destroy(false, false) // wasInUse=false: numInUse is decremented exactly once, just below.
 		s.pool.mu.Lock()
 	}
 	s.pool.decNumInUseLocked(context.Background())
@@ -383,15 +384,15 @@ func (s *session) recycle() {
 
 // destroy removes the session from its home session pool, healthcheck queue
 // and Cloud Spanner service.
-func (s *session) destroy(isExpire bool) bool {
+func (s *session) destroy(isExpire, wasInUse bool) bool {
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()
-	return s.destroyWithContext(ctx, isExpire)
+	return s.destroyWithContext(ctx, isExpire, wasInUse)
 }
 
-func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
+func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool {
 	// Remove s from session pool.
-	if !s.pool.remove(s, isExpire) {
+	if !s.pool.remove(s, isExpire, wasInUse) {
 		return false
 	}
 	// Unregister s from healthcheck queue.
@@ -900,14 +901,14 @@ func (p *sessionPool) close(ctx context.Context) {
 	wg := sync.WaitGroup{}
 	for _, s := range allSessions {
 		wg.Add(1)
-		go deleteSession(ctx, s, &wg)
+		go closeSession(ctx, s, &wg)
 	}
 	wg.Wait()
 }
 
-func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
+func closeSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
 	defer wg.Done()
-	s.destroyWithContext(ctx, false)
+	s.destroyWithContext(ctx, false, false)
 }
 
 // errInvalidSessionPool is the error for using an invalid session pool.
@@ -1022,7 +1023,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
 	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
 		if err := s.ping(); isSessionNotFoundError(err) {
 			// The session is already bad, continue to fetch/create a new one.
-			s.destroy(false)
+			s.destroy(false, true) // NOTE(review): confirm the caller counts s in numInUse before isHealthy runs; otherwise pass wasInUse=false.
 			return false
 		}
 		p.hc.scheduledHC(s)
@@ -1133,7 +1134,7 @@ func (p *sessionPool) recycleLocked(s *session) bool {
 // remove atomically removes session s from the session pool and invalidates s.
 // If isExpire == true, the removal is triggered by session expiration and in
 // such cases, only idle sessions can be removed.
-func (p *sessionPool) remove(s *session, isExpire bool) bool {
+func (p *sessionPool) remove(s *session, isExpire, wasInUse bool) bool {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
@@ -1152,8 +1153,10 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
 	if s.invalidate() {
 		// Decrease the number of opened sessions.
 		p.numOpened--
-		// Decrease the number of sessions in use.
-		p.decNumInUseLocked(ctx)
+		// Decrease numInUse only for checked-out sessions; idle sessions are not counted in it.
+		if wasInUse {
+			p.decNumInUseLocked(ctx)
+		}
 		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
 		// Broadcast that a session has been destroyed.
 		close(p.mayGetSession)
@@ -1456,12 +1459,12 @@ func (hc *healthChecker) healthCheck(s *session) {
 	defer hc.markDone(s)
 	if !s.pool.isValid() {
 		// Session pool is closed, perform a garbage collection.
-		s.destroy(false)
+		s.destroy(false, true) // NOTE(review): s may be idle here; confirm wasInUse=true cannot drive numInUse negative.
 		return
 	}
 	if err := s.ping(); isSessionNotFoundError(err) {
 		// Ping failed, destroy the session.
-		s.destroy(false)
+		s.destroy(false, true) // NOTE(review): s may be idle or still checked out; confirm wasInUse=true is correct here.
 	}
 }
 
@@ -1643,7 +1646,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uin
 		if s != nil {
 			deleted++
 			// destroy session as expire.
-			s.destroy(true)
+			s.destroy(true, false)
 		} else {
 			break
 		}
diff --git a/spanner/session_test.go b/spanner/session_test.go
index 367242e86cb3..0a9181b09258 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) {
 
 	// Simulate session expiration.
 	for _, s := range ss {
-		s.destroy(true)
+		s.destroy(true, false)
 	}
 
 	// Wait until the maintainer has had a chance to replenish the pool.
@@ -1022,6 +1022,52 @@ func TestMinOpenedSessions(t *testing.T) {
 	}
 }
 
+// TestPositiveNumInUseSessions tests that numInUse never becomes negative after sessions are taken and recycled.
+func TestPositiveNumInUseSessions(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	_, client, teardown := setupMockedTestServerWithConfig(t,
+		ClientConfig{
+			SessionPoolConfig: SessionPoolConfig{
+				MinOpened:                 1,
+				healthCheckSampleInterval: time.Millisecond,
+			},
+		})
+	defer teardown()
+	sp := client.idleSessions
+	defer sp.close(ctx)
+	// Take ten sessions from session pool and recycle them.
+	var shs []*sessionHandle
+	for i := 0; i < 10; i++ {
+		sh, err := sp.take(ctx)
+		if err != nil {
+			t.Fatalf("failed to get session(%v): %v", i, err)
+		}
+		shs = append(shs, sh)
+	}
+	for _, sh := range shs {
+		sh.recycle()
+	}
+	waitFor(t, func() error {
+		sp.mu.Lock()
+		if sp.idleList.Len() != 1 {
+			sp.mu.Unlock()
+			return errInvalidSessionPool
+		}
+		sp.mu.Unlock()
+		return nil
+	})
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	if int64(sp.numInUse) < 0 {
+		t.Fatalf("numInUse must be >= 0, got %d", int64(sp.numInUse))
+	}
+	// There should still be one session left in the idle list.
+	if sp.idleList.Len() != 1 {
+		t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs)
+	}
+}
+
 // TestMaxBurst tests max burst constraint.
 func TestMaxBurst(t *testing.T) {
 	t.Parallel()
@@ -1145,11 +1191,11 @@ func TestSessionDestroy(t *testing.T) {
 	}
 	s := sh.session
 	sh.recycle()
-	if d := s.destroy(true); d || !s.isValid() {
-		// Session should be remaining because of min open sessions constraint.
+	if d := s.destroy(true, false); d || !s.isValid() {
+		// Session should remain alive because of the MinOpened constraint.
 		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
 	}
-	if d := s.destroy(false); !d || s.isValid() {
+	if d := s.destroy(false, true); !d || s.isValid() {
 		// Session should be destroyed.
 		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
 	}