基于Nested Document的RealtimeGet实现
背景
原生的Solr中有/get的request handler实现(org.apache.solr.handler.component.RealTimeGetComponent),其实现原理是:先到tlog(事务日志)的内存中查找记录,找不到再到IndexSearcher中查找。这样,当用户利用solrj客户端将记录提交到Solr服务端之后,不需要等待服务端soft commit,就能立即从服务端取得最新提交的记录信息。
这样可以避免数据脏写的问题。但是,如果collection的索引结构使用了Nested Document,客户端提交一条Nested Document记录之后,再使用solrj调用“getById”方法,就只能取回parent document,取不到其子文档。因为这个原因,在生产环境中,如果对同一条记录的两次业务操作的时间间隔比soft commit的时间周期短,就会产生数据脏写的问题。
办法
要解决这个问题,需要扩展Solr的SearchComponent,实现通过id将文档及其全部子文档(child document)一并加载。
扩展SearchComponent
/**
 * Search component that looks a document up by its unique key and returns it
 * together with its nested child documents.
 *
 * <p>The component is inert unless the request carries the boolean parameter
 * {@code nestget=true}. When enabled it reads the {@code id} parameter, first
 * asks the update log for a not-yet-searchable version of the document and,
 * if the update log has no record of it, falls back to the current index
 * searcher, applying any document transformer requested through {@code fl}.
 */
public class NestRealtimeGetComponet extends SearchComponent {

    /** Component name; also the name of the boolean request parameter that enables it. */
    public static final String COMPONENT_NAME = "nestget";

    /**
     * Installs the return-field specification of the request on the response
     * so that {@link #process(ResponseBuilder)} can read the requested fields
     * and transformer from it. Does nothing unless the component is enabled.
     */
    @Override
    public void prepare(ResponseBuilder rb) throws IOException {
        if (!rb.req.getParams().getBool(COMPONENT_NAME, false)) {
            return;
        }
        ReturnFields returnFields = new SolrReturnFields(rb.req);
        rb.rsp.setReturnFields(returnFields);
    }

    /**
     * Resolves the document identified by the {@code id} request parameter and
     * sets a {@link SolrDocumentList} holding zero or one document as the
     * response. Does nothing unless the component is enabled.
     *
     * @throws IOException if reading from the index fails
     */
    @Override
    public void process(ResponseBuilder rb) throws IOException {
        SolrQueryRequest req = rb.req;
        SolrQueryResponse rsp = rb.rsp;
        SolrParams params = req.getParams();
        if (!params.getBool(COMPONENT_NAME, false)) {
            return;
        }

        // required() reports a missing "id" as a bad request instead of
        // letting a null value reach the field type conversion below.
        String id = params.required().get("id");
        SchemaField idField = req.getSchema().getUniqueKeyField();
        BytesRefBuilder idBytes = new BytesRefBuilder();
        idField.getType().readableToIndexed(id, idBytes);

        SolrCore core = req.getCore();
        SolrDocumentList docList = new SolrDocumentList();

        SolrInputDocument pending = RealTimeGetComponent.getInputDocumentFromTlog(
                core, idBytes.get());
        if (pending == RealTimeGetComponent.DELETED) {
            // The update log marks the document as deleted: report "not found"
            // rather than an empty document, and do not consult the index,
            // which may still contain the stale copy.
        } else if (pending != null) {
            docList.add(convertDocument(pending));
            docList.setNumFound(1);
        } else {
            SolrDocument indexed = loadFromIndex(core, req, rsp, idBytes);
            if (indexed != null) {
                docList.add(indexed);
                docList.setNumFound(1);
            }
        }
        rsp.addResponse(docList);
    }

    /** Short human-readable description required by {@link SearchComponent}. */
    @Override
    public String getDescription() {
        return "Real-time get by id that also returns nested child documents";
    }

    /**
     * Loads the document with the given indexed id from the current searcher
     * and runs the requested document transformer (if any) on it.
     *
     * @return the document, or {@code null} if the searcher does not know the id
     */
    private SolrDocument loadFromIndex(SolrCore core, SolrQueryRequest req,
            SolrQueryResponse rsp, BytesRefBuilder idBytes) throws IOException {
        RefCounted<SolrIndexSearcher> searcherHolder = core.getSearcher();
        // Everything that touches the searcher happens inside the try so the
        // reference is released even if transformer setup throws.
        try {
            SolrIndexSearcher searcher = searcherHolder.get();
            ReturnFields returnFields = rsp.getReturnFields();

            // Obtain the transformer requested via fl (e.g. [child ...]).
            DocTransformer transformer = returnFields.getTransformer();
            if (transformer != null) {
                // Hand the transformer the same searcher that resolves the
                // doc id below, instead of a null searcher.
                ResultContext context = new BasicResultContext(null,
                        returnFields, searcher, null, req);
                transformer.setContext(context);
            }

            long segAndId = searcher.lookupId(idBytes.get());
            if (segAndId < 0) {
                return null;
            }
            // The low 32 bits are used as the doc id inside the segment, the
            // high 32 bits as the index of that segment in leaves().
            int segmentDocId = (int) segAndId;
            LeafReaderContext leaf = searcher.getTopReaderContext()
                    .leaves().get((int) (segAndId >> 32));
            int docid = segmentDocId + leaf.docBase;

            Document luceneDocument = searcher.doc(docid,
                    returnFields.getLuceneFieldNames());
            SolrDocument result = toSolrDoc(luceneDocument, core.getLatestSchema());
            searcher.decorateDocValueFields(result, docid,
                    searcher.getNonStoredDVs(true));
            if (transformer != null) {
                transformer.transform(result, docid, 0);
            }
            return result;
        } finally {
            searcherHolder.decref();
        }
    }

    /**
     * Copies the stored fields of a Lucene document into a {@link SolrDocument}.
     * Fields that the schema declares as copyField targets are skipped, and
     * fields declared multi-valued are always represented as a list.
     */
    private static SolrDocument toSolrDoc(Document doc, IndexSchema schema) {
        SolrDocument out = new SolrDocument();
        for (IndexableField f : doc.getFields()) {
            // Make sure multivalued fields are represented as lists
            Object existing = out.get(f.name());
            if (existing == null) {
                SchemaField sf = schema.getFieldOrNull(f.name());
                // don't return copyField targets
                if (sf != null && schema.isCopyFieldTarget(sf)) {
                    continue;
                }
                if (sf != null && sf.multiValued()) {
                    List<Object> vals = new ArrayList<>();
                    vals.add(f);
                    out.setField(f.name(), vals);
                } else {
                    out.setField(f.name(), f);
                }
            } else {
                out.addField(f.name(), f);
            }
        }
        return out;
    }

    /**
     * Recursively converts an input document (as found in the update log) and
     * all of its child documents into a {@link SolrDocument} tree.
     *
     * @param doc the input document to convert
     * @return a new document holding the same fields and children
     */
    protected SolrDocument convertDocument(SolrInputDocument doc) {
        SolrDocument sdoc = new SolrDocument();
        for (String k : doc.getFieldNames()) {
            // Use the field's full value: getFieldValue(k) would keep only the
            // first value of a multi-valued field.
            sdoc.setField(k, doc.getField(k).getValue());
        }
        if (doc.hasChildDocuments()) {
            for (SolrInputDocument child : doc.getChildDocuments()) {
                sdoc.addChildDocument(convertDocument(child));
            }
        }
        return sdoc;
    }
}
solrconfig.xml中的配置:
<!-- Register the nested real-time-get component under the name "nestget". -->
<searchComponent name="nestget"
class="com.qlangtech.tis.solrextend.handler.component.NestRealtimeGetComponet" />
<!-- Attach the component to /select via last-components. -->
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<int name="rows">10</int>
<str name="df">text</str>
</lst>
<arr name="last-components">
<str>nestget</str>
</arr>
</requestHandler>
客户端查询示例:
// Ask the "nestget" component for parent document `pid` and its children.
SolrQuery query = new SolrQuery();
query.setParam("nestget", true);
query.set("id", pid);
// NOTE(review): "id:0" looks like a placeholder main query; the result that
// is read below is the one produced from the "id" parameter - confirm.
query.setQuery("id:0");
// The [child ...] transformer restricts the returned children to `cid`.
query.setFields("*"
, "[child parentFilter=type:p childFilter=\"{!terms f=id}" + cid + "\" limit=100]");
QueryResponse r = this.client.query(collection, pid, query);
SolrDocumentList doclist = r.getResults();
for (SolrDocument d : doclist) {
    System.out.println(d.get("id"));
    System.out.println();
    if (d.getChildDocumentCount() > 0) {
        for (SolrDocument c : d.getChildDocuments()) {
            // One output line per child: "key:value" pairs separated by a
            // space so adjacent fields do not run together.
            StringBuilder line = new StringBuilder();
            for (String key : c.getFieldNames()) {
                if (line.length() > 0) {
                    line.append(' ');
                }
                line.append(key).append(":").append(c.getFirstValue(key));
            }
            System.out.println(line.toString());
        }
    }
}