本文介紹了如何使用最新的Java SDK 3.1.2在Couchbase中執行批量插入的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我能夠使用2.9.7等較舊版本的Java SDK執行批插入,代碼如下。
公共作廢插入全部(收款單){
Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return couchbaseConfig.catalogBucket().async().insert(docToInsert)
.doOnError((Throwable throwable) -> log.error(
"Exception {} occured while inerting document {} to cb", throwable.getMessage(),
docToInsert));
}
}).last().toBlocking().single();
}
我們要求升級到最新版本的Java SDK 3.1.2。在Couchbase文檔方面,我無法獲得太多幫助。非常感謝下面的任何鏈接
推薦答案
Batching上的Couchbase Java SDK 3.x文檔假定您熟悉反應式編程,尤其是Project Reactor。
要更新SDK 3的代碼,您需要:
-
提供您自己的
JsonDocument
類。寫入
Collection
而不是Bucket
。從RxJava遷移到反應器(或使用適配器庫)。
提供您自己的JsonDocument
類
SDK 3不再需要或提供像SDK 2的JsonDocument
這樣的文檔類。您可以隨心所欲地為文檔ID和內容建模。下面是一個可以幫助您過渡到SDK 3的類:
public class JsonDocument {
private final String id;
private final JsonObject content;
public JsonDocument(String id, JsonDocument content) {
this.id = id;
this.content = content;
}
public String getId() { return id;}
public JsonObject getContent() { return content;}
@Override
public String toString() {
return "JsonDocument{id='" + id + "', content=" + content + "}";
}
}
寫入Collection
而不是Bucket
在SDK 3中,每個存儲桶都有一個或多個作用域。每個作用域包含一個或多個集合。作用域有助于多租戶;可以為每個客戶或部署分配其自己的作用域。集合可幫助您組織文檔;例如,您可以將widgets
和invoices
放在單獨的集合中。
每個存儲桶都有一個默認作用域,并且該作用域有一個默認集合。
Couchbase Server7將支持作用域和集合,但SDK 3要求您現在就考慮它們。以前屬于Bucket
類的所有Get/Insert/Remove/等方法都已移至Collection
類。
幸運的是,有一種方便的方法可以訪問存儲桶的默認集合。
這對您的代碼意味著什么?在SDK 2中,您有:
AsyncBucket catalog = couchbaseConfig.catalogBucket().async();
在SDK 3中,您可以編寫:
ReactiveCollection catalog = couchbaseConfig.catalogBucket()
.defaultCollection()
.reactive();
從RxJava遷移到反應器
您可能已經知道,Couchbase Java SDK 2.x使用RxJava作為其反應性模型。SDK 3改用了反應器。概念基本相同,但反應原語的名稱不同:
Obseravble<T>
-&>Flux<T>
Single<T>
-&>Mono<T>
Completable
-&>Mono<Void>
Reactor reference documentation是您的朋友。
將所有這些放在一起
假設documents
是上面提到的JsonDocument
類的列表,則SDK 3中的代碼可能如下所示:
ReactiveCollection destCollection = couchbaseConfig.catalogBucket()
.defaultCollection()
.reactive();
Flux.fromIterable(documents)
.flatMap(docToInsert ->
destCollection.insert(docToInsert.getId(), docToInsert.getContent())
.doOnError(throwable -> log.error(
"Exception {} occurred while inserting document {} to cb",
throwable.getMessage(), docToInsert)))
.blockLast();
此代碼(以及SDK 2版本)的一個問題是,如果任何插入失敗,則不會插入剩余的文檔。如果要繼續插入其他文檔,可以使用onErrorResume()
而不是doOnError()
:
Flux.fromIterable(documents)
.flatMap(docToInsert ->
destCollection.insert(docToInsert.getId(), docToInsert.getContent())
.onErrorResume(throwable -> {
log.error("Exception {} occurred while inserting document {} to cb",
throwable.getMessage(), docToInsert);
return Mono.empty();
}))
.blockLast();
這篇關于如何使用最新的Java SDK 3.1.2在Couchbase中執行批量插入的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,