4. Reading the Lucene Source Code



Writing Documents in Lucene

Main class: IndexWriter

org.apache.lucene.index.IndexWriter

API call chain:

IndexWriter.addDocument():
-> IndexWriter.updateDocument(): // local (per-thread) delete
       docWriter.updateDocument(doc, analyzer, delNode);
-> DocumentsWriter.updateDocument():
       dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
-> DocumentsWriterPerThread.updateDocument():
       consumer.processDocument();
-> DefaultIndexingChain.processDocument():
       fieldCount = processField(field, fieldGen, fieldCount);
-> DefaultIndexingChain.processField():
       storedFieldsConsumer.writeField(fp.fieldInfo, field);
-> StoredFieldsConsumer.writeField():
       writer.writeField(info, field); // writer comes from the Codec configured in IndexWriterConfig
-> SimpleTextStoredFieldsWriter.writeField(); // actually writes the document
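
For context, a minimal client-side sketch that kicks off this chain; the index path and field name are illustrative:

import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class AddDocDemo {
    public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(Paths.get("indexdir")); // illustrative path
        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
        try (IndexWriter writer = new IndexWriter(dir, config)) {
            Document doc = new Document();
            doc.add(new TextField("title", "hello lucene", Field.Store.YES));
            writer.addDocument(doc); // enters the call chain traced above
            writer.commit();
        }
        dir.close();
    }
}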
SimpleTextStoredFieldsWriter.writeField():

public void writeField(FieldInfo info, IndexableField field) throws IOException {
    write(FIELD);                         // the "field" marker
    write(Integer.toString(info.number)); // followed by the field's number
    newLine();

    write(NAME);                          // the "name" marker
    write(field.name());                  // followed by the field's name
    newLine();
}
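
SimpleTextStoredFieldsWriter is only reached when the SimpleText codec is plugged in; a minimal sketch of wiring it up (SimpleTextCodec ships in the lucene-codecs module and is intended for debugging, not production):

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.simpletext.SimpleTextCodec;
import org.apache.lucene.index.IndexWriterConfig;

public class SimpleTextConfigDemo {
    static IndexWriterConfig simpleTextConfig() {
        Analyzer analyzer = new StandardAnalyzer();
        IndexWriterConfig config = new IndexWriterConfig(analyzer);
        // this is the "Codec in IndexWriterConfig" that the chain above resolves
        // its writer from; stored fields then land in a human-readable .fld file
        config.setCodec(new SimpleTextCodec());
        return config;
    }
}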
IndexWriter.addDocument():

public long addDocument(Iterable<? extends IndexableField> doc) {
    // an add is just an update with a null delete node
    return updateDocument((DocumentsWriterDeleteQueue.Node<?>) null, doc);
}
IndexWriter.updateDocument():

private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
                            Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen(); // make sure this writer has not been closed; Lucene allows concurrent callers
    boolean success = false;
    try {
        // key step: delegate to DocumentsWriter
        long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
        if (seqNo < 0) {
            seqNo = -seqNo; // a negative seqNo signals pending events; restore the real value
            processEvents(true);
        }
        success = true;
        return seqNo;
    } catch (VirtualMachineError tragedy) {
        tragicEvent(tragedy, "updateDocument");
        throw tragedy;
    } finally {
        if (success == false) {
            if (infoStream.isEnabled("IW")) {
                infoStream.message("IW", "hit exception updating document");
            }
        }
        maybeCloseOnTragicEvent();
    }
}

org.apache.lucene.index.DocumentsWriter

long updateDocument(final Iterable<? extends IndexableField> doc,
                    final Analyzer analyzer,
                    final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {

    boolean hasEvents = preUpdate();
    // class DocumentsWriterPerThreadPool.ThreadState extends ReentrantLock:
    // essentially a lock, paired with a DocumentsWriterPerThread to write the document
    final ThreadState perThread = flushControl.obtainAndLock();

    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;
    try {
        // This must happen after we've pulled the ThreadState because IW.close
        // waits for all ThreadStates to be released:
        ensureOpen();
        ensureInitialized(perThread);
        assert perThread.isInitialized();
        final DocumentsWriterPerThread dwpt = perThread.dwpt;
        final int dwptNumDocs = dwpt.getNumDocsInRAM();
        try {
            seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
        } finally {
            if (dwpt.isAborted()) {
                flushControl.doOnAbort(perThread);
            }
            // We don't know whether the document actually
            // counted as being indexed, so we must subtract here to
            // accumulate our separate counter:
            numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
        }
        final boolean isUpdate = delNode != null && delNode.isDelete();
        flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

        assert seqNo > perThread.lastSeqNo : "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
        perThread.lastSeqNo = seqNo;

    } finally {
        perThreadPool.release(perThread);
    }

    if (postUpdate(flushingDWPT, hasEvents)) {
        seqNo = -seqNo;
    }

    return seqNo;
}
DefaultIndexingChain.processDocument():

startStoredFields(docState.docID); // begin storing this document's fields
try {
    for (IndexableField field : docState.doc) {
        fieldCount = processField(field, fieldGen, fieldCount);
    }
} finally {
    if (docWriter.hasHitAbortingException() == false) {
        // Finish each indexed field name seen in the document:
        for (int i = 0; i < fieldCount; i++) {
            fields[i].finish();
        }
        finishStoredFields();
    }
}
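
Each IndexableField of docState.doc passes through processField() exactly once. For reference, a typical document built on the client side mixes several field types (values are illustrative):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;

class DocExample {
    static Document sampleDoc() {
        Document doc = new Document();
        doc.add(new StringField("id", "1", Field.Store.YES));                 // indexed as one token
        doc.add(new TextField("title", "Lucene in Action", Field.Store.YES)); // analyzed full text
        doc.add(new StoredField("price", 19.99f));                            // stored only, not indexed
        return doc; // the loop above calls processField() once per field
    }
}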

Deleting Documents in Lucene

IndexWriter.deleteDocuments(Term... terms);
-> DocumentsWriter.deleteTerms(terms);
-> DocumentsWriter.applyDeleteOrUpdate(q -> q.addDelete(terms));
-> DocumentsWriterFlushControl.doOnDelete();
-> FlushByRamOrCountsPolicy.onDelete(this, null);
-> FlushByRamOrCountsPolicy.setApplyAllDeletes();
-> AtomicBoolean flushDeletes.set(true);
-> DocumentsWriter.applyAllDeletes(deleteQueue);
-> DocumentsWriterFlushQueue.addDeletes(deleteQueue);
-> org.apache.lucene.index.DocumentsWriterDeleteQueue#freezeGlobalBuffer
-> org.apache.lucene.index.DocumentsWriterFlushQueue.FlushTicket#FlushTicket
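
From the caller's side this chain starts with a plain deleteDocuments call; a minimal sketch (field and value are illustrative):

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

class DeleteDemo {
    // delete every document whose "id" field contains the given term
    static void deleteById(IndexWriter writer, String id) throws IOException {
        writer.deleteDocuments(new Term("id", id)); // buffered in the delete queue, applied on flush
        writer.commit();
    }
}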
package org.apache.lucene.index;

final class DocumentsWriter implements Closeable, Accountable {
    private synchronized long applyDeleteOrUpdate(
            ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException {
        // TODO why is this synchronized?
        final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
        long seqNo = function.applyAsLong(deleteQueue);
        flushControl.doOnDelete();
        lastSeqNo = Math.max(lastSeqNo, seqNo);
        if (applyAllDeletes(deleteQueue)) {
            seqNo = -seqNo;
        }
        return seqNo;
    }
}
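
All delete/update entry points funnel through applyDeleteOrUpdate(), each passing a lambda that enqueues a different node type; paraphrasing the Term-based entry from the chain above:

// DocumentsWriter.deleteTerms (paraphrased): the lambda picks what gets enqueued
synchronized long deleteTerms(final Term... terms) throws IOException {
    return applyDeleteOrUpdate(q -> q.addDelete(terms));
}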
class DocumentsWriterDeleteQueue {
    private final BufferedUpdates globalBufferedUpdates; // global buffered deletes

    long addDelete(Term... terms) {
        long seqNo = add(new TermArrayNode(terms));
        tryApplyGlobalSlice();
        return seqNo;
    }
}
// controls flushing, mainly to bound memory usage
final class DocumentsWriterFlushControl implements Accountable {

}
final class DocumentsWriterFlushQueue {
    private final Queue<FlushTicket> queue = new LinkedList<>();
    // we track tickets separately since count must be present even before the ticket is
    // constructed ie. queue.size would not reflect it.
    private final AtomicInteger ticketCount = new AtomicInteger();
    private final ReentrantLock purgeLock = new ReentrantLock();

    synchronized void addDeletes(DocumentsWriterDeleteQueue deleteQueue) {
        incTickets(); // first inc the ticket count - freeze opens
                      // a window for #anyChanges to fail
        boolean success = false;
        try {
            queue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
            success = true;
        } finally {
            if (!success) {
                decTickets();
            }
        }
    }
}

globalBufferedUpdates holds the global deletes: documents removed via IndexWriter.deleteDocuments.

pendingUpdates holds the per-thread (local) deletes, used when adding or updating a document.

A per-thread slice points to the last node that thread itself appended, while the global slice always points to the last node of the entire linked list.
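
A toy model of those two slices may help: the delete queue is a singly linked list, and each slice is just a window of nodes (class and field names here are illustrative, not the real ones):

// Toy model of DocumentsWriterDeleteQueue's slices (illustrative names):
class DeleteNode {
    DeleteNode next;
    Object item; // e.g. a Term to delete
}

class DeleteSlice {
    DeleteNode sliceHead; // nodes after head, up to and including tail, belong to this slice
    DeleteNode sliceTail;
}

// per-thread slice: its sliceTail advances only to the last node *that thread* appended
// global slice:     its sliceTail is advanced (tryApplyGlobalSlice) to the tail of the
//                   whole list, so it accumulates the deletes from every thread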


The Lucene Search Process

Query example:

public class QueryParseTest {
    public static void main(String[] args) throws IOException, ParseException {
        String field = "title";
        Path indexPath = Paths.get("indexdir");
        Directory directory = FSDirectory.open(indexPath);
        IndexReader reader = DirectoryReader.open(directory);
        IndexSearcher searcher = new IndexSearcher(reader);
        Analyzer analyzer = new IKAnalyzer8x();
        QueryParser parser = new QueryParser(field, analyzer);
        parser.setDefaultOperator(QueryParser.Operator.AND);
        Query query = parser.parse("农村学生");
        System.out.println("Query: " + query.toString()); // the parsed query
        // return the top 10 hits
        TopDocs topDocs = searcher.search(query, 10);
        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
            Document doc = searcher.doc(scoreDoc.doc);
            System.out.println("DocID: " + scoreDoc.doc);
            System.out.println("id: " + doc.get("id"));
            System.out.println("title: " + doc.get("title"));
            System.out.println("score: " + scoreDoc.score);
        }
        reader.close(); // close the reader before the directory it reads from
        directory.close();
    }
}

Key search code:

package org.apache.lucene.search;
public class IndexSearcher {
    /**
     * Lower-level search API.
     * Search all leaves using the given {@link CollectorManager}. In contrast
     * to {@link #search(Query, Collector)}, this method will use the searcher's
     * {@link ExecutorService} in order to parallelize execution of the collection
     * on the configured {@link #leafSlices}.
     * @see CollectorManager
     * @lucene.experimental
     */
    public <C extends Collector, T> T search(
            Query query, CollectorManager<C, T> collectorManager) throws IOException {
        if (executor == null) {
            final C collector = collectorManager.newCollector();
            search(query, collector);
            return collectorManager.reduce(Collections.singletonList(collector));
        } else {
            final List<C> collectors = new ArrayList<>(leafSlices.length);
            boolean needsScores = false;
            for (int i = 0; i < leafSlices.length; ++i) {
                final C collector = collectorManager.newCollector();
                collectors.add(collector);
                needsScores |= collector.needsScores();
            }

            query = rewrite(query);
            // create the scoring Weight (boost defaults to 1)
            final Weight weight = createWeight(query, needsScores, 1);
            // one future per leaf slice, scheduled on the executor
            final List<Future<C>> topDocsFutures = new ArrayList<>(leafSlices.length);
            for (int i = 0; i < leafSlices.length; ++i) {
                final LeafReaderContext[] leaves = leafSlices[i].leaves;
                final C collector = collectors.get(i);
                topDocsFutures.add(executor.submit(new Callable<C>() {
                    @Override
                    public C call() throws Exception {
                        // search this slice's leaves on a worker thread
                        search(Arrays.asList(leaves), weight, collector);
                        return collector;
                    }
                }));
            }

            final List<C> collectedCollectors = new ArrayList<>();
            for (Future<C> future : topDocsFutures) {
                try {
                    collectedCollectors.add(future.get());
                } catch (InterruptedException e) {
                    throw new ThreadInterruptedException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }

            return collectorManager.reduce(collectors);
        }
    }
}
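
A minimal CollectorManager to exercise this parallel path: one TotalHitCountCollector per leaf slice, with reduce() merging the per-slice counts (a sketch, not part of Lucene itself):

import java.io.IOException;
import java.util.Collection;

import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.TotalHitCountCollector;

class HitCountManager implements CollectorManager<TotalHitCountCollector, Integer> {
    @Override
    public TotalHitCountCollector newCollector() {
        return new TotalHitCountCollector(); // one collector per leaf slice / worker thread
    }

    @Override
    public Integer reduce(Collection<TotalHitCountCollector> collectors) throws IOException {
        int total = 0;
        for (TotalHitCountCollector c : collectors) {
            total += c.getTotalHits(); // merge the per-slice counts
        }
        return total;
    }
}

// usage: int hits = searcher.search(query, new HitCountManager());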

An Analyzer is made of two components (see the sketch below):

  • Tokenizer: splits the input text into tokens

  • TokenFilter: post-processes tokens (e.g. lowercasing, stemming "cats" to its root)
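
A minimal custom Analyzer showing the two components composed, assuming Lucene 7.x APIs (StandardTokenizer as the Tokenizer, LowerCaseFilter as the TokenFilter):

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.standard.StandardTokenizer;

class LowercaseAnalyzer extends Analyzer {
    @Override
    protected TokenStreamComponents createComponents(String fieldName) {
        Tokenizer source = new StandardTokenizer();       // Tokenizer: splits input into tokens
        TokenStream result = new LowerCaseFilter(source); // TokenFilter: lowercases each token
        return new TokenStreamComponents(source, result);
    }
}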

Lucene70Codec => the codec that actually writes terms to disk

public class Lucene70Codec extends Codec {
    private final TermVectorsFormat vectorsFormat = new Lucene50TermVectorsFormat();      // term vectors
    private final FieldInfosFormat fieldInfosFormat = new Lucene60FieldInfosFormat();     // per-field metadata
    private final SegmentInfoFormat segmentInfosFormat = new Lucene70SegmentInfoFormat(); // segment metadata
    private final LiveDocsFormat liveDocsFormat = new Lucene50LiveDocsFormat();           // live/deleted doc bits
    private final CompoundFormat compoundFormat = new Lucene50CompoundFormat();           // compound (.cfs) files
}
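
The Codec is pluggable per IndexWriterConfig; a hedged sketch of overriding just the stored-fields format while delegating everything else to Lucene70Codec via FilterCodec (the name "MyCodec" is an illustrative assumption, and a custom codec must also be registered through SPI before its segments can be read back):

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.simpletext.SimpleTextStoredFieldsFormat;

class MyCodec extends FilterCodec {
    MyCodec() {
        super("MyCodec", Codec.forName("Lucene70")); // delegate everything else to the default codec
    }

    @Override
    public StoredFieldsFormat storedFieldsFormat() {
        return new SimpleTextStoredFieldsFormat(); // human-readable stored fields, for debugging
    }
}

// usage: indexWriterConfig.setCodec(new MyCodec());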