首页 文章

使用Kafka在应用程序上打开太多文件错误

提问于
浏览
4

我正在使用Kafka和Spark Streaming构建应用程序 . 输入数据来自第三部分流媒体,并在kafka主题上发布 . 此代码显示了Stream Proxy模块:这是我从流式传输中获取结果以及如何将它们发送到KafkaPublisher的方式(它只显示了草图):

def on_result_response(self,*args):
    self.kafkaPublisher.pushMessage(str(args[0]))

KafkaPublisher通过以下两种方法实现:

class KafkaPublisher:

def __init__(self,address,port,topic):
    self.kafka = KafkaClient(str(address)+":"+str(port))
    self.producer = SimpleProducer(self.kafka)
    self.topic=topic



def pushMessage(self,message):
    self.producer.send_messages(self.topic, message)
    self.producer = SimpleProducer(self.kafka, async=True)

这个应用程序是由这个主要推出的:

from StreamProxy import StreamProxy


streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)  #seconds of streaming

经过一些批处理(或多或少10秒)后,它启动了以下 exceptions

线程中的异常Thread-2354:Traceback(最近一次调用最后一次):文件“/usr/lib/python2.7/threading.py”,第801行,在__bootstrap_inner文件中“/usr/lib/python2.7/threading . py“,第754行,在运行文件”/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py“,第164行,在_send_upstream文件”/ usr / local / lib / python2中 . 7 / dist-packages / kafka / client.py“,第649行,在send_produce_request文件中”/usr/local/lib/python2.7/dist-packages/kafka/client.py“,第253行,在_send_broker_aware_request文件中”/ usr / local / lib / python2.7 / dist-packages / kafka / client.py“,第74行,在_get_conn文件中”/usr/local/lib/python2.7/dist-packages/kafka/conn.py“,第236行,连接错误:[Errno 24]打开文件太多线程中的异常Thread-2355:Traceback(最近一次调用最后一次):文件“/usr/lib/python2.7/threading.py”,第801行, __bootstrap_inner文件“/usr/lib/python2.7/threading.py”,第754行,在运行文件“/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py”,第164行,在_send_upstream文件中“/usr/local/lib/python2.7/dist-packages/kafka/client.py”,第649行,在send_produce_request文件中“/usr/local/lib/python2.7/dist-packages/kafka/client.py “,第253行,在_send_broker_aware_request文件中”/usr/local/lib/python2.7/dist-packages/kafka/client.py“,第74行,在_get_conn文件中”/usr/local/lib/python2.7/dist -packages / kafka / conn.py“,第236行,连接错误:[Errno 24]打开文件太多

请注意,有相同的消息有许多不同的例外,当然问题是发布者方面 .

1 回答

  • 1

    尝试删除行:

    self.producer = SimpleProducer(self.kafka, async=True)
    

相关问题