[go: nahoru, domu]

Skip to content

Commit

Permalink
fix: store notification events immediately for persistent queues (min…
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveenrajmani authored May 2, 2023
1 parent ab34f00 commit 1704aba
Show file tree
Hide file tree
Showing 18 changed files with 49 additions and 71 deletions.
19 changes: 0 additions & 19 deletions cmd/event-notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cmd

import (
"context"
"errors"
"fmt"
"net/url"
"strings"
Expand All @@ -40,7 +39,6 @@ type EventNotifier struct {
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
eventsQueue chan eventArgs
}

// NewEventNotifier - creates new event notification object.
Expand All @@ -51,7 +49,6 @@ func NewEventNotifier() *EventNotifier {
targetResCh: make(chan event.TargetIDResult),
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
eventsQueue: make(chan eventArgs, 10000),
}
}

Expand Down Expand Up @@ -105,12 +102,6 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object
return err
}

go func() {
for e := range evnot.eventsQueue {
evnot.send(e)
}
}()

go func() {
for res := range evnot.targetResCh {
if res.Err != nil {
Expand Down Expand Up @@ -210,16 +201,6 @@ func (evnot *EventNotifier) RemoveAllRemoteTargets() {

// Send - sends the event to all registered notification targets
func (evnot *EventNotifier) Send(args eventArgs) {
select {
case evnot.eventsQueue <- args:
default:
// A new goroutine is created for each notification job, eventsQueue is
// drained quickly and is not expected to be filled with any scenario.
logger.LogIf(context.Background(), errors.New("internal events queue unexpectedly full"))
}
}

func (evnot *EventNotifier) send(args eventArgs) {
evnot.RLock()
targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
evnot.RUnlock()
Expand Down
9 changes: 4 additions & 5 deletions internal/event/target/amqp.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2022 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -274,13 +274,12 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi

// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
ch, confirms, err := target.channel()
if err != nil {
return err
Expand Down
9 changes: 4 additions & 5 deletions internal/event/target/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -200,13 +200,12 @@ func (target *ElasticsearchTarget) isActive() (bool, error) {

// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
7 changes: 3 additions & 4 deletions internal/event/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,12 @@ func (target *KafkaTarget) isActive() (bool, error) {

// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/kafka_scram_client_contrib.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/lazyinit.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2022 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down
9 changes: 4 additions & 5 deletions internal/event/target/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -200,13 +200,12 @@ func (target *MQTTTarget) Send(eventKey string) error {
// Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}

// Do not send if the connection is not active.
_, err := target.isActive()
Expand Down
8 changes: 4 additions & 4 deletions internal/event/target/mysql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -196,13 +196,13 @@ func (target *MySQLTarget) isActive() (bool, error) {

// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down
9 changes: 5 additions & 4 deletions internal/event/target/nats.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -289,13 +289,14 @@ func (target *NATSTarget) isActive() (bool, error) {

// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}

if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/nats_contrib_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/nats_tls_contrib_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
9 changes: 5 additions & 4 deletions internal/event/target/nsq.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -145,13 +145,14 @@ func (target *NSQTarget) isActive() (bool, error) {

// Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}

if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/nsq_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down
9 changes: 5 additions & 4 deletions internal/event/target/postgresql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -188,13 +188,14 @@ func (target *PostgreSQLTarget) isActive() (bool, error) {

// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}

if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/event/target/postgresql_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down
9 changes: 4 additions & 5 deletions internal/event/target/redis.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -168,13 +168,12 @@ func (target *RedisTarget) isActive() (bool, error) {

// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
Expand Down
9 changes: 4 additions & 5 deletions internal/event/target/webhook.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand Down Expand Up @@ -161,13 +161,12 @@ func (target *WebhookTarget) isActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured,
// which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}

if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
err := target.send(eventData)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
Expand Down

0 comments on commit 1704aba

Please sign in to comment.