[go: nahoru, domu]

Skip to content

Commit

Permalink
Refactor to HttpDataSource & HttpFileDataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
izhangzhihao committed Aug 6, 2022
1 parent 07233d9 commit 52de1df
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 364 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.github.sharpdata.sharpetl.core.datasource.config

import com.github.sharpdata.sharpetl.core.annotation.configFor

import scala.beans.BeanProperty

@configFor(types = Array("http"))
class HttpDataSourceConfig extends DataSourceConfig with Serializable {

// Name of a pre-configured HTTP connection; when non-empty, headers
// ("header.*" keys) and proxy settings are resolved from the ETL
// configuration via ETLConfig.getHttpProperties(connectionName).
@BeanProperty
var connectionName: String = _

// Target URL of the HTTP request; query-string values are URL-encoded
// (HttpProperties.getEncodeUrl) before the request is built.
@BeanProperty
var url: String = _

// HTTP verb. "GET" issues an HttpGet; any other value falls through to
// an HttpPost in HttpProperties.initRequest().
@BeanProperty
var httpMethod: String = "GET"

// Request timeout, kept as a String because step configuration is
// string-typed. NOTE(review): units are not visible in this view —
// presumably milliseconds; confirm against the RequestConfig usage.
@BeanProperty
var timeout: String = _

// Optional POST body; when non-empty it is sent as a StringEntity with
// Content-type application/json.
@BeanProperty
var requestBody: String = _

// Column name for the single-column DataFrame built from the response
// body. NOTE(review): presumed to feed the StructField name — confirm
// against transformerDF's schema construction.
@BeanProperty
var fieldName: String = "value"

// JsonPath expression applied to the response body; the default "$"
// (whole document) skips JsonPath extraction entirely.
@BeanProperty
var jsonPath: String = "$"

// Separator used to split the extracted value into multiple rows; the
// empty-string default means no splitting is performed.
@BeanProperty
var splitBy: String = ""
}

@configFor(types = Array("http_file"))
class HttpFileDataSourceConfig extends HttpDataSourceConfig {

// Local directory the remote file is downloaded into before the HDFS
// upload. Explicit String annotation added: every other bean property in
// this file is explicitly typed, and public members should not rely on
// type inference.
@BeanProperty
var tempDestinationDir: String = "/tmp"

// HDFS directory the downloaded file is moved to via
// HDFSUtil.moveFromLocal. NOTE(review): because this defaults to "/tmp"
// instead of being absent, HttpFileDataSource now uploads to HDFS
// unconditionally, unlike the previous transformer which only uploaded
// when "hdfsDir" was explicitly configured — confirm this is intended.
@BeanProperty
var hdfsDir: String = "/tmp"
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package com.github.sharpdata.sharpetl.spark.transformation
package com.github.sharpdata.sharpetl.spark.datasource

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.sharpdata.sharpetl.core.annotation.source
import com.github.sharpdata.sharpetl.core.api.Variables
import com.github.sharpdata.sharpetl.core.datasource.Source
import com.github.sharpdata.sharpetl.core.datasource.config.HttpDataSourceConfig
import com.github.sharpdata.sharpetl.core.repository.model.JobLog
import com.github.sharpdata.sharpetl.core.syntax.WorkflowStep
import com.github.sharpdata.sharpetl.core.util.{ETLConfig, ETLLogger}
import com.github.sharpdata.sharpetl.spark.utils.{ETLSparkSession, HttpStatusUtils}
import com.google.common.base.Strings.isNullOrEmpty
import com.jayway.jsonpath.JsonPath
import com.github.sharpdata.sharpetl.core.util.{ETLConfig, ETLLogger}
import net.minidev.json.JSONArray
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
Expand All @@ -12,26 +19,27 @@ import org.apache.http.entity.StringEntity
import org.apache.http.impl.client._
import org.apache.http.util.EntityUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.net.URLEncoder

import scala.util.Using

object HttpTransformer extends Transformer {
@source(types = Array("http"))
class HttpDataSource extends Source[DataFrame, SparkSession] {

var httpClient: CloseableHttpClient = _

lazy val mapper = new ObjectMapper

override def transform(args: Map[String, String]): DataFrame = {
val responseBody = getHttpResponseBody(args)
transformerDF(responseBody, args)
def read(step: WorkflowStep, jobLog: JobLog, executionContext: SparkSession, variables: Variables): DataFrame = {
val config = step.getSourceConfig[HttpDataSourceConfig]
val responseBody = getHttpResponseBody(config)
transformerDF(responseBody, config)
}

private def getHttpResponseBody(args: Map[String, String]): String = {
private def getHttpResponseBody(config: HttpDataSourceConfig): String = {

val httpProperties = HttpProperties.initHttpProperties(args)
val httpProperties = HttpProperties.initHttpProperties(config)
val httpRequest = httpProperties.initRequest()
ETLLogger.info(s"url: ${httpRequest.getURI.toString}")
if (httpClient == null) {
Expand All @@ -53,10 +61,10 @@ object HttpTransformer extends Transformer {
}.get
}

private def transformerDF(value: String, args: Map[String, String]): DataFrame = {
val fieldName = args.getOrElse("fieldName", "value")
val jsonPath = args.getOrElse("jsonPath", "$")
val splitBy = args.getOrElse("splitBy", "")
private def transformerDF(value: String, config: HttpDataSourceConfig): DataFrame = {
val fieldName = config.fieldName
val jsonPath = config.jsonPath
val splitBy = config.splitBy

var result = value
if (jsonPath != "$" || splitBy != "") {
Expand Down Expand Up @@ -112,12 +120,12 @@ object HttpProperties {
s"""${URLEncoder.encode(keyAndValue(0), "UTF-8")}=${URLEncoder.encode(keyAndValue(1), "UTF-8")}"""
}

def initHttpProperties(args: Map[String, String]): HttpProperties = {
val url = getEncodeUrl(args("url"))
val httpMethod = args.getOrElse("httpMethod", "GET")
val connectionName = args.get("connectionName")
if (connectionName.nonEmpty) {
val httpConnectionProperties = ETLConfig.getHttpProperties(connectionName.get)
def initHttpProperties(config: HttpDataSourceConfig): HttpProperties = {
val url = getEncodeUrl(config.url)
val httpMethod = config.httpMethod
val connectionName = config.connectionName
if (!isNullOrEmpty(connectionName)) {
val httpConnectionProperties = ETLConfig.getHttpProperties(connectionName)
val headers = httpConnectionProperties
.filter(_._1.startsWith("header."))
.map { case (key, value) => key.substring("header.".length, key.length) -> value }
Expand All @@ -131,22 +139,22 @@ object HttpProperties {
}
val proxyPort = proxyProperties.getOrElse("port", "8080").toInt
val proxy = new HttpHost(proxyHost.get, proxyPort)
new HttpProperties(url, httpMethod, headers, Option(proxy), args)
new HttpProperties(url, httpMethod, headers, Option(proxy), config)
} else {
new HttpProperties(url, httpMethod, Map.empty, Option.empty, args)
new HttpProperties(url, httpMethod, Map.empty, Option.empty, config)
}
}
}

class HttpProperties(url: String, httpMethod: String, headers: Map[String, String], proxy: Option[HttpHost], optionalArgs: Map[String, String]) {
class HttpProperties(url: String, httpMethod: String, headers: Map[String, String], proxy: Option[HttpHost], config: HttpDataSourceConfig) {

def initRequest(): HttpRequestBase = {
val httpRequest = httpMethod.toUpperCase match {
case "GET" => new HttpGet(url)
case _ =>
val httpPost = new HttpPost(url)
if (optionalArgs.contains("requestBody")) {
val requestBody = optionalArgs.getOrElse("requestBody", "")
if (!isNullOrEmpty(config.requestBody)) {
val requestBody = config.requestBody
ETLLogger.info(s"request the $url with the body $requestBody")
httpPost.setEntity(new StringEntity(requestBody))
httpPost.addHeader("Content-type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,52 @@
package com.github.sharpdata.sharpetl.spark.transformation
package com.github.sharpdata.sharpetl.spark.datasource

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.sharpdata.sharpetl.core.annotation.source
import com.github.sharpdata.sharpetl.core.api.Variables
import com.github.sharpdata.sharpetl.core.datasource.Source
import com.github.sharpdata.sharpetl.core.datasource.config.HttpFileDataSourceConfig
import com.github.sharpdata.sharpetl.core.repository.model.JobLog
import com.github.sharpdata.sharpetl.core.syntax.WorkflowStep
import com.github.sharpdata.sharpetl.core.util.{ETLLogger, HDFSUtil}
import com.github.sharpdata.sharpetl.spark.utils.ETLSparkSession
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.apache.http.HttpResponse
import org.apache.http.client.{ClientProtocolException, ResponseHandler}
import org.apache.http.impl.client._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.io.{File, IOException}
import java.nio.file.Paths

// $COVERAGE-OFF$
object HttpDownloadTransformer extends Transformer {
@source(types = Array("http_file"))
class HttpFileDataSource extends Source[DataFrame, SparkSession] {
var httpClient: CloseableHttpClient = _

lazy val mapper = new ObjectMapper

override def transform(args: Map[String, String]): DataFrame = {
downloadFile(args)
def read(step: WorkflowStep, jobLog: JobLog, executionContext: SparkSession, variables: Variables): DataFrame = {
val config = step.getSourceConfig[HttpFileDataSourceConfig]
downloadFile(config)
ETLSparkSession.sparkSession.emptyDataFrame
}

private def downloadFile(args: Map[String, String]): File = {

val httpProperties = HttpProperties.initHttpProperties(args)
private def downloadFile(config: HttpFileDataSourceConfig): File = {

val httpProperties = HttpProperties.initHttpProperties(config)
val httpRequest = httpProperties.initRequest()
ETLLogger.info(s"url: ${httpRequest.getURI.toString}")
if (httpClient == null) {
httpClient = HttpClients.createDefault()
}

val descDirPath = args.getOrElse("tempDestinationDir", "/tmp")
val descDirPath = config.tempDestinationDir
val sourceFileName = FilenameUtils.getName(httpRequest.getURI.toString)
val localDescPath = Paths.get(descDirPath, sourceFileName)
httpClient.execute(httpRequest, new HttpDownloadResponseHandler(localDescPath.toFile))

if (args.contains("hdfsDir")) {
val hdfsDir = args.getOrElse("hdfsDir", "/tmp")
val hdfsDescPath = Paths.get(hdfsDir, sourceFileName).toString
ETLLogger.info(s"upload the local file ${localDescPath.toString} to hdfs ${hdfsDescPath}")
HDFSUtil.moveFromLocal(localDescPath.toString, hdfsDescPath)
}
val hdfsDir = config.hdfsDir
val hdfsDescPath = Paths.get(hdfsDir, sourceFileName).toString
ETLLogger.info(s"upload the local file ${localDescPath.toString} to hdfs ${hdfsDescPath}")
HDFSUtil.moveFromLocal(localDescPath.toString, hdfsDescPath)

localDescPath.toFile
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ select from_unixtime(unix_timestamp('${DATA_RANGE_START}', 'yyyy-MM-dd HH:mm:ss'


-- step=2
-- source=transformation
-- className=com.github.sharpdata.sharpetl.spark.transformation.HttpTransformer
-- methodName=transform
-- transformerType=object
-- source=http
-- url=http://localhost:1080/get_workday?satrt=${START_TIME_TIMESTAMP}&end=${START_TIME_TIMESTAMP}
-- target=temp
-- tableName=`source_data`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ select from_unixtime(unix_timestamp('${DATA_RANGE_START}', 'yyyy-MM-dd HH:mm:ss'


-- step=2
-- source=transformation
-- className=com.github.sharpdata.sharpetl.spark.transformation.HttpTransformer
-- methodName=transform
-- transformerType=object
-- source=http
-- url=http://localhost:1080/get_workday?satrt=${START_TIME_TIMESTAMP}&end=${START_TIME_TIMESTAMP}
-- fieldName=types
-- jsonPath=$.phoneNumbers[*].type
Expand Down

This file was deleted.

Loading

0 comments on commit 52de1df

Please sign in to comment.