本文介紹了尋找春天的時間戳-卡夫卡的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在嘗試尋找時間戳功能,但由于某些原因,它不適合我。
在我的Producer中,我有下一個代碼:
ProducerRecord<String, Obj> producer = new ProducerRecord<>("topic", 0, System.currentTimeMillis() - 10000, "key", obj);
kafkaTemplate.send(producer);
在我的卡夫卡監聽器中,我試圖尋找比上面的時間戳更高的時間戳的偏移量:
@Component
@RequiredArgsConstructor
@KafkaListener(id = "container",
topics = "topic",
clientIdPrefix = "init_client",
autoStartup = "true")
public class KafkaList implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = System.currentTimeMillis()+60*1000;
log.info("Search for a time that is great or equal then {}", timestamp);
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
@KafkaHandler
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, Obj obj,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
log.info("Received message timestamp: {}, date: {}", timestamp,
Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDate());
}
}
在日志中,我看到下一個輸出:
Search for a time that is great or equal then 1613079865328
Received message timestamp: 1613079798676, date: 2021-02-11
卡夫卡主題1613079798676中的時間戳值低于我的搜索值1613079865328,為什么消費者會選擇這個偏移量?
推薦答案
我剛剛用您的代碼測試了它,它對我來說工作正常;我在日志中看到了這一點…
2021-02-16 11:30:16.587信息36721-[o66163492-0-C-1]com.example.demo.So66163492應用程序:搜索大于或等于1613493076587的時間
2021-02-16 11:30:16.590信息36721-[o66163492-0-C-1]o.a.k.clients.Consumer:[Consumer ClientID=Consumer-so66163492-1,groupID=so66163492]正在尋找分區so66163492-0的偏移量1
2021-02-16 11:30:16.590信息36721-[o66163492-0-C-1]o.s.k.l.KafkaMessageListenerContainer:so66163492:已分配分區:[so66163492-0]
2021-02-16 11:30:16.611信息36721-[o66163492-0-C-1]com.example.demo.So66163492應用程序:收到消息時間戳:1613529016472 qux
@SpringBootApplication
public class So66163492Application extends AbstractConsumerSeekAware {
private static final Logger log = LoggerFactory.getLogger(So66163492Application.class);
public static void main(String[] args) {
SpringApplication.run(So66163492Application.class, args);
}
@KafkaListener(id = "so66163492", topics = "so66163492")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String obj,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
log.info("Received message timestamp: {} {}", timestamp, obj);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66163492").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() - 10_000, "foo", "bar"));
template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() + 36_000_000, "baz", "qux"));
};
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = System.currentTimeMillis() + 60 * 1000;
log.info("Search for a time that is great or equal then {}", timestamp);
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}
這篇關于尋找春天的時間戳-卡夫卡的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,