HyperExecutor Example

Introduction

This is an example of a stream processing lambda that uses the HyperExecutor system. The lambda must be implemented as a JVM lambda that processes the messages it receives from the stream one at a time.

Requirements

  • Gestalt Platform access

FizzBuzz Example

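A very simple example of stream processing takes a sequence of integers and transforms each one according to the rules of "FizzBuzz": multiples of 3 become "fizz", multiples of 5 become "buzz", and multiples of both become "fizzbuzz". The example code can be found here: FizzBuzz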

Set up the project

To create a simple Scala project, start from the example build.sbt in the FizzBuzz example project linked above. You also need to organize your code into a directory structure that works with your chosen build tool. In this case, the source files go in the src/main/scala/com/galacticfog/ directory.
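Because the example build.sbt is not reproduced in this guide, here is a minimal sketch of what it might contain. The name, version, and Scala version are assumptions, chosen so that sbt-assembly's default output name (name-assembly-version.jar) matches the fizzbuzz-assembly-0.1.jar artifact described later. The dependency versions are illustrative only; check the FizzBuzz example project for the real values.

```scala
// build.sbt: a minimal sketch, not the example project's actual file.
// name, version, and scalaVersion are assumptions picked to produce
// target/scala-2.12/fizzbuzz-assembly-0.1.jar with sbt-assembly.
name := "fizzbuzz"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
  // JSON parsing used by FizzBuzz.fizz (org.json.simple); version is illustrative
  "com.googlecode.json-simple" % "json-simple" % "1.1.1",
  // Logging facade used for LoggerFactory; version is illustrative
  "org.slf4j" % "slf4j-api" % "1.7.25"
)
```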

FizzBuzz.scala

In the project, create a class called FizzBuzz (in src/main/scala/com/galacticfog/FizzBuzz.scala) that looks like this:

package com.galacticfog

import java.util.Base64

import org.json.simple.parser.JSONParser
import org.json.simple.JSONObject
import org.slf4j.LoggerFactory

class FizzBuzz {

  val log = LoggerFactory.getLogger( getClass )
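  // JSONParser keeps internal state and is not thread-safe, so create a fresh one on each use
  def parser: JSONParser = new JSONParser()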

  log.debug( "**** STARTING FIZZBUZZ *****")

  def init(): Unit = {
    log.debug( "lambda init()" )
  }

  def destroy(): Unit = {
    log.debug( "lambda destroy()" )
  }

  def fizz( stringEvent : String, stringContext : String ) : String = {

    //
    // first parse the event json
    //

    log.debug(s"event : ${stringEvent}")
    val event = parser.parse( stringEvent ).asInstanceOf[JSONObject]

    //
    // now fetch the value and decode it
    //

    val kafkaVal = event.get( "value" ).asInstanceOf[String]
    // decode as UTF-8 explicitly rather than relying on the platform default charset
    val jsonVal = new String( Base64.getDecoder.decode( kafkaVal ), "UTF-8" )

    //
    // now the business logic.  do whatever you like with the event data that you are expecting.
    // in this case we expect a JSON payload with an integer field called "index"
    //

    val msg = parser.parse( jsonVal ).asInstanceOf[JSONObject]
    val number = msg.get( "index" ).asInstanceOf[Long].toInt
    log.debug( s"index : ${number}" )

    //
    // now do fizzbuzz
    //

    val out = number match {
      case n : Int if ( n == 0 ) => n.toString
      case n : Int if ( (n % 5 == 0) && (n % 3 == 0))  => {
        "fizzbuzz"
      }
      case n : Int if (n % 3 == 0) => {
        "fizz"
      }
      case n : Int if (n % 5 == 0) => {
        "buzz"
      }
      case dunno => dunno.toString
    }

    //
    // output is String
    //

    log.debug( s"in : ${number} -> out : ${out}" )
    out
  }
}
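Because the FizzBuzz rules depend only on the index, they can be tested without Kafka, Base64, or JSON. The following sketch pulls the same rules out of fizz into a standalone object, including the special case that 0 is passed through unchanged. FizzBuzzRules and classify are hypothetical names used only for illustration.

```scala
object FizzBuzzRules {

  // Same rules as the match in FizzBuzz.fizz: 0 is passed through as "0",
  // multiples of 15 become "fizzbuzz", then multiples of 3 and 5.
  def classify(n: Int): String =
    if (n == 0) n.toString
    else if (n % 15 == 0) "fizzbuzz"
    else if (n % 3 == 0) "fizz"
    else if (n % 5 == 0) "buzz"
    else n.toString

  def main(args: Array[String]): Unit = {
    // Apply the rules to a short sequence, as a stream of "index" values would be
    println((0 to 15).map(classify).mkString(", "))
  }
}
```

Running main prints 0, 1, 2, fizz, 4, buzz, fizz, 7, 8, fizz, buzz, 11, fizz, 13, 14, fizzbuzz. Keeping the rules in a pure function like this lets fizz stay a thin wrapper around parsing and logging.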

Streaming Infrastructure

The Gestalt streaming infrastructure handles communication with the streaming subsystem (Kafka, in this example). In doing so, it preserves the ordering of records in the stream and still allows your lambda to run computations in parallel. The degree of parallelism is controlled by the parallelization setting in your Stream Definition. The input data format looks like the following JSON object:

{
    "key":"test-key",
    "value":"eyAiaW5kZXgiIDogOTkgfQ==",
    "offset":199,
    "topic":"avro-input"
}

As you can see above, you receive some metadata about the record (specifically its topic, offset, and key) as well as the value itself. Note that the Gestalt streaming subsystem reads the record's value as an Array[Byte], which it then Base64-encodes as a string before passing it into your lambda function. In the sample above, the value decodes to the JSON payload { "index" : 99 }, which is what fizz parses to obtain the index.
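The decoding step that fizz performs can be checked in isolation with the JDK's java.util.Base64. This sketch decodes the sample "value" field shown above and also shows the reverse encoding, which is handy for building test events by hand. RecordValueCodec is a hypothetical name, and the sketch assumes the payload is UTF-8 text.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object RecordValueCodec {

  // The streaming subsystem hands the lambda the record value as a Base64
  // string; this recovers the original payload, assuming it is UTF-8 text.
  def decodeValue(value: String): String =
    new String(Base64.getDecoder.decode(value), StandardCharsets.UTF_8)

  // Reverse direction: turn a payload into the Base64 string a lambda would receive.
  def encodeValue(payload: String): String =
    Base64.getEncoder.encodeToString(payload.getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    // "value" field from the sample input record
    println(decodeValue("eyAiaW5kZXgiIDogOTkgfQ=="))  // { "index" : 99 }
    println(encodeValue("""{ "index" : 99 }"""))      // eyAiaW5kZXgiIDogOTkgfQ==
  }
}
```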

Base64-encoding the value simplifies processing for now, but this behavior is subject to change in the future.
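To make the ordering guarantee described at the start of this section more concrete, here is a rough sketch of processing records in parallel while still emitting results in input order. It is not how the Gestalt streaming infrastructure is implemented: invoke is a hypothetical stand-in for a lambda call, parallelism plays the role of the parallelization setting, and processing one batch at a time is a deliberate simplification.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OrderedParallelSketch {

  // Hypothetical stand-in for a lambda invocation. Lower-numbered records
  // sleep longer, so later records in a batch finish first.
  def invoke(record: Int): String = {
    Thread.sleep(math.max(0, 10 - record) * 10L)
    s"processed-$record"
  }

  // Run up to `parallelism` invocations concurrently, one batch at a time.
  // Future.sequence keeps the order of the futures it is given, not the
  // order in which they complete, so results come out in input order.
  def processInOrder(records: Seq[Int], parallelism: Int)
                    (implicit ec: ExecutionContext): Seq[String] =
    records.grouped(parallelism).flatMap { batch =>
      val inFlight = batch.map(r => Future(invoke(r)))
      Await.result(Future.sequence(inFlight), 10.seconds)
    }.toVector

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    println(processInOrder(1 to 8, parallelism = 4).mkString(", "))
  }
}
```

Running main prints processed-1 through processed-8 in order, even though within each batch the later records finish first.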

Build and Package the Project

Several sbt plugins can package a project into a JAR file. This example uses sbt-assembly, declared in the project/plugins.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

With this plugin in place, you can build and package the project like so:

sbt clean update compile assembly

After the build, you will find a publishable artifact at the following path:

target/scala-2.12/fizzbuzz-assembly-0.1.jar

Publish the artifact

The artifact must be hosted somewhere that Gestalt Laser can reach over HTTP. This is typically an S3 bucket or another internal CDN that can host binary artifacts. This example uses an S3 bucket.

Define the Lambda in Gestalt

  1. Navigate to your desired Environment and, from the Create menu, click Create Lambda
  2. Select the desired Lambda Provider and provide a Name
  3. Select a JVM Runtime and select the Package type
  4. Specify the Handler as com.galacticfog.FizzBuzz;fizz (the fully qualified class name, a semicolon, then the method name)
  5. Specify the Package URL as the location where you hosted your artifact

No other options are technically necessary for this example.
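The Handler value combines a fully qualified class name and a method name separated by a semicolon. The sketch below shows one way a JVM runtime could resolve such a string using Java reflection. It is a hypothetical illustration of the format, not Gestalt Laser's actual loader, and it ignores init() and destroy(). Echo is a hypothetical stand-in for FizzBuzz so the example runs without the JSON dependencies.

```scala
// Hypothetical stand-in for a lambda class such as com.galacticfog.FizzBuzz
class Echo {
  def echo(event: String, context: String): String = s"$event|$context"
}

object HandlerSketch {

  // Resolve a "fully.qualified.Class;method" handler string with reflection and
  // invoke the method with the (event, context) strings. Illustrative only.
  def invokeHandler(handler: String, event: String, context: String): String =
    handler.split(";", 2) match {
      case Array(className, methodName) =>
        val clazz = Class.forName(className)
        // Assumes the class has a public no-argument constructor
        val instance = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
        // Assumes the handler method takes two String parameters
        val method = clazz.getMethod(methodName, classOf[String], classOf[String])
        method.invoke(instance, event, context).asInstanceOf[String]
      case _ =>
        throw new IllegalArgumentException(s"expected Class;method, got: $handler")
    }

  def main(args: Array[String]): Unit = {
    // Build the handler string from the class name so the sketch does not
    // depend on which package Echo ends up in
    val handler = s"${classOf[Echo].getName};echo"
    println(invokeHandler(handler, """{"index":3}""", "{}"))
  }
}
```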

Set up the Stream Process Definition

This exercise is covered in a separate guide found here: [TODO]