[go: nahoru, domu]

Skip to content

Commit

Permalink
Use new batch status api /status-job (#4930)
Browse files Browse the repository at this point in the history
Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
  • Loading branch information
shtripat committed Jul 2, 2024
1 parent e0d2e95 commit bbdd963
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 62 deletions.
149 changes: 89 additions & 60 deletions cmd/batch-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/charmbracelet/lipgloss"
"github.com/dustin/go-humanize"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/madmin-go/v3"
"github.com/minio/mc/pkg/probe"
"github.com/minio/pkg/v3/console"
Expand Down Expand Up @@ -41,6 +42,24 @@ EXAMPLES:
`,
}

// batchJobStatusMessage container for batch job status messages
type batchJobStatusMessage struct {
Status string `json:"status"`
Metric madmin.JobMetric `json:"metric"`
}

// JSON jsonified batchJobStatusMessage message
func (c batchJobStatusMessage) JSON() string {
batchJobStatusMessageBytes, e := json.MarshalIndent(c, "", " ")
fatalIf(probe.NewError(e), "Unable to marshal into JSON.")

return string(batchJobStatusMessageBytes)
}

func (c batchJobStatusMessage) String() string {
return c.JSON()
}

// checkBatchStatusSyntax - validate all the passed arguments
func checkBatchStatusSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 2 {
Expand Down Expand Up @@ -72,38 +91,60 @@ func mainBatchStatus(ctx *cli.Context) error {
fatalIf(probe.NewError(e), "Unable to lookup job status")

ui := tea.NewProgram(initBatchJobMetricsUI(jobID))
go func() {
opts := madmin.MetricsOptions{
Type: madmin.MetricsBatchJobs,
ByJobID: jobID,
Interval: time.Second,
}
e := client.Metrics(ctxt, opts, func(metrics madmin.RealtimeMetrics) {
if nosuchJob {
go func() {
res, e := client.BatchJobStatus(ctxt, jobID)
fatalIf(probe.NewError(e), "Unable to lookup job status")
if globalJSON {
if metrics.Aggregated.BatchJobs == nil {
cancel()
return
}

job, ok := metrics.Aggregated.BatchJobs.Jobs[jobID]
if !ok {
cancel()
return
}

printMsg(metricsMessage{RealtimeMetrics: metrics})
if job.Complete || job.Failed {
printMsg(batchJobStatusMessage{
Status: "success",
Metric: res.LastMetric,
})
if res.LastMetric.Complete || res.LastMetric.Failed {
cancel()
return
}
} else {
ui.Send(metrics)
ui.Send(res.LastMetric)
}
})
if e != nil && !errors.Is(e, context.Canceled) {
fatalIf(probe.NewError(e).Trace(ctx.Args()...), "Unable to get current batch status")
}
}()
}()
} else {
go func() {
opts := madmin.MetricsOptions{
Type: madmin.MetricsBatchJobs,
ByJobID: jobID,
Interval: time.Second,
}
e := client.Metrics(ctxt, opts, func(metrics madmin.RealtimeMetrics) {
if globalJSON {
if metrics.Aggregated.BatchJobs == nil {
cancel()
return
}

job, ok := metrics.Aggregated.BatchJobs.Jobs[jobID]
if !ok {
cancel()
return
}

printMsg(batchJobStatusMessage{
Status: "in-progress",
Metric: job,
})
if job.Complete || job.Failed {
cancel()
return
}
} else {
ui.Send(metrics.Aggregated.BatchJobs.Jobs[jobID])
}
})
if e != nil && !errors.Is(e, context.Canceled) {
fatalIf(probe.NewError(e).Trace(ctx.Args()...), "Unable to get current batch status")
}
}()
}

if !globalJSON {
if _, e := ui.Run(); e != nil {
Expand All @@ -128,7 +169,7 @@ func initBatchJobMetricsUI(jobID string) *batchJobMetricsUI {
}

type batchJobMetricsUI struct {
current madmin.JobMetric
metric madmin.JobMetric
spinner spinner.Model
quitting bool
jobID string
Expand All @@ -148,21 +189,9 @@ func (m *batchJobMetricsUI) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
default:
return m, nil
}
case madmin.RealtimeMetrics:
metrics := msg
if metrics.Aggregated.BatchJobs == nil {
m.quitting = true
return m, tea.Quit
}

job, ok := metrics.Aggregated.BatchJobs.Jobs[m.jobID]
if !ok {
m.quitting = true
return m, tea.Quit
}

m.current = job
if job.Complete || job.Failed {
case madmin.JobMetric:
m.metric = msg
if msg.Complete || msg.Failed {
m.quitting = true
return m, tea.Quit
}
Expand Down Expand Up @@ -204,39 +233,39 @@ func (m *batchJobMetricsUI) View() string {
if !m.quitting {
s.WriteString(m.spinner.View())
} else {
if m.current.Complete {
if m.metric.Complete {
s.WriteString(m.spinner.Style.Render((tickCell + tickCell + tickCell)))
} else if m.current.Failed {
} else if m.metric.Failed {
s.WriteString(m.spinner.Style.Render((crossTickCell + crossTickCell + crossTickCell)))
}
}
s.WriteString("\n")

switch m.current.JobType {
switch m.metric.JobType {
case string(madmin.BatchJobReplicate):
accElapsedTime := m.current.LastUpdate.Sub(m.current.StartTime)
accElapsedTime := m.metric.LastUpdate.Sub(m.metric.StartTime)

addLine("JobType: ", m.current.JobType)
addLine("Objects: ", m.current.Replicate.Objects)
addLine("Versions: ", m.current.Replicate.Objects)
addLine("FailedObjects: ", m.current.Replicate.ObjectsFailed)
addLine("JobType: ", m.metric.JobType)
addLine("Objects: ", m.metric.Replicate.Objects)
addLine("Versions: ", m.metric.Replicate.Objects)
addLine("FailedObjects: ", m.metric.Replicate.ObjectsFailed)
if accElapsedTime > 0 {
bytesTransferredPerSec := float64(m.current.Replicate.BytesTransferred) / accElapsedTime.Seconds()
objectsPerSec := float64(int64(time.Second)*m.current.Replicate.Objects) / float64(accElapsedTime)
bytesTransferredPerSec := float64(m.metric.Replicate.BytesTransferred) / accElapsedTime.Seconds()
objectsPerSec := float64(int64(time.Second)*m.metric.Replicate.Objects) / float64(accElapsedTime)
addLine("Throughput: ", fmt.Sprintf("%s/s", humanize.IBytes(uint64(bytesTransferredPerSec))))
addLine("IOPs: ", fmt.Sprintf("%.2f objs/s", objectsPerSec))
}
addLine("Transferred: ", humanize.IBytes(uint64(m.current.Replicate.BytesTransferred)))
addLine("Transferred: ", humanize.IBytes(uint64(m.metric.Replicate.BytesTransferred)))
addLine("Elapsed: ", accElapsedTime.String())
addLine("CurrObjName: ", m.current.Replicate.Object)
addLine("CurrObjName: ", m.metric.Replicate.Object)
case string(madmin.BatchJobExpire):
addLine("JobType: ", m.current.JobType)
addLine("Objects: ", m.current.Expired.Objects)
addLine("FailedObjects: ", m.current.Expired.ObjectsFailed)
addLine("CurrObjName: ", m.current.Expired.Object)
addLine("JobType: ", m.metric.JobType)
addLine("Objects: ", m.metric.Expired.Objects)
addLine("FailedObjects: ", m.metric.Expired.ObjectsFailed)
addLine("CurrObjName: ", m.metric.Expired.Object)

if !m.current.LastUpdate.IsZero() {
accElapsedTime := m.current.LastUpdate.Sub(m.current.StartTime)
if !m.metric.LastUpdate.IsZero() {
accElapsedTime := m.metric.LastUpdate.Sub(m.metric.StartTime)
addLine("Elapsed: ", accElapsedTime.String())
}

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ github.com/minio/colorjson v1.0.7 h1:n69M42mIuQHdzbsxlmwji1zxDypaw4o39rHjAmX4Dh4
github.com/minio/colorjson v1.0.7/go.mod h1:9LGM5yybI+GuhSbuzAerbSgvFb4j8ux9NzyONR+NrAY=
github.com/minio/filepath v1.0.0 h1:fvkJu1+6X+ECRA6G3+JJETj4QeAYO9sV43I79H8ubDY=
github.com/minio/filepath v1.0.0/go.mod h1:/nRZA2ldl5z6jT9/KQuvZcQlxZIMQoFFQPvEXx9T/Bw=
github.com/minio/madmin-go/v3 v3.0.55-0.20240603092915-420a67132c32 h1:9se7/S4AlN2k/B1E7A8m1m07DM3p0JnIOzVhDuAV2PI=
github.com/minio/madmin-go/v3 v3.0.55-0.20240603092915-420a67132c32/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/madmin-go/v3 v3.0.58-0.20240701162942-671010069ecb h1:6Hx1+R0GR79Vt4gOKgadH4OG8tkrq/UNyxfmR1C7C14=
github.com/minio/madmin-go/v3 v3.0.58-0.20240701162942-671010069ecb/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
Expand Down

0 comments on commit bbdd963

Please sign in to comment.