Friday, August 26, 2016

Custom Window Extension for Siddhi

A Siddhi window extension lets you collect and expire events based on the given input parameters without altering the event format, just like the built-in window operators. In this post we look at how to write a custom window extension for Siddhi, along with a test case for it. By default, version 2.0.0 of the window extension archetype generates the code for a length window, so let's walk through that code to understand the basics of the implementation.

First, we need to generate the source code from the archetype. Copy the command below and run it in your working directory.

mvn archetype:generate -DarchetypeGroupId=org.wso2.carbon.extension.archetype -DarchetypeArtifactId=org.wso2.extension.siddhi.window-archetype -DarchetypeVersion=2.0.0 -DgroupId=org.wso2.siddi.extension.window -DartifactId=org.wso2.siddi.extension.window.sample -Dversion=1.0.0 -DarchetypeRepository=http://maven.wso2.org/nexus/content/repositories/wso2-public/

After completing the previous step, we get a project template in the format below, with the sample code for a length window. (Because we gave the name Sample, the source is generated for a Sample window.)

 

SampleWindow.java

/*
 * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddi.extension.window;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

import java.util.List;
import java.util.Map;

/**
 * Sample custom window.
 */
public class SampleWindow extends WindowProcessor implements FindableProcessor {
    private int length;
    private int count = 0;
    private ComplexEventChunk<StreamEvent> eventsToBeExpired;


    /**
     * The init method of the WindowProcessor, this method will be called before other methods
     *
     * @param attributeExpressionExecutors the executors of each function parameters
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        eventsToBeExpired = new ComplexEventChunk<StreamEvent>();
        if (attributeExpressionExecutors.length == 1) {
            length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue();
        } else {
            throw new ExecutionPlanValidationException("Length window should only have one parameter (<int> windowLength), but found "
                    + attributeExpressionExecutors.length + " input attributes");
        }
    }

    /**
     * The main processing method that will be called upon event arrival
     *
     * @param streamEventChunk  the stream event chunk that need to be processed
     * @param nextProcessor     the next processor to which the success events need to be passed
     * @param streamEventCloner helps to clone the incoming event for local storage or modification
     */
    @Override
    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
                                        StreamEventCloner streamEventCloner) {
        long currentTime = executionPlanContext.getTimestampGenerator().currentTime();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = streamEventChunk.next();
            StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
            clonedEvent.setType(StreamEvent.Type.EXPIRED);
            if (count < length) {
                count++;
                this.eventsToBeExpired.add(clonedEvent);
            } else {
                StreamEvent firstEvent = this.eventsToBeExpired.poll();
                if (firstEvent != null) {
                    firstEvent.setTimestamp(currentTime);
                    streamEventChunk.insertBeforeCurrent(firstEvent);
                    this.eventsToBeExpired.add(clonedEvent);
                } else {
                    streamEventChunk.insertBeforeCurrent(clonedEvent);
                }
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * To find events from the processor event pool that match the matchingEvent based on finder logic.
     *
     * @param complexEvent the event to be matched with the events at the processor
     * @param finder       the execution element responsible for finding the corresponding events that match
     *                     the matchingEvent based on the pool of events at the processor
     * @return the matched events
     */
    @Override
    public StreamEvent find(ComplexEvent complexEvent, Finder finder) {
        return finder.find(complexEvent, eventsToBeExpired, streamEventCloner);
    }

    /**
     * To construct a finder having the capability of finding events at the processor that corresponds to the incoming
     * matchingEvent and the given matching expression logic.
     *
     * @param expression                  the matching expression
     * @param matchingMetaComplexEvent    the meta structure of the incoming matchingEvent
     * @param executionPlanContext        current execution plan context
     * @param variableExpressionExecutors the list of variable ExpressionExecutors already created
     * @param eventTableMap               map of event tables
     * @param matchingStreamIndex         the stream index of the incoming matchingEvent
     * @param withinTime                  the maximum time gap between the events to be matched
     * @return finder having the capability of finding events at the processor against the expression and incoming
     * matchingEvent
     */
    @Override
    public Finder constructFinder(Expression expression, MetaComplexEvent matchingMetaComplexEvent,
                                  ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors,
                                  Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
        return CollectionOperatorParser.parse(expression, matchingMetaComplexEvent, executionPlanContext,
                variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, which needs to be
     * persisted so that the element can be reconstructed to the same state at a later point in time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[]{eventsToBeExpired.getFirst(), count};
    }

    /**
     * Used to restore the serialized state of the processing element, reconstructing
     * the element to the same state it was in at a previous point in time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        eventsToBeExpired.clear();
        eventsToBeExpired.add((StreamEvent) state[0]);
        count = (Integer) state[1];
    }
}

Here we need a deeper understanding of two methods.

1. Init
This is the first method that runs when the window operation is called, and it runs only once, so this is where we initialize the window. attributeExpressionExecutors is the array of ExpressionExecutors that holds the values we passed to the operation through the query (#window.Sample:SampleWindow(4)). In the sample above, we just read the length of the window from it.

2. Process
This method handles all the processing once init has set up the window. streamEventChunk is the chunk that contains the incoming events, and we update the window based on them. In this sample we clone each event and mark the clone as EXPIRED, which is one of the event types. Then we check whether count has reached length. If it has not, we increment count and simply add the cloned event to eventsToBeExpired, a ComplexEventChunk that holds the events queued for expiry that have not expired yet.

Once count has reached length, the else branch takes the oldest event from eventsToBeExpired through poll(), assigns it to firstEvent, sets its timestamp to currentTime, and emits it as an expired event by calling streamEventChunk.insertBeforeCurrent(firstEvent). Only after that do we add the new cloned event to eventsToBeExpired.
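
To make the expiry order easier to follow, here is a minimal plain-Java sketch of the same length-window idea. It does not use any Siddhi classes: LengthWindowSketch, add and expiredFor are hypothetical names made up for this illustration, events are reduced to plain integers, and an ArrayDeque stands in for eventsToBeExpired. Treat it as a model of the logic described above, not as the archetype's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Plain-Java stand-in for the length-window logic in SampleWindow.process().
 * All names here are illustrative; nothing in this class comes from Siddhi.
 */
final class LengthWindowSketch {
    private final int length;
    // Plays the role of eventsToBeExpired: events held in the window, oldest first.
    private final Deque<Integer> window = new ArrayDeque<>();

    LengthWindowSketch(int length) {
        this.length = length;
    }

    /**
     * Adds one event and returns the event that expires as a result,
     * or null while the window is still filling up.
     */
    Integer add(int event) {
        if (window.size() < length) {
            // Window not full yet: just remember the event.
            window.add(event);
            return null;
        }
        Integer oldest = window.poll();
        if (oldest == null) {
            // Only happens for a zero-length window: the event expires immediately.
            return event;
        }
        // Window is full: the oldest event expires and the new one takes its place.
        window.add(event);
        return oldest;
    }

    /** Feeds all events through a fresh window and records what each arrival expired. */
    static List<Integer> expiredFor(int length, int... events) {
        LengthWindowSketch sketch = new LengthWindowSketch(length);
        List<Integer> expired = new ArrayList<>();
        for (int event : events) {
            expired.add(sketch.add(event));
        }
        return expired;
    }

    public static void main(String[] args) {
        // Same shape as the test case in this post: window length 4, events 0..5.
        System.out.println(expiredFor(4, 0, 1, 2, 3, 4, 5));
        // prints [null, null, null, null, 0, 1]
    }
}
```

With a window length of 4, the first four arrivals expire nothing, the fifth arrival expires event 0, and the sixth expires event 1.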

 

Sample.siddhiext

#
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
 
SampleWindow=org.wso2.siddi.extension.window.SampleWindow

Another important file in the extension is Sample.siddhiext. The file name must end in .siddhiext, and the file maps a name to our class, including its package name. We call the operation using the file name as the namespace and that name as the function, for example #window.Sample:SampleWindow(4).
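
The naming convention can be illustrated with a small plain-Java sketch. It only shows how a reference such as Sample:SampleWindow lines up with the file name and the key inside the file; it is not how Siddhi loads extensions internally. SiddhiExtLookupSketch and resolve are hypothetical names, and the file contents are passed in as a string so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Illustrates the namespace:name convention of a .siddhiext file.
 * Hypothetical helper for this post; not part of Siddhi.
 */
final class SiddhiExtLookupSketch {

    /**
     * Resolves a reference such as "Sample:SampleWindow" to a class name.
     *
     * @param reference     the "namespace:name" part used in the query
     * @param fileNamespace the .siddhiext file name without its extension
     * @param fileContents  the text of that file (name=fully.qualified.Class lines)
     * @return the mapped class name, or null if nothing matches
     */
    static String resolve(String reference, String fileNamespace, String fileContents) {
        String[] parts = reference.split(":", 2);
        if (parts.length != 2 || !parts[0].equals(fileNamespace)) {
            return null; // the namespace does not match this file
        }
        Properties mapping = new Properties();
        try {
            // Lines starting with '#', such as the license header, are treated as comments.
            mapping.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(parts[1]);
    }

    public static void main(String[] args) {
        String contents = "# license header\n"
                + "SampleWindow=org.wso2.siddi.extension.window.SampleWindow\n";
        System.out.println(resolve("Sample:SampleWindow", "Sample", contents));
    }
}
```

In this sketch, a reference with a different namespace or an unknown name simply resolves to null.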

 

SampleWindowTestCase.java

/*
 * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddi.extension.window;

import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;


public class SampleWindowTestCase {
    private static Logger logger = Logger.getLogger(SampleWindowTestCase.class);
    private int inEventCount;
    private boolean eventArrived;

    @Before
    public void init() {
        inEventCount = 0;
        eventArrived = false;
    }

    @Test
    public void LengthWindow() throws InterruptedException {
        logger.info("Testing Sample length window with number of events greater than window size");

        SiddhiManager siddhiManager = new SiddhiManager();

        String cseEventStream = "define stream cseEventStream (symbol string, price float, volume int);";
        String query = "" +
                "@info(name = 'query1') " +
                "from cseEventStream#window.Sample:SampleWindow(4) " +
                "select symbol,price,volume " +
                "insert all events into outputStream ;";

        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);

        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                EventPrinter.print(timeStamp, inEvents, removeEvents);
                Assert.assertEquals("Message order inEventCount", inEventCount, inEvents[0].getData(2));
                Assert.assertEquals("Events cannot be expired", false, inEvents[0].isExpired());
                inEventCount = inEventCount + inEvents.length;
                eventArrived = true;
            }
        });

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
        executionPlanRuntime.start();
        inputHandler.send(new Object[]{"IBM", 700f, 0});
        inputHandler.send(new Object[]{"WSO2", 60.5f, 1});
        inputHandler.send(new Object[]{"IBM", 700f, 2});
        inputHandler.send(new Object[]{"WSO2", 60.5f, 3});
        inputHandler.send(new Object[]{"IBM", 700f, 4});
        inputHandler.send(new Object[]{"WSO2", 60.5f, 5});
        Thread.sleep(500);
        Assert.assertEquals(6, inEventCount);
        Assert.assertTrue(eventArrived);
        executionPlanRuntime.shutdown();
    }
}

We write this class to test the extension with SiddhiManager during development and testing.

 

TestCase Output



Events{ @timestamp = 1473503429714, inEvents = [Event{timestamp=1473503429714, data=[IBM, 700.0, 0], isExpired=false}], RemoveEvents = null }
Events{ @timestamp = 1473503429717, inEvents = [Event{timestamp=1473503429717, data=[WSO2, 60.5, 1], isExpired=false}], RemoveEvents = null }
Events{ @timestamp = 1473503429717, inEvents = [Event{timestamp=1473503429717, data=[IBM, 700.0, 2], isExpired=false}], RemoveEvents = null }
Events{ @timestamp = 1473503429717, inEvents = [Event{timestamp=1473503429717, data=[WSO2, 60.5, 3], isExpired=false}], RemoveEvents = null }
Events{ @timestamp = 1473503429718, inEvents = [Event{timestamp=1473503429718, data=[IBM, 700.0, 4], isExpired=false}], RemoveEvents = [Event{timestamp=1473503429718, data=[IBM, 700.0, 0], isExpired=true}] }
Events{ @timestamp = 1473503429718, inEvents = [Event{timestamp=1473503429718, data=[WSO2, 60.5, 5], isExpired=false}], RemoveEvents = [Event{timestamp=1473503429718, data=[WSO2, 60.5, 1], isExpired=true}] }

Until the 5th event arrives there are no expired events, only inEvents. From the 5th event onwards, each new arrival expires the oldest event in the window, starting with the 1st event.

If you want to test this sample extension, you can try it in the Siddhi Try It tool by following this post.