diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index acea424544fa5c..4fe270f109e0b2 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -468,7 +468,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint } // Remove .healing.bin from all disks with similar heal-id - disks, err := z.GetDisks(poolIdx, setIdx) + disks, err := z.GetDisks(poolIdx, setIdx, false) if err != nil { return err } diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index abcb9ff3db0ac1..c7cabc205b0cb7 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -233,7 +233,7 @@ func TestListOnlineDisks(t *testing.T) { data := bytes.Repeat([]byte("a"), smallFileThreshold*16) z := obj.(*erasureServerPools) - erasureDisks, err := z.GetDisks(0, 0) + erasureDisks, err := z.GetDisks(0, 0, false) if err != nil { t.Fatal(err) } @@ -410,7 +410,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { data := bytes.Repeat([]byte("a"), smallFileThreshold/2) z := obj.(*erasureServerPools) - erasureDisks, err := z.GetDisks(0, 0) + erasureDisks, err := z.GetDisks(0, 0, false) if err != nil { t.Fatal(err) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 96c0f3edbe4037..531d8146ac300c 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1308,13 +1308,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } if !opts.Speedtest && versionsDisparity { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - queued: time.Now(), - allVersions: true, - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + Queued: time.Now(), + AllVersions: true, + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index ada14422e805b0..e3dd3cd8cd19fb 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -383,24 +383,16 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // that we have some parts or data blocks missing or corrupted // - attempt a heal to successfully heal them for future calls. if written == partLength { - var scan madmin.HealScanMode - switch { - case errors.Is(err, errFileNotFound): - scan = madmin.HealNormalScan - case errors.Is(err, errFileCorrupt): - scan = madmin.HealDeepScan - } - switch scan { - case madmin.HealNormalScan, madmin.HealDeepScan: + if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) { healOnce.Do(func() { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - versionID: fi.VersionID, - queued: time.Now(), - setIndex: er.setIndex, - poolIndex: er.poolIndex, - scanMode: scan, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + VersionID: fi.VersionID, + Queued: time.Now(), + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, + BitrotScan: errors.Is(err, errFileCorrupt), }) }) // Healing is triggered and we have written @@ -827,13 +819,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // additionally do not heal delete markers inline, let them be // healed upon regular heal process. 
if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks { - globalMRFState.addPartialOp(partialOperation{ - bucket: fi.Volume, - object: fi.Name, - versionID: fi.VersionID, - queued: time.Now(), - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: fi.Volume, + Object: fi.Name, + VersionID: fi.VersionID, + Queued: time.Now(), + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } @@ -1579,13 +1571,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st break } } else { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - queued: time.Now(), - allVersions: true, - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + Queued: time.Now(), + AllVersions: true, + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } } @@ -2076,11 +2068,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // Send the successful but partial upload/delete, however ignore // if the channel is blocked by other items. func (er erasureObjects) addPartial(bucket, object, versionID string) { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - versionID: versionID, - queued: time.Now(), + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + VersionID: versionID, + Queued: time.Now(), }) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 4d230c96425e07..4d5cd2374c8f23 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -189,6 +189,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ }) bootstrapTrace("initHealMRF", func() { + go globalMRFState.start() go globalMRFState.healRoutine(z) }) @@ -294,11 +295,30 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string } // Return the disks belonging to the poolIdx, and setIdx. -func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) { - if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) { - return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil +func (z *erasureServerPools) GetDisks(poolIdx, setIdx int, local bool) (disks []StorageAPI, err error) { + if poolIdx >= 0 { + if poolIdx > len(z.serverPools) || setIdx > len(z.serverPools[poolIdx].sets) { + return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1)) + } + disks = z.serverPools[poolIdx].sets[setIdx].getDisks() + } else { + // poolIdx < 0 means collect all disks in this cluster + for pool := range z.serverPools { + for set := range z.serverPools[pool].sets { + disks = append(disks, z.serverPools[pool].sets[set].getDisks()...) 
+ } + } } - return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1)) + + if local { + for d := range disks { + if disks[d] != nil && !disks[d].IsLocal() { + disks[d] = nil + } + } + } + + return disks, nil } // Return the count of disks in each pool diff --git a/cmd/globals.go b/cmd/globals.go index 10f0d5563f471b..32e4ba46b686a5 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -388,7 +388,7 @@ var ( globalBackgroundHealState = newHealState(GlobalContext, false) globalMRFState = mrfState{ - opCh: make(chan partialOperation, mrfOpsQueueSize), + opCh: make(chan PartialOperation, mrfOpsQueueSize), } // If writes to FS backend should be O_SYNC. diff --git a/cmd/mrf.go b/cmd/mrf.go index 12da25d50ef440..ad68f48dd528c1 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2024 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -15,50 +15,201 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//go:generate msgp -file=$GOFILE + package cmd import ( "context" + "encoding/binary" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" "time" "github.com/minio/madmin-go/v3" "github.com/minio/pkg/v2/wildcard" + "github.com/tinylib/msgp/msgp" ) const ( mrfOpsQueueSize = 100000 ) -// partialOperation is a successful upload/delete of an object +const ( + healDir = ".heal" + healMRFDir = bucketMetaPrefix + SlashSeparator + healDir + SlashSeparator + "mrf" + healMRFMetaFormat = 1 + healMRFMetaVersionV1 = 1 +) + +// PartialOperation is a successful upload/delete of an object // but not written in all disks (having quorum) -type partialOperation struct { - bucket string - object string - versionID string - allVersions bool - setIndex, poolIndex int - queued time.Time - scanMode madmin.HealScanMode +type PartialOperation struct { + Bucket string + Object string + VersionID string + AllVersions bool + SetIndex, PoolIndex int + Queued time.Time + BitrotScan bool } // mrfState sncapsulates all the information // related to the global background MRF. type mrfState struct { - opCh chan partialOperation + opCh chan PartialOperation + closed int32 + wg sync.WaitGroup } // Add a partial S3 operation (put/delete) when one or more disks are offline. -func (m *mrfState) addPartialOp(op partialOperation) { +func (m *mrfState) addPartialOp(op PartialOperation) { if m == nil { return } + m.wg.Add(1) + defer m.wg.Done() + + if atomic.LoadInt32(&m.closed) == 1 { + return + } + select { case m.opCh <- op: default: } } +// Do not accept new MRF operations anymore and start to save +// the current heal status in one available disk +func (m *mrfState) shutdown() { + atomic.StoreInt32(&m.closed, 1) + m.wg.Wait() + close(m.opCh) + + if len(m.opCh) > 0 { + healingLogEvent(context.Background(), "Saving MRF healing data (%d entries)", len(m.opCh)) + } + + newReader := func() io.ReadCloser { + r, w := io.Pipe() + go func() { + // Initialize MRF meta header. 
+ var data [4]byte + binary.LittleEndian.PutUint16(data[0:2], healMRFMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], healMRFMetaVersionV1) + mw := msgp.NewWriter(w) + n, err := mw.Write(data[:]) + if err != nil { + w.CloseWithError(err) + return + } + if n != len(data) { + w.CloseWithError(io.ErrShortWrite) + return + } + for item := range m.opCh { + err = item.EncodeMsg(mw) + if err != nil { + break + } + } + mw.Flush() + w.CloseWithError(err) + }() + return r + } + + globalLocalDrivesMu.RLock() + localDrives := cloneDrives(globalLocalDrives) + globalLocalDrivesMu.RUnlock() + + for _, localDrive := range localDrives { + r := newReader() + err := localDrive.CreateFile(context.Background(), "", minioMetaBucket, pathJoin(healMRFDir, "list.bin"), -1, r) + r.Close() + if err == nil { + break + } + } +} + +func (m *mrfState) start() { + loadMRF := func(rc io.ReadCloser, opCh chan PartialOperation) error { + defer rc.Close() + var data [4]byte + n, err := rc.Read(data[:]) + if err != nil { + return err + } + if n != len(data) { + return errors.New("heal mrf: no data") + } + // Read resync meta header + switch binary.LittleEndian.Uint16(data[0:2]) { + case healMRFMetaFormat: + default: + return fmt.Errorf("heal mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case healMRFMetaVersionV1: + default: + return fmt.Errorf("heal mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + + mr := msgp.NewReader(rc) + for { + op := PartialOperation{} + err = op.DecodeMsg(mr) + if err != nil { + break + } + opCh <- op + } + + return nil + } + + var localDrives []StorageAPI + + for { + obj := newObjectLayerFn() + if obj != nil { + d, err := obj.GetDisks(-1, -1, true) + if err != nil { + healingLogIf(context.Background(), err) + return + } + localDrives = d + break + } + time.Sleep(5 * time.Second) + } + + for _, localDrive := range localDrives { + if localDrive == nil { + continue + } + rc, err := localDrive.ReadFileStream(context.Background(), minioMetaBucket, pathJoin(healMRFDir, "list.bin"), 0, -1) + if err != nil { + continue + } + err = loadMRF(rc, m.opCh) + if err != nil { + continue + } + // finally delete the file after processing mrf entries + localDrive.Delete(GlobalContext, minioMetaBucket, pathJoin(healMRFDir, "list.bin"), DeleteOptions{}) + break + } + + return +} + var healSleeper = newDynamicSleeper(5, time.Second, false) // healRoutine listens to new disks reconnection events and @@ -77,24 +228,24 @@ func (m *mrfState) healRoutine(z *erasureServerPools) { // We might land at .metacache, .trash, .multipart // no need to heal them skip, only when bucket // is '.minio.sys' - if u.bucket == minioMetaBucket { + if u.Bucket == minioMetaBucket { // No MRF needed for temporary objects - if wildcard.Match("buckets/*/.metacache/*", u.object) { + if wildcard.Match("buckets/*/.metacache/*", u.Object) { continue } - if wildcard.Match("tmp/*", u.object) { + if wildcard.Match("tmp/*", u.Object) { continue } - if wildcard.Match("multipart/*", u.object) { + if wildcard.Match("multipart/*", u.Object) { continue } - if wildcard.Match("tmp-old/*", u.object) { + if wildcard.Match("tmp-old/*", u.Object) { continue } } now := time.Now() - if now.Sub(u.queued) < time.Second { + if now.Sub(u.Queued) < time.Second { // let recently failed networks to reconnect // making MRF wait for 1s before retrying, // i.e 4 reconnect attempts. 
@@ -105,16 +256,17 @@ func (m *mrfState) healRoutine(z *erasureServerPools) { wait := healSleeper.Timer(context.Background()) scan := madmin.HealNormalScan - if u.scanMode != 0 { - scan = u.scanMode + if u.BitrotScan { + scan = madmin.HealDeepScan } - if u.object == "" { - healBucket(u.bucket, scan) + + if u.Object == "" { + healBucket(u.Bucket, scan) } else { - if u.allVersions { - z.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity) + if u.AllVersions { + z.serverPools[u.PoolIndex].sets[u.SetIndex].listAndHeal(u.Bucket, u.Object, scan, healObjectVersionsDisparity) } else { - healObject(u.bucket, u.object, u.versionID, scan) + healObject(u.Bucket, u.Object, u.VersionID, scan) } } diff --git a/cmd/mrf_gen.go b/cmd/mrf_gen.go new file mode 100644 index 00000000000000..5f29b76f8bf5b9 --- /dev/null +++ b/cmd/mrf_gen.go @@ -0,0 +1,285 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *PartialOperation) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "VersionID": + z.VersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "AllVersions": + z.AllVersions, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "AllVersions") + return + } + case "SetIndex": + z.SetIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "PoolIndex": + z.PoolIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "Queued": + z.Queued, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + case "BitrotScan": + z.BitrotScan, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *PartialOperation) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 8 + // write "Bucket" + err = en.Append(0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Object" + err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + // write "VersionID" + err = en.Append(0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44) + if err != nil { + return + } + err = en.WriteString(z.VersionID) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + // write "AllVersions" + err = en.Append(0xab, 0x41, 0x6c, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteBool(z.AllVersions) + if err != 
nil { + err = msgp.WrapError(err, "AllVersions") + return + } + // write "SetIndex" + err = en.Append(0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.SetIndex) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + // write "PoolIndex" + err = en.Append(0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.PoolIndex) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + // write "Queued" + err = en.Append(0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteTime(z.Queued) + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + // write "BitrotScan" + err = en.Append(0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e) + if err != nil { + return + } + err = en.WriteBool(z.BitrotScan) + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *PartialOperation) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 8 + // string "Bucket" + o = append(o, 0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Object" + o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + o = msgp.AppendString(o, z.Object) + // string "VersionID" + o = append(o, 0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44) + o = msgp.AppendString(o, z.VersionID) + // string "AllVersions" + o = append(o, 0xab, 0x41, 0x6c, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendBool(o, z.AllVersions) + // string "SetIndex" + o = append(o, 0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.SetIndex) + // string "PoolIndex" + o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.PoolIndex) + // string "Queued" + o = append(o, 0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64) + o = msgp.AppendTime(o, z.Queued) + // string "BitrotScan" + o = append(o, 0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e) + o = msgp.AppendBool(o, z.BitrotScan) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *PartialOperation) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "VersionID": + z.VersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "AllVersions": + z.AllVersions, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "AllVersions") + return + } + case "SetIndex": + z.SetIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "PoolIndex": + z.PoolIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "Queued": + 
z.Queued, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + case "BitrotScan": + z.BitrotScan, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *PartialOperation) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 10 + msgp.StringPrefixSize + len(z.VersionID) + 12 + msgp.BoolSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 7 + msgp.TimeSize + 11 + msgp.BoolSize + return +} diff --git a/cmd/mrf_gen_test.go b/cmd/mrf_gen_test.go new file mode 100644 index 00000000000000..49ac17340940d1 --- /dev/null +++ b/cmd/mrf_gen_test.go @@ -0,0 +1,123 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalPartialOperation(t *testing.T) { + v := PartialOperation{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgPartialOperation(b *testing.B) { + v := PartialOperation{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgPartialOperation(b *testing.B) { + v := PartialOperation{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalPartialOperation(b *testing.B) { + v := PartialOperation{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodePartialOperation(t *testing.T) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodePartialOperation Msgsize() is inaccurate") + } + + vn := PartialOperation{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodePartialOperation(b *testing.B) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodePartialOperation(b *testing.B) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/object-api-interface.go 
b/cmd/object-api-interface.go index 0f24c98336a5db..b53395b44b703e 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -286,8 +286,8 @@ type ObjectLayer interface { AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) - GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) // return the disks belonging to pool and set. - SetDriveCounts() []int // list of erasure stripe size for each pool in order. + GetDisks(poolIdx, setIdx int, local bool) ([]StorageAPI, error) // return the disks belonging to pool and set. + SetDriveCounts() []int // list of erasure stripe size for each pool in order. // Healing operations. HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index aadbf2dce4fd46..622a3cc3be7744 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -254,9 +254,9 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu for bktName, count := range bucketsMap { if count < quorum { // Queue a bucket heal task - globalMRFState.addPartialOp(partialOperation{ - bucket: bktName, - queued: time.Now(), + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bktName, + Queued: time.Now(), }) } } diff --git a/cmd/signals.go b/cmd/signals.go index 097242c7e46f22..20a8650030db85 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -46,6 +46,9 @@ func handleSignals() { } stopProcess := func() bool { + globalMRFState.shutdown() // this can take time sometimes, it needs to be executed + // before stopping s3 operations + // send signal to various go-routines that they need to quit. cancelGlobalContext() diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index f38128cd9b675c..e4fd2a2f337681 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -566,7 +566,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates poolIdx, setIdx, _ := s.GetDiskLoc() - disks, err := objAPI.GetDisks(poolIdx, setIdx) + disks, err := objAPI.GetDisks(poolIdx, setIdx, false) if err != nil { return cache, err }
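
Note: the patch exports the former partialOperation type as PartialOperation and generates msgp codecs for it (cmd/mrf_gen.go) so that heal entries still queued when the server stops can be written by mrfState.shutdown to pathJoin(healMRFDir, "list.bin") under minioMetaBucket on one local drive, and drained back into the op channel by mrfState.start. The following is a minimal, self-contained sketch of that save/restore flow, not the MinIO implementation: it keeps the same 4-byte little-endian format/version header, but substitutes encoding/gob, a simplified op struct, and a bytes.Buffer for the generated msgp encoding of PartialOperation and the actual drive I/O.

package main

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"time"
)

const (
	mrfMetaFormat  = 1 // mirrors healMRFMetaFormat
	mrfMetaVersion = 1 // mirrors healMRFMetaVersionV1
)

// op is a simplified, hypothetical stand-in for cmd.PartialOperation.
type op struct {
	Bucket, Object, VersionID string
	AllVersions               bool
	SetIndex, PoolIndex       int
	Queued                    time.Time
	BitrotScan                bool
}

// saveMRF writes the 4-byte little-endian format/version header followed by a
// stream of encoded operations, the same framing mrfState.shutdown uses before
// handing the stream to CreateFile on a local drive.
func saveMRF(w io.Writer, ops []op) error {
	var hdr [4]byte
	binary.LittleEndian.PutUint16(hdr[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(hdr[2:4], mrfMetaVersion)
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	enc := gob.NewEncoder(w) // stand-in for the generated msgp encoder
	for _, o := range ops {
		if err := enc.Encode(o); err != nil {
			return err
		}
	}
	return nil
}

// loadMRF validates the header and feeds decoded entries back into a channel,
// mirroring the loadMRF closure inside mrfState.start.
func loadMRF(r io.Reader, out chan<- op) error {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return errors.New("heal mrf: no data")
	}
	if f := binary.LittleEndian.Uint16(hdr[0:2]); f != mrfMetaFormat {
		return fmt.Errorf("heal mrf: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(hdr[2:4]); v != mrfMetaVersion {
		return fmt.Errorf("heal mrf: unknown version: %d", v)
	}
	dec := gob.NewDecoder(r) // stand-in for the generated msgp decoder
	for {
		var o op
		if err := dec.Decode(&o); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		out <- o
	}
}

func main() {
	var buf bytes.Buffer // stand-in for list.bin on a local drive
	pending := []op{{Bucket: "photos", Object: "2024/cat.png", Queued: time.Now()}}
	if err := saveMRF(&buf, pending); err != nil {
		panic(err)
	}
	ch := make(chan op, len(pending))
	if err := loadMRF(&buf, ch); err != nil {
		panic(err)
	}
	fmt.Println("requeued for heal:", (<-ch).Object)
}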
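
Note: GetDisks gains a third argument across the ObjectLayer interface. Passing a negative poolIdx collects disks from every pool and set, and local=true replaces remote disks with nil, which is how mrfState.start finds a local drive to read the persisted MRF list from. A stand-alone sketch of that selection logic follows, with a hypothetical disk type standing in for StorageAPI; the bounds checks are simplified relative to the patch.

package main

import "fmt"

// disk is a hypothetical stand-in for MinIO's StorageAPI.
type disk struct {
	name  string
	local bool
}

// getDisks sketches the extended selection logic: a non-negative poolIdx picks
// one erasure set, a negative poolIdx gathers every disk across pools and sets,
// and local=true masks disks that are not attached to this node with nil.
func getDisks(pools [][][]*disk, poolIdx, setIdx int, local bool) ([]*disk, error) {
	var disks []*disk
	if poolIdx >= 0 {
		if poolIdx >= len(pools) || setIdx >= len(pools[poolIdx]) {
			return nil, fmt.Errorf("matching pool %d, set %d not found", poolIdx+1, setIdx+1)
		}
		disks = append(disks, pools[poolIdx][setIdx]...)
	} else {
		for _, pool := range pools {
			for _, set := range pool {
				disks = append(disks, set...)
			}
		}
	}
	if local {
		for i := range disks {
			if disks[i] != nil && !disks[i].local {
				disks[i] = nil
			}
		}
	}
	return disks, nil
}

func main() {
	pools := [][][]*disk{{{
		{name: "http://node1/drive1", local: true},
		{name: "http://node2/drive1", local: false},
	}}}
	all, _ := getDisks(pools, -1, -1, true)
	for _, d := range all {
		fmt.Println(d) // the remote drive prints as <nil>
	}
}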