-
0 votesanswersviews
Lagom将直接的事件流(记录)插入Cassandra
我是Lagom和Persistence Entities Database概念的新手 . 我正在构建Streaming Analytics Engine . 我的每个分析都将作为独立的微服务工作,并根据其设计理念为每个单独的微服务保存在自己的数据库中(我的案例是Cassandra) . 我正在使用Flink&Spark进行流分析,然后使用Phantom for Flink(Cassandra的Sc... -
0 votesanswersviews
如何在Flink CEP中使用“every”模式运算符
在Esper CEP引擎事件模式中,有 Every 运算符,表示当由every关键字限定的子表达式求值为true或false时,应该重新启动pattern子表达式 . According to this document,我们可以用不同的方式使用 every 运算符,如: A -> B every ( A -> B ) every A -> B A -&g... -
1 votesanswersviews
Flink CEP不具有确定性
我在没有集群的情况下在本地运行以下代码: val count = new AtomicInteger() val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val text: DataStream[String] = env.readTextFile("file:///fl... -
0 votesanswersviews
在Apache Flink中动态添加模式而无需重新启动作业
我的用例是我想将不同的CEP模式应用于同一个数据流 . CEP模式是动态的,我希望它们可以添加到flink而无需重新启动作业 . 虽然可以通过实现IterativeCondition的自定义类来处理所有条件,但我的主要问题是时间条件只接受TimeWindow;这是无法处理的 . 是否有某种方法可以根据输入元素设置传递给.within()的值? 这里有类似的问题:Flink and Dynamic... -
1 votesanswersviews
是否可以在KeyedStream(Apache Flink)中为每个键生成水印?
我正在努力实现一个用例,其中不同的物理设备正在发送事件,并且由于网络/电源问题,在flink源处接收事件可能会有延迟 . flink作业中的一个操作符是Pattern操作符,并且某些模式对时间敏感,因此我使用事件时间特性 . 但问题出现在来自特定设备的事件中存在不可预测的延迟时,这会导致这些事件被丢弃(因为我无法真正定义允许延迟的静态边界) . 由于我使用KeyedStream,键入源设备ID,... -
3 votesanswersviews
Flink窗口和状态维护
我正在研究apache flink的数据流,我几乎没有问题 . 任何帮助是极大的赞赏 . 谢谢 . 1)创建翻滚窗口是否有任何限制 . 例如,如果我想为每个用户ID创建一个翻滚窗口,持续2秒,让我们说如果我有超过1000万用户ID则会出现问题 . (我正在使用keyBy用户ID,然后创建一个timeWindow 2秒)?如何在flink内部维护这些窗口? 2)我查看了循环分区的重新 balanc... -
0 votesanswersviews
在代码中访问Flink的系统指标到终端,而不是像JMX那样使用任何指标报告者
我使用JMX作为度量报告器来获取Flink指标,但有没有办法将其作为终端输出? 我想为每个运营商绘制 numRecordsInPerSecond 进行性能分析,我该怎么办? 我已经看到了累加器的一些例子,但它没有给我正确的见解我如何进行Flink的性能分析 . 我会在这里举个例子 这是我的Flink程序的执行计划,我有多个问题,但我想问基本问题 如何测量每个运算符的延迟,然后将其相加以计算复... -
1 votesanswersviews
动态调节fl fl Kafka 来源
我们正在使用多个kafka主题,但希望优先考虑其中一些(〜服务质量) . 根据我在网上发现的情况,我们的共识是不会限制运算符,而是限制源,更具体地说是反序列化器[1] . 我们如何在源中访问有关流环境状态的信息(即主题滞后于当前偏移的距离) . 目前,我们计划将整个设置转换为CoFlatMaps [2]并且具有针对所有主题发出当前偏移滞后的控制流 - 低优先级流运营商然后根据高优先级流的滞后休眠 ... -
0 votesanswersviews
TaskManager的Flink状态后端
我有一个Flink v1.2设置,1个JobManager,2个TaskManagers,每个都在自己的VM中 . 我将状态后端配置到文件系统,并在每个上述主机的情况下将其指向本地位置(state.backend.fs.checkpointdir:file:/// home / ubuntu / Prototype / flink / flink-checkpoints) . 我已将并行性设置为1... -
0 votesanswersviews
“广播状态”为Flink的CEP库的“动态模式”功能的实现解除阻碍是什么意思?
从Flink 1.5发布公告中,我们知道Flink现在支持“广播状态”,并且描述了“广播状态解锁Flink的CEP库的”动态模式“功能的实现 . ” 这是否意味着目前我们可以使用“广播状态”来实现没有Flink CEP的“动态模式”?另外我不知道在有或没有广播状态的Flink CEP实现“动态模式”时有什么区别?我将不胜感激如果有人可以用代码举例解释差异 . ============= Updat... -
1 votesanswersviews
是否可以在apache flink CEP中处理多个流?
我的问题是,如果我们有两个原始事件流,即 Smoke 和 Temperature ,我们想通过将运算符应用于原始流来查明复杂事件,即 Fire 是否已经发生,我们可以在Flink中执行此操作吗? 我问这个问题,因为我到目前为Flink CEP看到的所有例子都只包含一个输入流 . 如果我错了,请纠正我 . -
0 votesanswersviews
从Apache Flink中的Kafka经纪人处读取最新数据
我想收到从Kafka到Flink Program的最新数据,但Flink正在阅读历史数据 . 我已将 auto.offset.reset 设置为 latest ,如下所示,但它不起作用 properties.setProperty("auto.offset.reset", "latest"); Flink Programm使用以下代码从Kafka接收数据 ... -
0 votesanswersviews
与Fafka 1.1.0的Apache Flink 1.3.2连接问题
我正在使用Apache Flink 1.3.2集群 . 我们正在使用Kafka消息,并且自从将代理升级到1.1.0(从0.10.2开始)后,我们经常在日志中注意到这个错误: ERROR o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed. org.apache.kafka.... -
0 votesanswersviews
如何使用Flink提供的成本估算器类获得Flink的运营成本
我想对Flink CEP引擎进行性能分析,我遇到了这些类 org.apache.flink.optimizer.costs.CostEstimator; org.apache.flink.optimizer.costs.Costs; org.apache.flink.optimizer.costs.DefaultCostEstimator; 但问题是我不知道如何使用这个类 . 有人可以向我提... -
0 votesanswersviews
命名运算符,源,接收器和模式将显示在Flink执行计划UI中
我只想给运营商和消息来源命名 . 我们来举个例子吧 这是执行计划的屏幕截图,我采用了Flink Dashboard . 这里我有2个DataStreams源,然后我加入它们 . 我的问题是,我可以将这些来源命名为 EcgStream 和 Sp02 Stream ,例如加入为 Join1 吗? 原因我问这个问题是因为它使可视化更容易 . 另外,正如我在_1679406页面那样,在页面的最后,他们提... -
0 votesanswersviews
如何将Datadog中的指标与Flink中的执行计划运算符相关联?
在我的情况下,Flink正在将指标发送到Datadog . Datadog主机 Map 如下所示{我不知道为什么在这里显示我的延迟} Flink指标发送到localhost . 这里的问题是什么时候 flink-conf.yaml 文件配置如下 # adding metrics metrics.reporters: stsd , dghttp metrics.reporter.stsd.cl... -
0 votesanswersviews
在Flink错误中添加Cassandra作为接收器:所有尝试查询的主机都失败了
我正在https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html跟进一个例子,将Cassandra连接为Flink中的接收器 我的代码如下所示 public class writeToCassandra { private static final String CRE...