首页 文章

Google Cloud PubSub消息未通过回调处理

提问于
浏览
1

我正在尝试使用Google PubSub在两个服务之间传递和接收消息 . 但是,发送的某些消息似乎是随机丢弃的,并且不会被订阅者的回调方法处理 .

发送消息时,回调方法处理大约一半的消息 . 对于另一半,回调方法似乎根本没有被调用(没有记录信息) . 但是,消息仍然从主题中消失,并且不会重新发送 .

用于启动订户的代码:

logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
    try:
        sleep(60)
    except Exception as e:
        // Log exception

回调方法:

def callback(message):
    logger = logging.getLogger(LOGGER_NAME)
    logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message}  )
    // Process message

该错误似乎在订户方 . 消息是从发布者发送的,如果订阅者未连接到该主题,则消息不会消失 .

我曾尝试使用Flow Control来控制订阅者检索的邮件数量,但似乎没有任何效果 .

可以在不调用回调方法的情况下处理消息吗?消息可能会从主题中消失,还有其他一些原因吗?

EDIT: 结果是另一个服务正在从同一个订阅中读取,处理丢失的消息 .

1 回答

  • 0

    我知道你找到了问题的答案,但我认为列出调试此类问题的一些有用步骤是值得的:

    • 检查以确保实际发布的消息 . 当发布成功时,响应应该包括消息的ID,例如,作为Java Publish方法中APIFuture产生的字符串 .

    • 检查是否积压了积压的消息 . 您可以通过Stackdriver查看 subscription/oldest_unacked_message_agesubscription/num_undelivered_messages .

    • 检查您的订户是否设置了flow control,这使您无法及时收到所有消息 . 如果您有流控制集并且它阻止了所有消息的传递,您可能会看到堆栈驱动程序中未传递消息的数量正在增加 .

    • 确保您没有任何其他客户订阅同一订阅的消息 . 例如,您可能正在使用gcloud tool to pull并查看消息 . 在这种情况下,您可能不会在Stackdriver中看到未传递的消息数量增加 .

    如果在检查完所有内容后您不确定消息发生了什么,最好联系支持部门,告知您的项目名称和订阅,以及您认为未提供的任何消息的ID .

相关问题