我正在使用clj-kafka,我正在尝试在REPL中创建一个 core.async
接口 .
我收到一些消息,但我的结构错了:我要么无法停止接收消息,要么必须再次启动 go
例程以接收更多消息 .
这是我的尝试:
(defn consume [topic]
(let [consume-chan (chan)]
(with-resource [c (consumer config)]
shutdown
(go (doseq [m (messages c "test")]
(>! chan message) ;; should I check the return value?
)))
consume-chan)) ;; is it the right place to return a channel ?
(def consume-chan (consume "test"))
;;(close! consume-chan)
(go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already
(def cons-ch (go
(with-resource [c (consumer config)]
shutdown
(doseq [m (messages c "test")]
(>! consume-chan m))))) ;; should I check something here ?
;;(close! cons-ch)
(def go-ch
(go-loop []
(if-let [km (<! consume-chan)]
(do (println "Got a value in this loop:" km)
(recur))
(do (println "Stop recurring - channel closed")))))
;;(close! go-ch)
如何使用 core.async
接口使用延迟的消息序列?
1 回答
这就是我要做的事情:
如果通道关闭,
>!
和<!
将返回nil,因此请确保在发生这种情况时循环退出 - 这样您就可以通过关闭通道轻松地从外部结束循环 .使用try / catch检查go块内的异常,并将返回值作为异常,这样它们就不会丢失 .
检查读取值的异常,以从通道内捕获任何内容 .
go块返回一个通道,块内代码的返回值(如上面的例外)将放在通道上 . 检查这些通道是否有异常,可能需要重新抛出 .
您现在可以写入这样的 Channels :
你读的是这样的:
您现在将拥有两个通道,因此请使用类似
alts!
或alts!!
的内容来检查您的循环是否已退出 . 完成后关闭通道 .