首页 文章

在Node.js中为Dataflow作业传递--staging-location参数的正确字段是什么?

提问于
浏览
0

我想知道我是否遇到了一个错误 . A写了一段Node.js代码来触发“GCS Text to PubSub”数据流 . 在将文件上载到GCS存储桶时触发该功能 . 但它永远不会成功执行:“textPayload:”运行数据流模板的问题,错误是:{错误:收到无效的JSON有效负载 . 未知名称“staging_location”:找不到字段 . “这是我指定作业的暂存位置的语法问题 . 我尝试过”staginglocation“,”stagingLocation“等等......没有一个有效 .

这是我的代码 . 谢谢你的帮助 .

var {google} = require('googleapis');

exports.moveDataFromGCStoPubSub = (event, callback) => {


const file = event.data;
const context = event.context;

console.log(`Event ${context.eventId}`);
console.log(`  Event Type: ${context.eventType}`);
console.log(`  Bucket: ${file.bucket}`);
console.log(`  File: ${file.name}`);
console.log(`  Metageneration: ${file.metageneration}`);
console.log(`  Created: ${file.timeCreated}`);
console.log(`  Updated: ${file.updated}`);

  google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

 console.log(projectId);

 const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
 console.log(`gs://${file.bucket}/${file.name}`);
 dataflow.projects.templates.create({
   projectId: projectId,
   resource: {
    parameters: {
        inputFile: `gs://${file.bucket}/${file.name}`,
        outputTopic: `projects/iot-fitness-198120/topics/MemberFitnessData`,
      },
      jobName: 'CStoPubSub',
      gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',
      stagingLocation: 'gs://fitnessanalytics-tmp/tmp'    
    }
 }, function(err, response) {
   if (err) {
     console.error("problem running dataflow template, error was: ", err);
   }
   console.log("Dataflow template response: ", response);
   callback();
 });

   });

 callback();
};

2 回答

  • 0

    我认为这实际上是不可能的 .

    查看the documentation for the Dataflow API itself,'s nothing like a staging location in the parameter section, and the library you'重新使用基本上是此API的包装器 .

    我有点惊讶它改变了参数的名称 .

  • 0

    所以我终于开始工作了 . 这确实是参数部分中的语法问题 . 下面的代码就像一个魅力:

    var {google} = require('googleapis');
    
    exports.moveDataFromGCStoPubSub = (event, callback) => {
    
    
    const file = event.data;
    const context = event.context;
    
    console.log(`Event ${context.eventId}`);
    console.log(`  Event Type: ${context.eventType}`);
    console.log(`  Bucket: ${file.bucket}`);
    console.log(`  File: ${file.name}`);
    console.log(`  Metageneration: ${file.metageneration}`);
    console.log(`  Created: ${file.timeCreated}`);
    console.log(`  Updated: ${file.updated}`);
    
      google.auth.getApplicationDefault(function (err, authClient, projectId) {
         if (err) {
           throw err;
         }
    
     console.log(projectId);
    
     const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
     console.log(`gs://${file.bucket}/${file.name}`);
     dataflow.projects.templates.create({
      gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub', 
      projectId: projectId,
       resource: {
        parameters: {
            inputFilePattern: `gs://${file.bucket}/${file.name}`,
            outputTopic: 'projects/iot-fitness-198120/topics/MemberFitnessData2'
          },
        environment: {
          tempLocation: 'gs://fitnessanalytics-tmp/tmp'
        },
          jobName: 'CStoPubSub',
          //gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',    
        }
     }, function(err, response) {
       if (err) {
         console.error("problem running dataflow template, error was: ", err);
       }
       console.log("Dataflow template response: ", response);
       callback();
     });
    
       });
    
     callback();
    };
    

相关问题