首页 文章

在Flink错误中添加Cassandra作为接收器:所有尝试查询的主机都失败了

提问于
浏览
0

我正在https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html跟进一个例子,将Cassandra连接为Flink中的接收器

我的代码如下所示

public class writeToCassandra {

    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));" ;


    private final static Collection<String> collection = new ArrayList<>(50);

    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }

    public static void main(String[] args) throws Exception {


        //setting the env variable to local
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.createLocalEnvironment(1);


        DataStream<Tuple2<String, String>> dataStream = envrionment
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<String, String>>() {

                    final String mapped = " mapped ";
                    String[] splitted;

                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        splitted = s.split("\\s+");
                        return Tuple2.of(
                                UUID.randomUUID().toString(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });


        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
                .setHost("127.0.0.1")
                .build();


        envrionment.execute();

    } //main




} //writeToCassandra

我收到以下错误

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)

2 回答

  • 0

    不确定是否总是需要这个,但我设置CassandraSink的方式是这样的:

    CassandraSink
        .addSink(dataStream)
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return Cluster.builder()
                    .addContactPoints(myListOfCassandraUrlsString.split(","))
                    .withPort(portNumber)
                    .build();
            }
        })
        .build();
    

    我有注释的dataJream返回的POJO所以我不需要查询,但你只需要在“.addSink(...)”行之后加入“.setQuery(...)” .

  • 1

    该异常仅表示示例程序无法访问C *数据库 .

    • flink-cassandra-connector提供流API以连接到指定的C *数据库 . 因此,您需要运行C *实例 .

    • 每个流作业都被推送/序列化到任务管理器运行的节点 . 在您的示例中,假设C *与TM节点在同一节点上运行 . 另一种方法是将C *地址从127.0.0.1更改为公共地址 .

相关问题