经常遇到这样的场景,13点-14点的时候flink程序发生了故障,或者集群崩溃,导致实时程序挂掉1小时,程序恢复的时候想把程序倒回13点或者更前,重新消费kafka中的数据.
下面的代码就是根据指定时间戳(也可以换算成时间)开始消费数据,支持到这样就灵活了,可以在启动命令中加个参数,然后再配个守护程序来控制程序.
flink代码
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.cdp.kafka.KafkaOffsetFind
object flinkkafka1 {
def main(args: Array[String]): Unit = {
/** ***************************************************************************************************************
* kafka info
*/
val zkCluster = "localhost:2181"
val kafkaCluster = "localhost:9092"
val topic = "cdp20"
val timestamp = 1519804800000L
/** ***************************************************************************************************************
* flink env
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
/** ***************************************************************************************************************
* create kafka stream
*/
val props = new Properties()
props.setProperty("bootstrap.servers", kafkaCluster)
props.setProperty("zookeeper.connect", zkCluster)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("group.id", "cdp20-c1")
/* ***********************************************************************************************************
* stream
*/
//找到时间戳对应偏移量
val offsetFinder = new KafkaOffsetFind[String]
val offset = offsetFinder.useTimestamp(timestamp,topic,props)
print(offset)
val kafkaOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]
for (o <- offset) {
kafkaOffsets.put(new KafkaTopicPartition(topic, o._1), o._2)
}
//创建根据时间消费kafka的数据流
val kafkaTime = env
.addSource {
new FlinkKafkaConsumer010[String](topic,
new KeyedDeserializationSchemaWithKey(new DefaultStringDeserializer),
props)
.setStartFromSpecificOffsets(kafkaOffsets)
}
/** ***************************************************************************************************************
* exec
*/
kafkaTime.print()
/** ***************************************************************************************************************
* flink execute
*/
env.execute("flink-kafka")
}
}
kafka根据时间找偏移量代码
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
/* ***********************************************************************************************************
* 作者:陈大炮
* 时间:2018-02-28
* 内容:根据时间消费kafka
* 使用unix时间戳,查找kafka分区对应的偏移量
*/
class KafkaOffsetFind[T] {
//超时时间
val POLL_TIMEOUT = 2000
//使用时间查询
def useTimestamp(timestamp: Long, topic: String, kafkaProps: Properties): List[(Int, Long)] = {
//创建消费者,获得消费者分区
val consumer = createConsumer(kafkaProps)
consumer.subscribe(util.Arrays.asList(topic))
consumer.poll(POLL_TIMEOUT)
val partitions = consumer.assignment().asScala.toList
//拼出一个查询map
val findMap = new util.HashMap[TopicPartition, java.lang.Long]
partitions
.foreach {
c =>
findMap.put(new TopicPartition(topic, c.partition()), timestamp)
}
//使用查询map去获得偏移量
val offsetMap = consumer.offsetsForTimes(findMap)
//返回前关闭下消费者
consumer.close()
//返回分区号和对应的偏移量
partitions.map {
p =>
(p.partition(), offsetMap.get(new TopicPartition(topic, 0)).offset())
}
}
//创建消费者
protected def createConsumer(kafkaProps: Properties): KafkaConsumer[String, T] = {
val props = kafkaProps.clone().asInstanceOf[Properties]
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
new KafkaConsumer[String, T](props)
}
}
注意事项(由漂泊的美好提供) 1.使用KafkaConsumer.offsetsForTimes要确认集群已开启log.message.timestamp.type参数 2.client端要使用0.10.*的客户端发送数据,使用低版本会造成数据格式不同问题
参考内容 http://blog.csdn.net/forrest_ou/article/details/78978575 https://github.com/noris-network/KafkaOffsetFinder