Saturday, June 25, 2016

Analytics Event Publisher for WSO2 CEP

WSO2 Complex Event Processor (CEP) is a lightweight, easy-to-use, open source Complex Event Processing server (CEP). It identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real-time. Event publishers publish events to external systems via various transport protocols. and store data to databases for future analysis. Like the event receivers, this component also has different adapter implementations.


Following are the adapters that are available by default.

You can write custom event publishers to support other transports or you can download existing custom adapters from the store. for more....

Follow the steps below to write the custom event publishers. use the below maven command to generate the archetype for the custom event publishers.
mvn archetype:generate -DarchetypeGroupId=org.wso2.carbon.extension.archetype -DarchetypeArtifactId=org.wso2.carbon.extension.analytics.publisher-archetype -DarchetypeVersion=2.0.1 -DgroupId=org.wso2.carbon.extension.analytics -DartifactId=org.wso2.carbon.extension.analytics.publisher.sample -Dpackage=org.wso2.carbon.extension.analytics.publisher.sample -Dversion=1.0.0 -DarchetypeRepository=http://maven.wso2.org/nexus/content/repositories/wso2-public/

-DgroupId, -DartifactId, -Dpackage and -Dversion are user need to set before running the command but by default custom event publishers maintaining common values. -Dpackage for setting the folder structure it's optional.
-DgroupId=org.wso2.carbon.extension.analytics
-DartifactId=org.wso2.carbon.extension.analytics.publisher.<name> 
-Dpackage=org.wso2.carbon.extension.analytics.publisher.<name>
-Dversion=1.0.0 

Once you complete the above step you will get the below output in the console. Name of the publisher should be in camel case.
After the creation of custom event publisher, we will get files in below format.

 Normally current implementation doesn’t support for test case so we can skip the test while build or just remove the java file.

pom.xml

This is the sample pom file for event publisher. These dependencies are mandatory for publisher except org.wso2.cep.integration.common.utils it's used for integration test but the current version does not support to integration test so skip that part. If we wish we can develop test and run with product-cep.

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.wso2.carbon.extension.analytics</groupId>
    <artifactId>org.wso2.carbon.extension.analytics.publisher.sample</artifactId>
    <version>1.0.0</version>

    <packaging>bundle</packaging>
    <name>WSO2 Carbon - Analytics Publisher Library For Sample</name>
    <url>http://wso2.org</url>
    <dependencies>
        <dependency>
            <groupId>org.wso2.carbon</groupId>
            <artifactId>org.wso2.carbon.logging</artifactId>
            <version>${version.org.wso2.carbon}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.log4j.wso2</groupId>
            <artifactId>log4j</artifactId>
            <version>${version.org.apache.log4j.wso2}</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>${version.commons-logging}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.carbon.analytics-common</groupId>
            <artifactId>org.wso2.carbon.event.output.adapter.core</artifactId>
            <version>${version.org.wso2.carbon.analytics-common}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.cep</groupId>
            <artifactId>org.wso2.cep.integration.common.utils</artifactId>
            <version>${version.org.wso2.cep}</version>
            <scope>test</scope>
        </dependency>
    </dependencies> 
 <properties>
        <version.org.wso2.carbon>4.4.3</version.org.wso2.carbon>
        <version.org.apache.log4j.wso2>1.2.17.wso2v1</version.org.apache.log4j.wso2>
        <version.junit>4.10</version.junit>
        <version.commons-logging>1.1.3</version.commons-logging>
        <version.org.wso2.carbon.analytics-common>5.0.9</version.org.wso2.carbon.analytics-common>
        <version.org.wso2.cep>4.0.0</version.org.wso2.cep>
        <version.org.apache.felix>1.7.4</version.org.apache.felix>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-scr-plugin</artifactId>
                <version>${version.org.apache.felix}</version>
                <extensions>true</extensions>
                <executions>
                    <execution>
                        <id>generate-scr-descriptor</id>
                        <goals>
                            <goal>scr</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Bundle-SymbolicName>org.wso2.carbon.extension.analytics.publisher.sample</Bundle-SymbolicName>
                        <Bundle-Name>org.wso2.carbon.extension.analytics.publisher.sample</Bundle-Name>
                        <Private-Package>
                            org.wso2.carbon.extension.analytics.publisher.sample.internal,
                            org.wso2.carbon.extension.analytics.publisher.sample.internal.*
                        </Private-Package>
                        <Export-Package>
                            !org.wso2.carbon.extension.analytics.publisher.sample.internal,
                            !org.wso2.carbon.extension.analytics.publisher.sample.internal.*,
                            org.wso2.carbon.extension.analytics.publisher.sample.*,
                        </Export-Package>
                        <Import-Package>
                            org.wso2.carbon.event.output.adapter.core,
                            org.wso2.carbon.event.output.adapter.core.*,
                            !javax.xml.namespace,
                            javax.xml.namespace; version=0.0.0
                        </Import-Package>
                        <DynamicImport-Package>*</DynamicImport-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>wso2-nexus</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
        <repository>
            <id>wso2.releases</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
        <repository>
            <id>wso2.snapshots</id>
            <name>Apache Snapshot Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>sonatype.releases</id>
            <url>https://oss.sonatype.org/content/repositories/releases/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>wso2.releases</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </pluginRepository>
        <pluginRepository>
            <id>wso2.snapshots</id>
            <name>Apache Snapshot Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </pluginRepository>
        <pluginRepository>
            <id>wso2-nexus</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>
</project>


SampleEventAdapterServiceDS.java

Activate method will register the component with the service. 

package org.wso2.carbon.extension.analytics.publisher.sample.internal.ds;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory;
import org.wso2.carbon.extension.analytics.publisher.sample.SampleEventAdapterFactory;


/**
 * @scr.component component.name="output.Sample.AdapterService.component" immediate="true"
 */

public class SampleEventAdapterServiceDS {

    private static final Log log = LogFactory.getLog(SampleEventAdapterServiceDS.class);

    /**
     * Deployment of the Sample event adapter service will be done.
     *
     * @param context
     */
    protected void activate(ComponentContext context) {

        try {
            OutputEventAdapterFactory sampleEventAdaptorFactory =
                    new SampleEventAdapterFactory();
            context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName(),
                    sampleEventAdaptorFactory, null);
            if (log.isDebugEnabled()) {
                log.debug("Successfully deployed the Output Sample event service");
            }
        } catch (RuntimeException e) {
            log.error("Can not create  the Output Sample event service ", e);
        }
    }
}

SampleEventAdapter.java

Here publish is the method which is run for each event. Other methods follow the thread life cycle. Object message will have the values which are passed by the process. We can go further with the data/event.

package org.wso2.carbon.extension.analytics.publisher.sample;

import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import org.wso2.carbon.event.output.adapter.core.exception.TestConnectionNotSupportedException;

import java.util.Map;

public class SampleEventAdapter implements OutputEventAdapter {

    /**
     * This method is called when initiating event publisher bundle.
     * Relevant code segments which are needed when loading OSGI bundle can be included in this method.
     *
     * @throws OutputEventAdapterException
     */
    public void init() throws OutputEventAdapterException {

    }

    /**
     * This method is used to test the connection of the publishing server.
     *
     * @throws TestConnectionNotSupportedException
     * @throws ConnectionUnavailableException
     */
    public void testConnect() throws TestConnectionNotSupportedException, ConnectionUnavailableException {

    }

    /**
     * Can be called to connect to back end before events are published.
     *
     * @throws ConnectionUnavailableException
     */
    public void connect() throws ConnectionUnavailableException {

    }

    /**
     * Publish events. Throws ConnectionUnavailableException if it cannot connect to the back end.
     *
     * @param o
     * @param map
     * @throws ConnectionUnavailableException
     */
    public void publish(Object message, Map<String, String> map) throws ConnectionUnavailableException {

    }

    /**
     * Will be called after publishing is done, or when ConnectionUnavailableException is thrown.
     */
    public void disconnect() {

    }

    /**
     * The method can be used to clean all the resources consumed.
     */
    public void destroy() {

    }

    /**
     * Checks whether events get accumulated at the adapter and clients connect to it to collect events.
     *
     * @return
     */
    public boolean isPolled() {
        return false;
    }
}

SampleEventAdapterFactory.java

getType is the method will display the name in the publisher drop town list. getSupportedMessgaeFormats is the method to  display the supported messages formats by our publisher.


package org.wso2.carbon.extension.analytics.publisher.sample;

import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory;
import org.wso2.carbon.event.output.adapter.core.Property;

import java.util.List;
import java.util.Map;

/**
 * The Sample event adapter factory class to create an Sample output adapter
 */
public class SampleEventAdapterFactory extends OutputEventAdapterFactory {

    /**
     * Here type needs to be specified,
     * this string will be displayed in the publisher interface in the adapter type drop down list.
     *
     * @return
     */
    @Override
    public String getType() {
        return SampleEventAdapterConstants.ADAPTER_TYPE;
    }

    /**
     * Specify supported message formats for the created publisher type.
     *
     * @return
     */
    @Override
    public List<String> getSupportedMessageFormats() {
        List<String> supportedMessageFormats = new ArrayList<>();

  return supportedMessageFormats;
    }

    /**
     * Here static properties have to be specified.
     * These properties will use the values assigned when creating a publisher.
     * For more information on adapter properties see Event Publisher Configuration.
     *
     * @return
     */
    @Override
    public List<Property> getStaticPropertyList() {
         return new ArrayList<>();
    }

    /**
     * You can define dynamic properties similar to static properties,
     * the only difference is dynamic property values can be derived by events handling by publisher.
     * For more information on adapter properties see Event Publisher Configuration.
     *
     * @return
     */
    @Override
    public List<Property> getDynamicPropertyList() {
         return new ArrayList<>();
    }

    /**
     * Specify any hints to be displayed in the management console.
     *
     * @return
     */
    @Override
    public String getUsageTips() {
        return null;
    }

    /**
     * This method creates the publisher by specifying event adapter configuration
     * and global properties which are common to every adapter type.
     *
     * @param outputEventAdapterConfiguration
     * @param map
     * @return
     */
    @Override
    public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration,
                                                 Map<String, String> map) {
        return new SampleEventAdapter(eventAdapterConfiguration);
    }
}

Once we compleat the development build the extension using mvn clean install and get the jar (org.wso2.carbon.extension.analytics.publisher.sample-1.0.0.jar) form target forder then paste it in cep dropins(wso2cep-4.1.0/repository/components/dropins) folder.

Now start the server and click to create new event publisher. In the list, you will get the publisher with youe name(sample). Now we can add our confiquration to work with our event publisher.



References.