首页 文章

Clojure - apache kafka的core.async接口

提问于
浏览
0

我正在使用clj-kafka,我正在尝试在REPL中创建一个 core.async 接口 .

我收到一些消息,但我的结构错了:我要么无法停止接收消息,要么必须再次启动 go 例程以接收更多消息 .

这是我的尝试:

(defn consume [topic]
  (let [consume-chan (chan)]
    (with-resource [c (consumer config)]
      shutdown
      (go (doseq [m (messages c "test")]
                   (>! chan message) ;; should I check the return value?
                   )))
    consume-chan)) ;; is it the right place to return a channel ?


  (def consume-chan (consume "test"))
  ;;(close! consume-chan)

  (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already

  (def cons-ch (go
                 (with-resource [c (consumer config)]
                   shutdown
                   (doseq [m (messages c "test")]
                     (>! consume-chan m)))))  ;; should I check something here ?

  ;;(close! cons-ch)

  (def go-ch
    (go-loop []
      (if-let [km (<! consume-chan)]
        (do  (println "Got a value in this loop:" km)
              (recur))
        (do (println "Stop recurring - channel closed")))))

  ;;(close! go-ch)

如何使用 core.async 接口使用延迟的消息序列?

1 回答

  • 0

    这就是我要做的事情:

    如果通道关闭,

    • >!<! 将返回nil,因此请确保在发生这种情况时循环退出 - 这样您就可以通过关闭通道轻松地从外部结束循环 .

    • 使用try / catch检查go块内的异常,并将返回值作为异常,这样它们就不会丢失 .

    • 检查读取值的异常,以从通道内捕获任何内容 .

    • go块返回一个通道,块内代码的返回值(如上面的例外)将放在通道上 . 检查这些通道是否有异常,可能需要重新抛出 .

    您现在可以写入这样的 Channels :

    (defn write-seq-to-channel
      [channel
       values-seq]
      (a/go
        (try
          (loop [values values-seq]
            (when (seq values)
              (when (a/>! channel (first values))
                (recur (rest values)))))
          (catch Throwable e
            e))))
    

    你读的是这样的:

    (defn read-from-channel-and-print
      [channel]
      (a/go
        (try
          (loop []
            (let [value (a/<! channel)]
              (when value
                (when (instance? Throwable value)
                  (throw value))
                (println "Value read:" value)
                (recur))))
          (catch Throwable e
            e))))
    

    您现在将拥有两个通道,因此请使用类似 alts!alts!! 的内容来检查您的循环是否已退出 . 完成后关闭通道 .

相关问题