[go: nahoru, domu]

Skip to content

Commit

Permalink
Allow configuring a list of notifies in a workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
izhangzhihao committed Jul 20, 2022
1 parent 2afc01e commit f0a9506
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ class NotificationUtil(val jobLogAccessor: JobLogAccessor) {
def notify(jobResults: Seq[WFInterpretingResult]): Unit = {
val configToLogs: Seq[(NotifyConfig, Seq[JobLog])] =
jobResults
.filterNot(it => it.workflow.notifys == null || it.workflow.notifys.isEmpty)
.filterNot(it => it.workflow.notifies == null || it.workflow.notifies.isEmpty)
.flatMap(it =>
it.workflow.notifys
it.workflow.notifies
.flatMap(_.toConfigs())
.map(n => (n, it.jobLogs.map(_.get)))
.map { case (config, logs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package com.github.sharpdata.sharpetl.core.syntax

import com.github.sharpdata.sharpetl.core.annotation.Annotations._
import com.github.sharpdata.sharpetl.core.notification.NotifyConfig
import com.github.sharpdata.sharpetl.core.util.Constants.Separator.ENTER
import com.github.sharpdata.sharpetl.core.util.StringUtil
import com.google.common.base.Strings.isNullOrEmpty

@Evolving(since = "1.0.0")
final case class Notify(notifyType: String, recipients: String, notifyCondition: String) {
Expand All @@ -16,69 +13,3 @@ final case class Notify(notifyType: String, recipients: String, notifyCondition:
}
}

/**
 * Workflow header attributes plus the steps that belong to the workflow.
 *
 * `toString` renders the header (see `headerStr`) followed by the steps joined with `"\n"`.
 * String attributes for which `StringUtil.isNullOrEmpty` returns true are left out of the header.
 */
@Evolving(since = "1.0.0")
final case class Workflow(
    name: String,
    period: String,
    loadType: String,
    logDrivenType: String,
    upstream: String,
    dependsOn: String,
    comment: String,
    timeout: Int,
    defaultStart: String,
    stopScheduleWhenFail: Boolean,
    notification: Notify,
    options: Map[String, String],
    var steps: List[WorkflowStep]
) extends Formatable {

  /** The `projectName` option, or `"default"` when `options` is null or lacks that key. */
  def getProjectName(): String =
    if (options == null) "default" else options.getOrElse("projectName", "default")

  /** The single `notification` wrapped in a sequence; empty when `notification` is null. */
  @deprecated
  def notifys(): Seq[Notify] = Option(notification).toList

  // scalastyle:off
  /** Header text immediately followed by every step, with steps separated by a newline. */
  override def toString: String = headerStr + steps.mkString("\n")

  /**
   * Renders the workflow header as `-- key=value` lines.
   *
   * Order: workflow name, the optional string attributes, `timeout` (only when greater
   * than 1), `stopScheduleWhenFail` (only when true), the notification block (only when a
   * notification with a non-empty `notifyType` is present), the options block, and a
   * trailing `"\n"`.
   */
  def headerStr: String = {
    val optionalAttributes = Seq(
      "period" -> period,
      "loadType" -> loadType,
      "logDrivenType" -> logDrivenType,
      "upstream" -> upstream,
      "dependsOn" -> dependsOn,
      "comment" -> comment,
      "defaultStart" -> defaultStart
    )
    val attributeLines = optionalAttributes.collect {
      case (key, value) if !StringUtil.isNullOrEmpty(value) => s"-- $key=$value$ENTER"
    }.mkString

    val timeoutLine = if (timeout > 1) s"-- timeout=$timeout$ENTER" else ""
    val stopLine =
      if (stopScheduleWhenFail) s"-- stopScheduleWhenFail=$stopScheduleWhenFail$ENTER" else ""

    //TODO: fix this later
    val notificationBlock =
      if (notification == null || isNullOrEmpty(notification.notifyType)) {
        ""
      } else {
        s"-- notification$ENTER" +
          s"-- notifyType=${notification.notifyType}$ENTER" +
          s"-- recipients=${notification.recipients}$ENTER" +
          s"-- notifyCondition=${notification.notifyCondition}$ENTER"
      }

    s"-- workflow=$name$ENTER" +
      attributeLines +
      timeoutLine +
      stopLine +
      notificationBlock +
      optionsToString +
      "\n"
  }

  /** The `-- options` block with one line per entry, or `""` when `options` is null or empty. */
  def optionsToString: String =
    Option(options).filter(_.nonEmpty).fold("") { entries =>
      entries.foldLeft(s"-- options$ENTER") { case (rendered, (key, value)) =>
        rendered + s"-- $key=$value$ENTER"
      }
    }

  // scalastyle:on
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.github.sharpdata.sharpetl.core.syntax

import com.github.sharpdata.sharpetl.core.annotation.Annotations.Evolving
import com.github.sharpdata.sharpetl.core.util.Constants.Separator.ENTER
import com.github.sharpdata.sharpetl.core.util.StringUtil

/**
 * Workflow header attributes, notify entries, options and the steps of the workflow.
 *
 * `toString` renders the workflow back to its `-- key=value` text form: the header produced
 * by `headerStr` followed by the steps joined with `"\n"`.
 *
 * @param name                 workflow name, always rendered as `-- workflow=<name>`
 * @param period               rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param loadType             rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param logDrivenType        rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param upstream             rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param dependsOn            rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param comment              rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param timeout              rendered only when greater than 1
 * @param defaultStart         rendered only when `StringUtil.isNullOrEmpty` is false for it
 * @param stopScheduleWhenFail rendered only when true
 * @param notifies             notify entries, one `-- notify` block each; may be null
 * @param options              free-form options; may be null
 * @param steps                workflow steps; mutable so it can be assigned after construction
 */
@Evolving(since = "1.0.0")
final case class Workflow(
    name: String,
    period: String,
    loadType: String,
    logDrivenType: String,
    upstream: String,
    dependsOn: String,
    comment: String,
    timeout: Int,
    defaultStart: String,
    stopScheduleWhenFail: Boolean,
    notifies: Seq[Notify],
    options: Map[String, String],
    var steps: List[WorkflowStep]
) extends Formatable {

  /** The `projectName` option, or `"default"` when `options` is null or lacks that key. */
  def getProjectName(): String = Option(options).map(_.getOrElse("projectName", "default")).getOrElse("default")

  // scalastyle:off
  /** Header text immediately followed by every step, with steps separated by a newline. */
  override def toString: String = {
    val builder = new StringBuilder()
    builder.append(headerStr)
    // `steps` is a var that may not have been assigned yet; render a null list as no steps
    // instead of throwing, in line with the null handling of `options` and `notifies`.
    builder.append(Option(steps).getOrElse(Nil).mkString("\n"))
    builder.toString()
  }

  /**
   * Renders the workflow header as `-- key=value` lines.
   *
   * Order: workflow name, the optional string attributes, `timeout`, `stopScheduleWhenFail`,
   * the options block, one `-- notify` block per entry of `notifies`, and a trailing `"\n"`.
   *
   * The options block is written before the notify blocks because the workflow grammar in
   * `WorkflowParser` reads `options` first and `notifies` second; the reverse order cannot
   * be parsed back when both are present.
   */
  def headerStr: String = {
    val builder = new StringBuilder()
    builder.append(s"-- workflow=$name$ENTER")

    Seq(
      "period" -> period,
      "loadType" -> loadType,
      "logDrivenType" -> logDrivenType,
      "upstream" -> upstream,
      "dependsOn" -> dependsOn,
      "comment" -> comment,
      "defaultStart" -> defaultStart
    ).foreach { case (key, value) =>
      if (!StringUtil.isNullOrEmpty(value)) builder.append(s"-- $key=$value$ENTER")
    }

    if (timeout > 1) builder.append(s"-- timeout=$timeout$ENTER")
    if (stopScheduleWhenFail) builder.append(s"-- stopScheduleWhenFail=$stopScheduleWhenFail$ENTER")

    builder.append(optionsToString)

    // A null `notifies` renders the same as an empty one.
    Option(notifies).getOrElse(Seq.empty[Notify]).foreach { entry =>
      builder.append(s"-- notify$ENTER")
      builder.append(s"-- notifyType=${entry.notifyType}$ENTER")
      builder.append(s"-- recipients=${entry.recipients}$ENTER")
      builder.append(s"-- notifyCondition=${entry.notifyCondition}$ENTER")
    }

    builder.append("\n")
    builder.toString()
  }

  /** The `-- options` block with one line per entry, or `""` when `options` is null or empty. */
  def optionsToString: String = {
    if (options != null && options.nonEmpty) {
      val builder = new StringBuilder()
      builder.append(s"-- options$ENTER")
      options.foreach { case (key, value) => builder.append(s"-- $key=$value$ENTER") }
      builder.toString()
    } else {
      ""
    }
  }

  // scalastyle:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.sharpdata.sharpetl.core.datasource.config.{DataSourceConfig, TransformationDataSourceConfig}
import com.github.sharpdata.sharpetl.core.annotation.AnnotationScanner.{configRegister, defaultConfigType}
import com.github.sharpdata.sharpetl.core.annotation.Annotations.Experimental
import com.github.sharpdata.sharpetl.core.exception.Exception.WorkFlowSyntaxException
import com.github.sharpdata.sharpetl.core.syntax.ParserUtils.{Until, objectMapper, trimSql}


Expand Down Expand Up @@ -48,18 +49,19 @@ object WorkflowParser {

// SQL body of a step, delimited by the next `stepHeader` or the end of input.
// NOTE(review): exact capture/trim behaviour depends on `ParserUtils.Until` - confirm there.
def sql[_: P]: P[String] = Until(stepHeader | End)

// Parses a nested block: a `comment(indent)` prefix, a captured block name made of one or
// more `key` matches, then key/value pairs at `indent + 1`.
// The negative lookahead rejects headers starting with "step=", "source=" or "target=".
// Yields the block name paired with its key/value pairs as a Map.
def nestedObj[_: P](indent: Int): P[(String, Map[String, String])] = P(
comment(indent) ~ !P("step=" | "source=" | "target=") ~ P(key.rep(1).!) ~ newlines
~ keyValPairs(indent + 1)
).map {
case (obj, kv) => (obj, kv.toMap)
}

def options[_: P](indent: Int): P[Map[String, String]] = P(
comment(indent) ~ !P("step=" | "source=" | "target=") ~ P("options") ~ newlines
// Parses a nested block with a fixed name: a `comment(indent)` prefix followed by the
// literal `objName`, then key/value pairs at `indent + 1`, collected into a Map.
def nestedObj[_: P](objName: String, indent: Int): P[Map[String, String]] = P(
comment(indent) ~ P(objName) ~ newlines
~ keyValPairs(indent + 1)
).map(_.toMap)

// Zero or more `notify` blocks at the given indent, separated by `newlines`.
def notifies[_: P](indent: Int): P[Seq[Map[String, String]]] = notify(indent).rep(sep = newlines)

// A single nested block named "notify"; yields its key/value pairs.
def notify[_: P](indent: Int): P[Map[String, String]] = nestedObj("notify", indent)

// A single nested block named "options"; yields its key/value pairs.
def options[_: P](indent: Int): P[Map[String, String]] = nestedObj("options", indent)

// A single nested block named "conf"; yields its key/value pairs.
def conf[_: P](indent: Int): P[Map[String, String]] = nestedObj("conf", indent)

def dataSource[_: P](`type`: String): P[DataSourceConfig] = P(
s"-- ${`type`}=" ~/ key.rep.! ~ newlines
~ keyValPairs(2) ~ newlines
Expand Down Expand Up @@ -98,11 +100,11 @@ object WorkflowParser {
~ P(transformer("source") | dataSource("source")) ~ newlines
~ P(transformer("target") | dataSource("target")) ~ newlines
~ keyValPairs(1).? ~ newlines
~ nestedObj(1).? ~ newlines
~ conf(1).? ~ newlines
~ sql
).map {
// scalastyle:off
case (step, source, target, kv, opts, sql) =>
case (step, source, target, kv, conf, sql) =>
val map = kv.getOrElse(Seq()).toMap
val workflowStep = new WorkflowStep
workflowStep.step = step
Expand All @@ -113,7 +115,7 @@ object WorkflowParser {
workflowStep.checkPoint = map.getOrElse("checkPoint", null)
workflowStep.writeMode = map.getOrElse("writeMode", null)
workflowStep.skipFollowStepWhenEmpty = map.getOrElse("skipFollowStepWhenEmpty", null) //TODO: drop this later
workflowStep.conf = opts.getOrElse(("", Map[String, String]()))._2
workflowStep.conf = conf.getOrElse(Map())
workflowStep
// WorkflowStep(step, source, target, sql.map(_.trim),
// map.getOrElse("persist", null), map.getOrElse("checkpoint", null),
Expand All @@ -128,16 +130,13 @@ object WorkflowParser {
Start
~ whitespace.rep
~ "-- workflow" ~/ "=" ~/ singleLineValue ~ newlines
~ keyValPairs(2) ~ newlines
~ nestedObj(2).rep(sep = newlines) ~ newlines
~ keyValPairs(2) ~/ newlines
~ options(2).? ~/ newlines
~ notifies(2).? ~/ newlines
~ steps
~ End
).map { case (name, kv, objs, steps) =>
val value = Map(
"name" -> name
) ++ kv.toMap ++ objs.map {
case (obj, kv) => obj -> kv
}
).map { case (name, kv, options, notifies, steps) =>
val value = kv.toMap + ("name" -> name) + ("options" -> options.getOrElse(Map())) + ("notifies" -> notifies.getOrElse(Seq()))
val json = objectMapper.writeValueAsString(value)
val wf = objectMapper.readValue(json, classOf[Workflow])
wf.steps = steps.toList
Expand Down Expand Up @@ -176,9 +175,17 @@ private object ParserUtils {
}
}

sealed trait WFParseResult
/** Result of parsing a workflow: either `WFParseSuccess` or `WFParseFail`. */
sealed trait WFParseResult {
/** True for `WFParseSuccess`, false for `WFParseFail`. */
def isSuccess: Boolean

/** The parsed workflow; `WFParseFail` throws `WorkFlowSyntaxException` instead of returning. */
def get: Workflow
}

case class WFParseSuccess(wf: Workflow) extends WFParseResult {
override def isSuccess: Boolean = true

case class WFParseSuccess(wf: Workflow) extends WFParseResult
override def get: Workflow = wf
}

case class WFParseFail(parsed: Parsed.Failure) extends WFParseResult {
override def toString: String = {
Expand Down Expand Up @@ -219,5 +226,9 @@ case class WFParseFail(parsed: Parsed.Failure) extends WFParseResult {
def formatTrailing(input: ParserInput, index: Int): String = {
fastparse.internal.Util.literalize(input.slice(index, index + 10))
}

override def isSuccess: Boolean = false

override def get: Workflow = throw WorkFlowSyntaxException(this.toString)
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.github.sharpdata.sharpetl.core.util.Constants.{BooleanString, WriteMo
import com.google.common.base.Strings.isNullOrEmpty
import com.github.sharpdata.sharpetl.core.annotation.Annotations.Evolving
import com.github.sharpdata.sharpetl.core.datasource.config.DataSourceConfig
import com.github.sharpdata.sharpetl.core.util.Constants.BooleanString
import com.github.sharpdata.sharpetl.core.util.Constants.Separator.ENTER
import com.github.sharpdata.sharpetl.core.util.StringUtil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ class NotificationUtilTest extends AnyFlatSpec with should.Matchers {
job2.setStepLogs(Array(mockStepLog(2, "1", JobStatus.SUCCESS), mockStepLog(2, "2", JobStatus.FAILURE)))

val wf1 = Workflow("job1", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS)), Map(), List())

val wf2 = Workflow("job2", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "lisi@gmail.com", NotifyTriggerCondition.ALWAYS), Map(), List())
Seq(Notify("email", "lisi@gmail.com", NotifyTriggerCondition.ALWAYS)), Map(), List())

service.notify(Seq(
WFInterpretingResult(wf1, Seq(Failure(job1, new RuntimeException("???")))),
Expand All @@ -60,10 +60,10 @@ class NotificationUtilTest extends AnyFlatSpec with should.Matchers {
job2.setStepLogs(Array(mockStepLog(2, "1", JobStatus.SUCCESS), mockStepLog(2, "2", JobStatus.FAILURE)))

val wf1 = Workflow("job1", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS)), Map(), List())

val wf2 = Workflow("job2", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS)), Map(), List())

service.notify(Seq(
WFInterpretingResult(wf1, Seq(Failure(job1, new RuntimeException("???")))),
Expand All @@ -85,7 +85,7 @@ class NotificationUtilTest extends AnyFlatSpec with should.Matchers {
jobLog.setStepLogs(Array(mockStepLog(2, "1", JobStatus.FAILURE)))

val tempWf = Workflow("test", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.ALWAYS)), Map(), List())

service.notify(Seq(WFInterpretingResult(tempWf, Seq(Success(jobLog)))))
verify(NotificationFactory, times(1)).sendNotification(any())
Expand All @@ -109,7 +109,7 @@ class NotificationUtilTest extends AnyFlatSpec with should.Matchers {
.thenReturn(previousJobLog)

val wf = Workflow("test", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.FAILURE), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.FAILURE)), Map(), List())

service.notify(Seq(WFInterpretingResult(wf, Seq(Success(jobLog)))))
verify(NotificationFactory, times(1)).sendNotification(any())
Expand All @@ -133,7 +133,7 @@ class NotificationUtilTest extends AnyFlatSpec with should.Matchers {
.thenReturn(previousJobLog)

val wf = Workflow("test", "1440", "full", "timewindow", null, null, null, -1, null, false,
Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.FAILURE), Map(), List())
Seq(Notify("email", "zhangsan@gmail.com", NotifyTriggerCondition.FAILURE)), Map(), List())

service.notify(Seq(WFInterpretingResult(wf, Seq(Success(jobLog)))))
verify(NotificationFactory, times(0)).sendNotification(any())
Expand Down
Loading

0 comments on commit f0a9506

Please sign in to comment.