首页 文章

使用node.js流处理错误

提问于
浏览
122

处理流错误的正确方法是什么?我已经知道你可以听到一个'错误'事件,但我想了解一些关于任意复杂情况的更多细节 .

对于初学者,当你想做一个简单的管道链时你会怎么做:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

那么如何正确创建其中一个转换以便正确处理错误?

更多相关问题:

  • 发生错误时,'end'事件会发生什么?它永远不会被解雇吗?它有时会被解雇吗?它取决于转换/流吗?这里的标准是什么?

  • 有没有通过管道传播错误的机制?

  • 域名有效地解决了这个问题吗?例子很好 .

  • 来自'error'事件的错误是否有堆栈跟踪?有时?决不?有没有办法从他们那里得到一个?

6 回答

  • 0

    transform

    变换流既可读又可写,因此非常好'middle'流 . 出于这个原因,它们有时被称为 through 流 . 它们以这种方式类似于双工蒸汽,除了它们提供了一个很好的界面来操纵数据而不是仅通过它来发送数据 . 变换流的目的是在数据通过流传输时操纵数据 . 例如,您可能想要执行一些异步调用,或者派生几个字段,重新映射一些内容等 .


    Where you might put a transform stream


    有关如何创建转换流,请参阅herehere . 你所要做的就是:

    • 包括流模块

    • 实例化(或继承自)Transform类

    • 实现 _transform 方法,该方法需要 (chunk, encoding, callback) .

    块是你的数据 . 如果你在 objectMode = true 工作,大多数时候你不需要担心编码 . 处理完块后调用回调 . 然后将该块推送到下一个流 .

    如果你想要一个很好的帮助模块,让你真的很容易通过流,我建议through2 .

    对于错误处理,请继续阅读 .

    pipe

    在管道链中,处理错误确实非常重要 . 根据this thread .pipe()不是为了转发错误而构建的 . 所以像......

    var a = createStream();
    a.pipe(b).pipe(c).on('error', function(e){handleError(e)});
    

    ...只会监听流 c 上的错误 . 如果在 a 上发出错误事件,则不会传递该事件,实际上会抛出 . 要正确执行此操作:

    var a = createStream();
    a.on('error', function(e){handleError(e)})
    .pipe(b)
    .on('error', function(e){handleError(e)})
    .pipe(c)
    .on('error', function(e){handleError(e)});
    

    现在,虽然第二种方式更详细,但您至少可以保留错误发生位置的上下文 . 这通常是件好事 .

    我觉得有一个图书馆很有帮助,但如果你有一个案例,你只想在目的地捕获错误,而你并不在乎它在哪里发生的事情是event-stream .

    end

    触发错误事件时,不会(显式)触发结束事件 . 发出错误事件将结束流 .

    domains

    根据我的经验,域名在大多数情况下都非常有效 . 如果您有未处理的错误事件(即在没有侦听器的流上发出错误),服务器可能会崩溃 . 现在,正如上面的文章所指出的,您可以将流包装在一个应该正确捕获所有错误的域中 .

    var d = domain.create();
     d.on('error', handleAllErrors);
     d.run(function() {
         fs.createReadStream(tarball)
           .pipe(gzip.Gunzip())
           .pipe(tar.Extract({ path: targetPath }))
           .on('close', cb);
     });
    

    域的美妙之处在于它们将保留堆栈跟踪 . 虽然事件流也很好 .

    如需更多阅读,请查看stream-handbook . 相当深入,但超级有用,并提供了一些很好的链接到许多有用的模块 .

  • 1

    域名已弃用 . 你不需要它们 .

    对于这个问题,变换或可写之间的区别并不那么重要 .

    mshell_lauren的答案很棒,但作为替代方案,您还可以在您认为可能出错的每个流上显式侦听错误事件 . 如果您愿意,可以重用处理函数 .

    var a = createReadableStream()
    var b = anotherTypeOfStream()
    var c = createWriteStream()
    
    a.on('error', handler)
    b.on('error', handler)
    c.on('error', handler)
    
    a.pipe(b).pipe(c)
    
    function handler (err) { console.log(err) }
    

    这样做可以防止臭名昭着的未捕获异常,如果其中一个流触发其错误事件

  • 6

    整个链中的错误可以使用一个简单的函数传播到最右边的流:

    function safePipe (readable, transforms) {
        while (transforms.length > 0) {
            var new_readable = transforms.shift();
            readable.on("error", function(e) { new_readable.emit("error", e); });
            readable.pipe(new_readable);
            readable = new_readable;
        }
        return readable;
    }
    

    可以使用如下:

    safePipe(readable, [ transform1, transform2, ... ]);
    
  • 177

    .on("error", handler) 仅处理Stream错误,但如果您使用自定义转换流, .on("error", handler) 不会捕获 _transform 函数内发生的错误 . 所以人们可以做这样的事情来控制应用程序流程: -

    _transform 函数中的 this 关键字是指 Stream 本身,它是 EventEmitter . 因此,您可以像下面这样使用 try catch 来捕获错误,然后将它们传递给自定义事件处理程序 .

    // CustomTransform.js
    CustomTransformStream.prototype._transform = function (data, enc, done) {
      var stream = this
      try {
        // Do your transform code
      } catch (e) {
        // Now based on the error type, with an if or switch statement
        stream.emit("CTError1", e)
        stream.emit("CTError2", e)
      }
      done()
    }
    
    // StreamImplementation.js
    someReadStream
      .pipe(CustomTransformStream)
      .on("CTError1", function (e) { console.log(e) })
      .on("CTError2", function (e) { /*Lets do something else*/ })
      .pipe(someWriteStream)
    

    这样,您可以将逻辑和错误处理程序分开 . 此外,您可以选择仅处理某些错误并忽略其他错误 .

  • 2

    如果您使用的是节点> = v10.0.0,则可以使用stream.pipelinestream.finished .

    例如:

    const { pipeline } = require('stream');
    const { finished } = require('stream');
    
    pipeline(
      input, 
      transformA, 
      transformB, 
      transformC, 
      (err) => {
        if (err) {
          console.error('Pipeline failed', err);
        } else {
          console.log('Pipeline succeeded');
        }
    });
    
    
    finished(input, (err) => {
      if (err) {
        console.error('Stream failed', err);
      } else {
        console.log('Stream is done reading');
      }
    });
    

    有关更多讨论,请参阅github PR .

  • 19

    通过创建一个Transform流机制并使用参数调用其回调 done 来使用Node.js模式为了传播错误:

    var transformStream1 = new stream.Transform(/*{objectMode: true}*/);
    
    transformStream1.prototype._transform = function (chunk, encoding, done) {
      //var stream = this;
    
      try {
        // Do your transform code
        /* ... */
      } catch (error) {
        // nodejs style for propagating an error
        return done(error);
      }
    
      // Here, everything went well
      done();
    }
    
    // Let's use the transform stream, assuming `someReadStream`
    // and `someWriteStream` have been defined before
    someReadStream
      .pipe(transformStream1)
      .on('error', function (error) {
        console.error('Error in transformStream1:');
        console.error(error);
        process.exit(-1);
       })
      .pipe(someWriteStream)
      .on('close', function () {
        console.log('OK.');
        process.exit();
      })
      .on('error', function (error) {
        console.error(error);
        process.exit(-1);
       });
    

相关问题