Joining 3 events in Siddhi

I want to join 3 events in Siddhi, each selected by its own condition. I am doing this by defining two queries in the bucket definition.

This is the first query definition:

from inStream[severity == 1] as a1
join inStream[name == 'name1'] as a2
on a1.time == a2.time
insert into midStream a1.product as a1_product;

And the second:

from midStream
join inStream[vendor == 'aVendor'] as a3
on a3.product == a1_product
insert into outStream a3.time as a3_time;

When the queries execute, I keep getting an ArrayIndexOutOfBoundsException. What am I doing wrong? The exception log:

java.lang.ArrayIndexOutOfBoundsException: -1
        at org.wso2.siddhi.core.event.StateEvent.getStreamEvent(StateEvent.java:51)
        at org.wso2.siddhi.core.executor.expression.VariableExpressionExecutor.execute(VariableExpressionExecutor.java:128)
        at org.wso2.siddhi.core.executor.conditon.compare.CompareConditionExecutor.execute(CompareConditionExecutor.java:46)
        at org.wso2.siddhi.core.query.processor.join.JoinProcessor.process(JoinProcessor.java:117)
        at org.wso2.siddhi.core.query.processor.handler.SimpleHandlerProcessor.processHandler(SimpleHandlerProcessor.java:142)
        at org.wso2.siddhi.core.query.processor.handler.SimpleHandlerProcessor.receive(SimpleHandlerProcessor.java:75)
        at org.wso2.siddhi.core.stream.StreamJunction.send(StreamJunction.java:36)
        at org.wso2.siddhi.core.query.projector.QueryProjector.send(QueryProjector.java:240)
        at org.wso2.siddhi.core.query.projector.QueryProjector.process(QueryProjector.java:212)
        at org.wso2.siddhi.core.query.processor.join.JoinProcessor.process(JoinProcessor.java:118)
        at org.wso2.siddhi.core.query.processor.handler.SimpleHandlerProcessor.processHandler(SimpleHandlerProcessor.java:142)
        at org.wso2.siddhi.core.query.processor.handler.SimpleHandlerProcessor.receive(SimpleHandlerProcessor.java:75)
        at org.wso2.siddhi.core.stream.StreamJunction.send(StreamJunction.java:36)
        at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:46)
        at org.wso2.carbon.cep.siddhi.backend.SiddhiBackEndRuntime.insertEvent(SiddhiBackEndRuntime.java:110)
        at org.wso2.carbon.cep.core.internal.CEPBucket.insertEvent(CEPBucket.java:171)
        at org.wso2.carbon.cep.core.listener.TopicEventListener.onEvent(TopicEventListener.java:53)
        at org.wso2.carbon.cep.core.listener.BrokerEventListener.onEvent(BrokerEventListener.java:58)
        at org.wso2.carbon.broker.core.internal.brokers.ws.SubscriptionMessageReceiver.invokeBusinessLogic(SubscriptionMessageReceiver.java:69)
        at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
        at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
        at org.apache.axis2.transport.http.HTTPTransportUtils.processHTTPPostRequest(HTTPTransportUtils.java:172)
        at org.apache.axis2.transport.http.AxisServlet.doPost(AxisServlet.java:146)
        at org.wso2.carbon.core.transports.CarbonServlet.doPost(CarbonServlet.java:231)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:641)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:722)
        at org.eclipse.equinox.http.servlet.internal.ServletRegistration.handleRequest(ServletRegistration.java:90)
        at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:111)
        at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:67)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:722)
        at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:305)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210)
        at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:225)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:123)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:472)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:168)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)
        at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:172)
        at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:156)
        at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:927)
        at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:52)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:118)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:407)
        at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1001)
        at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:579)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1653)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

By the way, the input mapping is correct and the bucket deploys successfully.

Bucket definition:

<?xml version="1.0" encoding="UTF-8"?>
<cep:bucket name="ABucket" xmlns:cep="http://wso2.org/carbon/cep">
    <cep:description/>
    <cep:engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <cep:property name="siddhi.persistence.snapshot.time.interval.minutes">0</cep:property>
        <cep:property name="siddhi.enable.distributed.processing">false</cep:property>
    </cep:engineProviderConfiguration>
    <cep:input brokerName="localBroker" topic="InputTopic">
        <cep:xmlMapping queryEventType="Tuple" stream="inStream">
            <cep:xpathDefinition namespace="name.space.aa" prefix="aa"/>
            <cep:xpathDefinition namespace="name.space.bb" prefix="bb"/>
            <cep:property name="product" type="java.lang.String" xpath="//aa:cefxml/bb:product"/>
            <cep:property name="time" type="java.lang.String" xpath="//aa:cefxml/bb:time"/>
            <cep:property name="name" type="java.lang.String" xpath="//aa:cefxml/bb:name"/>
            <cep:property name="vendor" type="java.lang.String" xpath="//aa:cefxml/bb:vendor"/>
            <cep:property name="severity" type="java.lang.Integer" xpath="//aa:cefxml/bb:severity"/>
        </cep:xmlMapping>
    </cep:input>
    <cep:query name="SendEventQuery">
        <cep:expression><![CDATA[from inStream[severity == 1] as a1
join inStream[name == 'name1'] as a2
on a1.time == a2.time
insert into midStream a1.product as a1_product;]]></cep:expression>
    </cep:query>
    <cep:query name="JoinQuery">
        <cep:expression><![CDATA[from midStream
join inStream[vendor == 'aVendor'] as a3
on a3.product == midStream.a1_product
insert into outStream a3.time as a3_time;]]></cep:expression>
        <cep:output brokerName="activemqJmsBroker" topic="OutputTopic">
            <cep:xmlMapping>
                <xmlMapping>
                    <data:DataEv xmlns:data="http://ws.cdyne.com/"
            xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                        <data:a3_time>{a3_time}</data:a3_time>
                    </data:DataEv>
                </xmlMapping>
            </cep:xmlMapping>
        </cep:output>
    </cep:query>
</cep:bucket>

1 Answer

    Modify the second query as shown below, qualifying a1_product with its stream name, and try again:

    from midStream
    join inStream[vendor == 'aVendor'] as a3
    on a3.product == midStream.a1_product
    insert into outStream a3.time as a3_time;
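
    One possible reading of the stack trace, offered as an assumption rather than a confirmed diagnosis: the exception is thrown from VariableExpressionExecutor while the join condition is being evaluated, which would be consistent with the unqualified a1_product not being resolved to either of the joined streams. If the engine accepts an alias on the left-hand stream, as the `as a1` in the first query suggests, the same qualification could also be sketched with an explicit alias. The alias name m is hypothetical and this variant has not been checked against a specific Siddhi version:

    ```
    from midStream as m
    join inStream[vendor == 'aVendor'] as a3
    on a3.product == m.a1_product
    insert into outStream a3.time as a3_time;
    ```

    Either way, the point is that every attribute in the `on` condition names the stream it comes from.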
    
