Thursday, June 23, 2016

Stream Definitions for wso2 complex event processor

An event is a unit of data, and an event stream is a sequence of events of a particular type. The type of events can be defined as an event stream definition. so different stream definitions are required to be used to store several data streams into the same column family, different stream versions should be used with the same stream name corresponding to the column family.
After the stream definition is sent once in each stream, the types of data transferring will be as chunks to the Data Receiver where the data is read as the given stream definition.
 

Event stream definition

Definitions of the event streams are stored in the filesystem as deployable artifacts in the  <PRODUCT _HOME>/repository/deployment/server/eventstreams/ directory as .json files. These are hot deployable files and can be added/removed when the server is up and running. Although a Data Stream can be defined using Java in code.
 

Sample format of event stream definition

{
  'name': 'stream.name',
  'version': '1.0.0',
  'nickName': 'stream nick name',
  'description': 'description of the stream',
  'metaData':[
          {'name':'meta_data_1','type':'STRING'},
          {'name':'meta_data_2','type':'INT'}
  ],
  'correlationData':[
          {'name':'correlation_data_1','type':'STRING'},
          {'name':'correlation_data_2','type':'DOUBLE'}
  ],
  'payloadData':[
          {'name':'payload_data_1','type':'BOOL'},
          {'name':'payload_data_2','type':'LONG'}
  ]
}

This is the sample format of stream definition then let's look the properties of stream definition.
  • Event Stream Name  Name of the event stream.
  • Event Stream Version Version of the event stream. Default value is 1.0.0.The default version should be "1.0.0" and it can be incremented when another stream is required to be added to the same Cassandra column family or if the existing stream is to be edited. The important thing to note here is that a stream cannot be deleted or edited at the moment but when required, another stream should be created with the same name but with a different version.
  • Event Stream Description Description of the events stream. (This is optional.) 
  • Event Stream Nick-Name Nicknames of an event streams separated by commas. (This is optional.)
  • Stream Attributes   Stream Attributes contains the data of the event. These are divided into three logical separations to give more usability and maintenance to the user. It is not mandatory to have attributes in all three sections, but there should be at least one section with at least one attribute defined. Also attribute names should be unique within each section and Their type should be specified as the "type" in each field.
    • Meta Data : Contains the meta information of the events.e.g.: character set encoding and message type. (Referred to as  meta_<attribute name>.)
    • Correlation Data : Contains the correlation information of the events corresponds to the data required to correlate between different monitoring points such as the "activity ID" of a message flow (Referred to as  correlation_<attribute name>.).
    • Payload Data : Contains the actual data that the event intends to have such as SOAP header of the message, SOAP body of the message and properties intercepted from the message.  (Referred to as <attribute name>.)
     

Sample Scenario for stream definition

Let's assume our input stream  in below format to the cep. before into our process, we have to format the data into our stream definition.
timestamp:19900813115534,isPowerSaverEnabled:false,sensorId:0,sensorName:temperature,longitude:20.44345,latitude:5.443435,humidity:8.9,sensorValue:1.23434

So, stream definition for the above event below one is the sample stream definition.

{
  "name": "org.wso2.event.sensor.stream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "metaData": [
    {
      "name": "timestamp",
      "type": "LONG"
    },
    {
      "name": "isPowerSaverEnabled",
      "type": "BOOL"
    },
    {
      "name": "sensorId",
      "type": "INT"
    },
    {
      "name": "sensorName",
      "type": "STRING"
    }
  ],
  "correlationData": [
    {
      "name": "longitude",
      "type": "DOUBLE"
    },
    {
      "name": "latitude",
      "type": "DOUBLE"
    }
  ],
  "payloadData": [
    {
      "name": "humidity",
      "type": "FLOAT"
    },
    {
      "name": "sensorValue",
      "type": "DOUBLE"
    }
  ]
}

Once we set the stream definition hereafter our event stream will send the date in below format to the data receivers or execution plans.

{
    "event": {
        "metaData": {
            "timestamp": 56783,
            "isPowerSaverEnabled": false,
            "sensorId": 4,
            "sensorName": "data3"
        },
        "correlationData": {
            "longitude": 1.23434,
            "latitude": 1.23434
        },
        "payloadData": {
            "humidity": 6.6,
            "sensorValue": 20.44345
        }
    }
}





I hope this post will give little understand about stream definition.

References