praveenkumarb1207/http.almaren

HTTP Connector

To add the http.almaren dependency to your sbt build:

libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.5-3.3"

To run in spark-shell with Scala 2.12:

spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.3,com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.3"

With Scala 2.13:

spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.8-3.3,com.github.music-of-the-ainur:http-almaren_2.13:1.2.5-3.3"

Maven / Ivy Package Usage

The connector is also available from the Maven Central repository. Load it with the --packages option or the spark.jars.packages configuration property, using one of the following values:

| Version | Connector Artifact |
|---|---|
| Spark 3.3.x and Scala 2.13 | com.github.music-of-the-ainur:http-almaren_2.13:1.2.5-3.3 |
| Spark 3.3.x and Scala 2.12 | com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.3 |
| Spark 3.2.x and Scala 2.12 | com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.2 |
| Spark 3.1.x and Scala 2.12 | com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.1 |
| Spark 2.4.x and Scala 2.12 | com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-2.4 |
| Spark 2.4.x and Scala 2.11 | com.github.music-of-the-ainur:http-almaren_2.11:1.2.5-2.4 |
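
The artifact names listed here follow one pattern: the Scala binary version is appended to the artifact ID, and the Spark minor version is appended to the connector version. A minimal sketch, assuming that pattern holds for the listed rows (the helper `httpAlmarenCoordinate` is hypothetical and not part of the connector):

```scala
// Hypothetical helper: builds a Maven coordinate in the form listed above.
// Assumes the pattern <artifact>_<scalaBinary>:<connectorVersion>-<sparkMinor>.
def httpAlmarenCoordinate(
    scalaBinary: String,
    sparkMinor: String,
    connectorVersion: String = "1.2.5"): String =
  s"com.github.music-of-the-ainur:http-almaren_${scalaBinary}:${connectorVersion}-${sparkMinor}"

val coordinate = httpAlmarenCoordinate(scalaBinary = "2.12", sparkMinor = "3.3")
println(coordinate)
```

The resulting string can be passed to --packages or set in spark.jars.packages.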

Methods

HTTP

Performs an HTTP request for each Row.

$ curl -X PUT -H "Authorization: {SESSION_ID}" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "language=English" \
-d "product=32131314" \
-d "audience_=consumer_vr" \
https://localhost/objects/documents/534

Example

import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit

import spark.implicits._

// Generating table "USER_DATA"

case class Data(firstName:String, lastName:String, age:Int, code:Long)

List(Data("Roger","Laura",25,2342324232L),
    Data("Robert","Dickson",88,3218313131L),
    Data("Daniel","Pedro",28,32323232L))
    .toDS
    .createOrReplaceTempView("USER_DATA")


// Don't infer the schema manually, just follow the steps:
// val jsonColumn = spark.sql("SELECT __BODY__ FROM ...").as[String]
// To generate the schema: spark.read.json(jsonColumn).schema.toDDL

val httpOutputSchema = Some("`data` STRING,`headers` STRUCT<`Accept`: STRING, `Accept-Encoding`: STRING, `Cache-Control`: STRING, `Content-Length`: STRING, `Content-Type`: STRING, `Host`: STRING, `Pragma`: STRING, `User-Agent`: STRING, `X-Amzn-Trace-Id`: STRING>,`method` STRING,`origin` STRING,`url` STRING")

// Entry point of the Almaren framework; the application name is arbitrary
val almaren = Almaren("http-almaren-example")

val df = almaren.builder
    .sourceSql("SELECT uuid() as id,* FROM USER_DATA").alias("DATA")
    .sql("""SELECT 
                id as __ID__,
                concat('http://httpbin.org/anything/person/',code) as __URL__,
                to_json(named_struct('data',named_struct('name',firstName + " " + lastName))) as __DATA__ 
            FROM DATA""")
    .http(method = "POST", threadPoolSize = 10, batchSize = 10000)
    .deserializer("JSON","__BODY__",httpOutputSchema).alias("TABLE")
    .sql("""SELECT
                T.origin,
                D.firstName,
                D.lastName,D.age,
                T.__STATUS_CODE__ as status_code,
                T.url,
                T.__ERROR__ as error,
                T.__ELAPSED_TIME__ as request_time
            FROM TABLE T JOIN DATA D ON d.id = t.__ID__""")
    .batch

df.show(false)

Output:

20/10/15 01:04:32 INFO SourceSql: sql:{SELECT uuid() as id,* FROM USER_DATA}
20/10/15 01:04:33 INFO Alias: {DATA}
20/10/15 01:04:33 INFO Sql: sql:{SELECT
                id as __ID__,
                concat('http://httpbin.org/anything/person/',code) as __URL__,
                to_json(named_struct('data',named_struct('name',firstName + " " + lastName))) as __DATA__
            FROM DATA}
20/10/15 01:04:33 INFO MainHTTP: headers:{Map()}, method:{POST}, connectTimeout:{60000}, readTimeout{1000}, threadPoolSize:{10}
20/10/15 01:04:33 INFO JsonDeserializer: columnName:{__BODY__}, schema:{Some(`data` STRING,`headers` STRUCT<`Accept`: STRING, `Accept-Encoding`: STRING, `Cache-Control`: STRING, `Content-Length`: STRING, `Content-Type`: STRING, `Host`: STRING, `Pragma`: STRING, `User-Agent`: STRING, `X-Amzn-Trace-Id`: STRING>,`method` STRING,`origin` STRING,`url` STRING)}
20/10/15 01:04:33 INFO Sql: sql:{SELECT
                T.origin,
                D.firstName,
                D.lastName,D.age,
                T.__STATUS_CODE__ as status_code,
                T.url,
                T.__ERROR__ as error,
                T.__ELAPSED_TIME__ as request_time
            FROM TABLE T JOIN DATA D ON d.id = t.__ID__}
            
+-----------+---------+--------+---+-----------+---------------------------------------------+-----+------------+
|origin     |firstName|lastName|age|status_code|url                                          |error|request_time|
+-----------+---------+--------+---+-----------+---------------------------------------------+-----+------------+
|151.46.49.9|Roger    |Laura   |25 |200        |http://httpbin.org/anything/person/2342324232|null |651         |
|151.46.49.9|Robert   |Dickson |88 |200        |http://httpbin.org/anything/person/3218313131|null |653         |
|151.46.49.9|Daniel   |Pedro   |28 |200        |http://httpbin.org/anything/person/32323232  |null |543         |
+-----------+---------+--------+---+-----------+---------------------------------------------+-----+------------+

Parameters

| Parameter | Description | Type |
|---|---|---|
| headers | HTTP headers | Map[String,String] |
| params | HTTP params | Map[String,String] |
| hiddenParams | HTTP params which are hidden (not exposed in logs) | Map[String,String] |
| method | HTTP method | String |
| requestHandler | Closure to handle the HTTP request | (Row,Session,String,Map[String,String],String) => requests.Response |
| session | Closure to handle HTTP sessions | () => requests.Session |
| connectTimeout | Timeout in ms to keep the connection alive; keeping this number high is recommended | Int |
| readTimeout | Maximum number of ms to perform a single HTTP request | Int |
| threadPoolSize | Number of parallel connections for each executor. parallelism = number of executors * number of cores * threadPoolSize | Int |
| batchSize | Number of records a single thread will process | Int |
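
The parallelism formula in the threadPoolSize row can be worked through with concrete numbers. A minimal sketch, reading "number of cores" as cores per executor (an assumption) and using illustrative cluster sizes; `httpParallelism` is a hypothetical helper, not a connector function:

```scala
// Worked example of: parallelism = executors * cores * threadPoolSize.
// Cluster sizes are illustrative assumptions, not recommendations.
def httpParallelism(executors: Int, coresPerExecutor: Int, threadPoolSize: Int): Int =
  executors * coresPerExecutor * threadPoolSize

val concurrent = httpParallelism(executors = 4, coresPerExecutor = 2, threadPoolSize = 10)
println(s"Up to $concurrent requests may be in flight at once") // 4 * 2 * 10 = 80
```

This number is worth comparing against what the target service can absorb before raising threadPoolSize.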

Special Columns

Input:

| Parameter | Mandatory | Description |
|---|---|---|
| __ID__ | Yes | Returned in the output of the http.almaren component; useful for joining data |
| __URL__ | Yes | Used to perform the HTTP request |
| __DATA__ | No | Data content, used in POST/PUT HTTP requests |

Output:

| Parameter | Description |
|---|---|
| __ID__ | Custom ID; useful for joining data |
| __BODY__ | HTTP response |
| __HEADER__ | HTTP header |
| __STATUS_CODE__ | HTTP response code |
| __STATUS_MSG__ | HTTP response message |
| __ERROR__ | Java exception |
| __ELAPSED_TIME__ | Request time in ms |
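
Because __ID__ and __URL__ are mandatory inputs, it can help to check for them before calling .http. A minimal sketch in plain Scala; `missingColumns` is a hypothetical pre-flight helper, not part of the connector:

```scala
// Mandatory input columns, as listed under Special Columns.
val mandatoryInput = Set("__ID__", "__URL__")

// Hypothetical helper: returns the mandatory columns absent from `columns`.
def missingColumns(columns: Seq[String]): Set[String] =
  mandatoryInput.diff(columns.toSet)

val missing = missingColumns(Seq("__ID__", "__DATA__"))
println(s"Missing mandatory columns: $missing")
```

With a DataFrame at hand, the same check could be run as `missingColumns(df.columns.toSeq)`.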

Methods

The following methods are supported:

  • POST
  • GET
  • HEAD
  • OPTIONS
  • DELETE
  • PUT

Session

You can pass an existing session to the HTTP component. For full details, check the documentation.

val newSession = () => {
    val s = requests.Session(headers = Map("Custom-header" -> "foo"))
    s.post("https://bar.com/login",data = Map("user" -> "baz", "password" -> "123"))
    s
}

almaren.builder
    .sourceSql("SELECT concat('http://localhost:3000/user/',first_name,last_name,'/',country) as __URL__,id as __ID__")
    .http(method = "GET", session = newSession)

Request Handler

You can override the default requestHandler closure to issue any custom HTTP request.

import org.apache.spark.sql.Row
import requests.Session

val customHandler = (row:Row,session:Session,url:String, headers:Map[String,String], method:String,connectTimeout:Int, readTimeout:Int) => {
    method.toUpperCase match {
      case "GET" => session.get(url, headers = headers, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "DELETE" => session.delete(url, headers = headers, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "OPTIONS" => session.options(url, headers = headers, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "HEAD" => session.head(url, headers = headers, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "POST" => session.post(url, headers = headers, data = row.getAs[String](Alias.DataCol), readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "PUT" => session.put(url, headers = headers, data = row.getAs[String](Alias.DataCol), readTimeout = readTimeout, connectTimeout = connectTimeout)
      case method => throw new Exception(s"Invalid Method: $method")
    }
}
     
almaren.builder
    .sql("...")
    .http(method = "POST", requestHandler = customHandler)
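
A custom handler is a natural place for cross-cutting behaviour such as retries. The sketch below is one possible approach and not part of http.almaren: `withRetry` is a hypothetical helper in plain Scala that re-runs a call when it throws, and it could wrap a `session.get(...)` or `session.post(...)` call inside a custom handler. The flaky call is simulated so the example runs without a network.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper (not part of http.almaren): runs `call` up to
// `maxAttempts` times and rethrows the last failure if every attempt throws.
def withRetry[T](maxAttempts: Int)(call: => T): T =
  Try(call) match {
    case Success(value)                => value
    case Failure(_) if maxAttempts > 1 => withRetry(maxAttempts - 1)(call)
    case Failure(error)                => throw error
  }

// Simulated flaky call: fails twice, then succeeds on the third attempt.
var calls = 0
val result = withRetry(maxAttempts = 3) {
  calls += 1
  if (calls < 3) throw new RuntimeException(s"attempt $calls failed") else "ok"
}
println(s"$result after $calls calls")
```

Retrying is safest for idempotent methods such as GET; repeating a POST may create duplicates on the server side.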

HTTP Batch

Performs a single HTTP request with a batch of data. You can choose how the batch data is created using the batchDelimiter closure. The default behavior is to concatenate rows with a newline.

$ curl -X PUT -H "Authorization: {SESSION_ID}" \
-H "Content-Type: text/csv" \
-H "Accept: text/csv" \
--data-binary @"filename" \
https://localhost/objects/documents/batch

Example

import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit

import spark.implicits._

  val httpBatchDf = almaren.builder
    .sourceDataFrame(df)
    .sqlExpr("to_json(struct(*)) as __DATA__", "monotonically_increasing_id() as __ID__").alias("BATCH_DATA")
    .httpBatch(
      url = "http://127.0.0.1:3000/batchAPI",
      method = "POST",
      batchSize = 3,
      batchDelimiter = (rows: Seq[Row]) => s"""[${rows.map(row => row.getAs[String](Alias.DataCol)).mkString(",")}]""")
    .batch

httpBatchDf.show(false)

Output:

21/06/09 17:52:20 INFO SourceDataFrame:
21/06/09 17:52:20 INFO SqlExpr: exprs:{to_json(struct(*)) as __DATA__
monotonically_increasing_id() as __ID__}
21/06/09 17:52:20 INFO Alias: {BATCH_DATA}
21/06/09 17:52:20 INFO HTTPBatch: url:{http://127.0.0.1:3000/batchAPI}, headers:{Map()},params:{Map()}, method:{POST}, connectTimeout:{60000}, readTimeout{1000}, batchSize:{3}
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+---------+----------------+------------------------------+
|__ID__   |__BODY__                                                                                                                                                                                                 |__HEADER__                                                                                                                                          |__STATUS_CODE__|__STATUS_MSG__|__ERROR__|__ELAPSED_TIME__|__URL__                       |
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+---------+----------------+------------------------------+
|[0, 1, 2]|{"data":[{"country":"London","first_name":"John","last_name":"Smith"},{"country":"India","first_name":"David","last_name":"Jones"},{"country":"Indonesia","first_name":"Michael","last_name":"Johnson"}]}|[date -> [Wed, 09 Jun 2021 12:22:23 GMT], content-type -> [application/json;charset=UTF-8], server -> [Mojolicious (Perl)], content-length -> [201]]|200            |OK            |null     |122             |http://127.0.0.1:3000/batchAPI|
|[3, 4]   |{"data":[{"country":"Brazil","first_name":"Chris","last_name":"Lee"},{"country":"Russia","first_name":"Mike","last_name":"Brown"}]}                                                                      |[date -> [Wed, 09 Jun 2021 12:22:23 GMT], content-type -> [application/json;charset=UTF-8], server -> [Mojolicious (Perl)], content-length -> [131]]|200            |OK            |null     |171             |http://127.0.0.1:3000/batchAPI|
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+---------+----------------+------------------------------+```

Parameters

| Parameter | Description | Type |
|---|---|---|
| url | Used to perform the HTTP request | String |
| headers | HTTP headers | Map[String,String] |
| params | HTTP params | Map[String,String] |
| hiddenParams | HTTP params which are hidden (not exposed in logs) | Map[String,String] |
| method | HTTP method | String |
| requestHandler | Closure to handle the HTTP request | (Row,Session,String,Map[String,String],String) => requests.Response |
| session | Closure to handle HTTP sessions | () => requests.Session |
| connectTimeout | Timeout in ms to keep the connection alive; keeping this number high is recommended | Int |
| readTimeout | Maximum number of ms to perform a single HTTP request | Int |
| batchSize | Number of records sent in a single HTTP transaction | Int |
| batchDelimiter | Closure used to determine how the batch data will be created | (Seq[Row]) => String |
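
In the example output above, five input rows with batchSize = 3 produce two requests, with __ID__ values [0, 1, 2] and [3, 4]. A minimal sketch of that grouping in plain Scala, using `grouped` on a local collection. How the connector splits rows internally is not described in this README, so this is an illustration of the arithmetic only:

```scala
// Five record IDs, as in the example output.
val ids = Seq(0L, 1L, 2L, 3L, 4L)
val batchSize = 3

// Each inner sequence stands for the records sent in one HTTP request.
val batches = ids.grouped(batchSize).toList
println(batches)
```

On a real cluster the rows are spread across partitions, so actual batches may be smaller than batchSize.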

Special Columns

Input:

| Parameter | Mandatory | Description |
|---|---|---|
| __ID__ | Yes | Returned in the output of the http.almaren component as array[string]; useful for joining data |
| __DATA__ | Yes | Data content, used in POST/PUT HTTP requests |

Output:

| Parameter | Description |
|---|---|
| __ID__ | Custom ID of type array[string]; useful for joining data |
| __BODY__ | HTTP response |
| __HEADER__ | HTTP header |
| __STATUS_CODE__ | HTTP response code |
| __STATUS_MSG__ | HTTP response message |
| __ERROR__ | Java exception |
| __DATA__ | Payload (data content) sent in each batch API request |

Methods

The following methods are supported:

  • POST
  • GET
  • HEAD
  • OPTIONS
  • DELETE
  • PUT

Request Handler Batch

You can override the default requestHandlerBatch closure to issue any custom HTTP request.

  val customHandlerBatch = (data:String, session:Session, url:String, headers:Map[String, String], params:Map[String, String], method:String, connectTimeout:Int, readTimeout:Int) => {
    method.toUpperCase match {
      case "GET" => session.get(url, headers = headers, params = params, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "DELETE" => session.delete(url, headers = headers, params = params, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "OPTIONS" => session.options(url, headers = headers, params = params, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "HEAD" => session.head(url, headers = headers, params = params, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "POST" => session.post(url, headers = headers, params = params, data = data, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case "PUT" => session.put(url, headers = headers, params = params, data = data, readTimeout = readTimeout, connectTimeout = connectTimeout)
      case method => throw new Exception(s"Invalid Method: $method")
    }
  }
     
almaren.builder
    .sql("...")
    .httpBatch(method = "POST", requestHandler = customHandlerBatch)

Batch Delimiter

A closure that determines how the batch data is created. The default behavior is to concatenate rows with a newline. For example, if your __DATA__ column holds one string per row, it creates a batch whose number of lines is defined by the batchSize parameter:

foo
bar
baz
...

Examples

If the __DATA__ column holds a JSON string such as {foo:"bar"} and you need to send an array of JSON objects such as [{foo:"bar"},{foo:"baz"}]:

(rows:Seq[Row]) => s"""[${rows.map(row => row.getAs[String](Alias.DataCol)).mkString(",")}]"""

How to concatenate by new line:

(rows:Seq[Row]) => rows.map(row => row.getAs[String](Alias.DataCol)).mkString("\n")
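
To see what the two delimiters produce, the same logic can be applied to plain strings. A minimal sketch that uses a `Seq[String]` as a stand-in for the __DATA__ values of three rows (the payloads are made up for illustration, and no Spark `Row` is involved):

```scala
// Stand-ins for the __DATA__ values of three rows.
val payloads = Seq("""{"foo":"bar"}""", """{"foo":"baz"}""", """{"foo":"qux"}""")

// JSON array delimiter, mirroring the first closure above.
val asJsonArray = payloads.mkString("[", ",", "]")

// Newline delimiter, mirroring the default behavior.
val asLines = payloads.mkString("\n")

println(asJsonArray)
println(asLines)
```

The first form suits APIs that expect one JSON document per request; the second suits line-oriented formats such as CSV or newline-delimited JSON.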
