
How do I run a Google Dataflow pipeline from a Google App Engine app?


I need to run a Dataflow pipeline periodically. The FAQ for Dataflow states:

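You can automate pipeline execution by using Google App Engine or custom (cron) job processes on GCE. Future versions of the SDK will support command-line options for finer-grained control over job management.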
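On App Engine, the usual way to get periodic execution is a cron entry that requests a URL in the app on a schedule, for example a handler like /dataflow/test in the servlet below. App Engine's cron service adds the header X-Appengine-Cron: true to the requests it sends and strips that header from external requests, so a handler can use it to ignore anything that did not come from cron. The following is a minimal, JDK-only sketch of that check. The class and method names are hypothetical, and it assumes the request headers have been copied into a Map (inside a servlet you could simply test "true".equals(req.getHeader("X-Appengine-Cron"))).

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: decides whether a request was sent by the App Engine cron service. */
public class CronRequestCheck {

    // Header that App Engine's cron service attaches to its requests.
    static final String CRON_HEADER = "X-Appengine-Cron";

    /** Returns true only if the cron header is present with the value "true" (header names are case-insensitive). */
    static boolean isCronRequest(Map<String, String> headers) {
        Map<String, String> caseInsensitive = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        caseInsensitive.putAll(headers);
        return "true".equals(caseInsensitive.get(CRON_HEADER));
    }

    public static void main(String[] args) {
        // Header name spelled with different capitalization is still recognized.
        System.out.println(isCronRequest(Collections.singletonMap("X-AppEngine-Cron", "true"))); // true
        // An ordinary request without the header is rejected.
        System.out.println(isCronRequest(Collections.singletonMap("User-Agent", "curl")));       // false
    }
}
```

The handler that launches the pipeline would return early (for example with status 403) when this check fails, so only the scheduled cron request triggers a Dataflow job.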

I tried to run a very simple pipeline from my Java application with the following code:

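import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

// TestTransform is my own transform (not shown); it takes and returns a PCollection<String>.
public class MyAnalyticsServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        resp.setContentType("text/plain");
        if (req.getRequestURI().equals("/dataflow/test")) {
            // Build the Dataflow options in code instead of parsing command-line arguments.
            DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
            options.setProject("redacted");
            options.setRunner(DataflowPipelineRunner.class);
            // Read text from GCS, apply the custom transform, and write the result back to GCS.
            Pipeline p = Pipeline.create(options);
            p.apply(TextIO.Read.named("TestInput").from("gs://redacted/test/in.txt"))
                    .apply(new TestTransform())
                    .apply(TextIO.Write.named("TestOutput")
                            .to("gs://redacted/test")
                            .withNumShards(0));
            // Submit the job to the Dataflow service.
            p.run();
        } else {
            resp.setStatus(404);
            resp.getWriter().println("Not Found");
            return;
        }
        resp.getWriter().println("OK");
    }
}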

I get the following error:

java.lang.IllegalArgumentException: Methods [setRunner(Class), getRunner()] on [com.google.cloud.dataflow.sdk.options.PipelineOptions] do not conform to being bean properties.
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.validateClass(PipelineOptionsFactory.java:1059)
    ...
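The message indicates that PipelineOptionsFactory checks each getter/setter pair of an options interface as a JavaBean property and rejected the pair getRunner()/setRunner(Class). As a rough, hypothetical illustration of what such a check can look like (this is not the SDK's validateClass code, which does more), the sketch below flags properties whose getter return type and setter parameter type differ once generic type arguments are compared. The interfaces GoodOptions and BadOptions and the method mismatchedProperties are made up for the example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical getter/setter consistency check; NOT the Dataflow SDK's implementation. */
public class BeanPropertyCheck {

    /** Hypothetical interface: getter and setter use the same generic type. */
    interface GoodOptions {
        Class<? extends Number> getRunner();
        void setRunner(Class<? extends Number> runner);
    }

    /** Hypothetical interface: setter's generic type differs from the getter's. */
    interface BadOptions {
        Class<? extends Number> getRunner();
        void setRunner(Class<?> runner);
    }

    /** Returns the properties whose getX() generic return type differs from setX()'s generic parameter type. */
    static List<String> mismatchedProperties(Class<?> iface) {
        List<String> mismatches = new ArrayList<>();
        for (Method getter : iface.getMethods()) {
            if (!getter.getName().startsWith("get") || getter.getParameterCount() != 0) {
                continue; // not a getter
            }
            String property = getter.getName().substring(3);
            for (Method setter : iface.getMethods()) {
                if (setter.getName().equals("set" + property) && setter.getParameterCount() == 1) {
                    // Compare full generic types, e.g. Class<? extends Number> vs Class<?>.
                    Type getterType = getter.getGenericReturnType();
                    Type setterType = setter.getGenericParameterTypes()[0];
                    if (!getterType.equals(setterType)) {
                        mismatches.add(property);
                    }
                }
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("GoodOptions mismatches: " + mismatchedProperties(GoodOptions.class)); // []
        System.out.println("BadOptions mismatches: " + mismatchedProperties(BadOptions.class));   // [Runner]
    }
}
```

One hypothetical way to narrow the problem down is to run a check like this on the SDK interface that declares getRunner()/setRunner(Class), once inside the App Engine app and once on a plain local JVM: if the results differ, the two environments are seeing different generic signatures for the same methods.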

Any ideas?
