elasticsearch索引的创建过程index create逻辑分析-mile米乐体育
目录
- 索引的创建过程
- materoperation方法实现
- clusterservice处理
- 建立索引 修改配置
- 总结
索引的创建过程
从本篇开始,就进入了index的核心代码部分。这里首先分析一下索引的创建过程。elasticsearch中的索引是多个分片的集合,它只是逻辑上的索引,并不具备实际的索引功能,所有对数据的操作最终还是由每个分片完成。
创建索引的过程,从elasticsearch集群上来说就是写入索引元数据的过程,这一操作只能在master节点上完成。这是一个阻塞式动作,在加上分配在集群上均衡的过程也非常耗时,因此在一次创建大量索引的过程master节点会出现单点性能瓶颈,能够看到响应过程很慢。
在开始具体源码分析之前,首先回顾一下action部分的内容(参考index action分析),elasticsearch的每一个功能都对应两个action,*action和transport*action。*action中定义了每个功能对应的路径,同时action的instance绑定对应的transport*action。所有功能请求都需要在集群上转发,这大概也是每个功能都有transport*action的原因吧。对于create当然也不例外,它的开始点也是transportcreateaction。另外,在action support分析中分析过,不同的action需要经过和需要操作的节点也不同。create index只能由master节点进行,而且也只在master节点上进行,保证集群数据的一致性。
materoperation方法实现
因此transportcreateaction继承了transportmasternodeoperationaction,并实现了materoperation方法。它的方法如下所示:
protected void masteroperation(final createindexrequest request, final clusterstate state, final actionlistener<createindexresponse> listener) throws elasticsearchexception { string cause = request.cause(); if (cause.length() == 0) { cause = "api"; } final createindexclusterstateupdaterequest updaterequest = new createindexclusterstateupdaterequest(request, cause, request.index()) .acktimeout(request.timeout()).masternodetimeout(request.masternodetimeout()) .settings(request.settings()).mappings(request.mappings()) .aliases(request.aliases()).customs(request.customs()); createindexservice.createindex(updaterequest, new actionlistener<clusterstateupdateresponse>() { @override public void onresponse(clusterstateupdateresponse response) { listener.onresponse(new createindexresponse(response.isacknowledged())); } @override public void onfailure(throwable t) { if (t instanceof indexalreadyexistsexception) { logger.trace("[{}] failed to create", t, request.index()); } else { logger.debug("[{}] failed to create", t, request.index()); } listener.onfailure(t); } }); }
这里看上很简单,只是调用了createindexservice(它其实是metadatacreateindexservice)的方法,就是修改集群matedata过程。
clusterservice处理
修改前首先获取到index名称对应的lock,这样保证操作数据一致性,然后生成updatetask,交给clusterservice处理。代码如下所示:
public void createindex(final createindexclusterstateupdaterequest request, final actionlistener<clusterstateupdateresponse> listener) { // 获取锁,只对该索引的操作加锁,而不是整个cluster final semaphore mdlock = metadataservice.indexmetadatalock(request.index()); // 如果能够获取锁离开创建索引,否则在下面启动新的线程进行 if (mdlock.tryacquire()) { createindex(request, listener, mdlock); return; } threadpool.executor(threadpool.names.management).execute(new actionrunnable(listener) { @override public void dorun() throws interruptedexception { if (!mdlock.tryacquire(request.masternodetimeout().nanos(), timeunit.nanoseconds)) { listener.onfailure(new processclustereventtimeoutexception(request.masternodetimeout(), "acquire index lock")); return; } createindex(request, listener, mdlock); } }); }
createindex方法,会封装create请求,然后向cluster发送一个updatetask。代码如下所示:
private void createindex(final createindexclusterstateupdaterequest request, final actionlistener<clusterstateupdateresponse> listener, final semaphore mdlock) { immutablesettings.builder updatedsettingsbuilder = immutablesettings.settingsbuilder(); updatedsettingsbuilder.put(request.settings()).normalizeprefix(indexmetadata.index_setting_prefix); request.settings(updatedsettingsbuilder.build()); clusterservice.submitstateupdatetask("create-index [" request.index() "], cause [" request.cause() "]", priority.urgent, new ackedclusterstateupdatetask<clusterstateupdateresponse>(request, listener)
建立索引 修改配置
增加或者修改mapping都是对集群状态修改,它们的过程都很相似,都是通过clusterservice提交一个更新操作,同时附带有优先级。clusterservice会根据优先级和更新状态task的类型来进行对应的操作。如下所示:
public void submitstateupdatetask(final string source, priority priority, final clusterstateupdatetask updatetask) { if (!lifecycle.started()) { return; } try { final updatetask task = new updatetask(source, priority, updatetask);//根据优先级新建不同的task if (updatetask instanceof timeoutclusterstateupdatetask) {//超时任务,这类任务需要即时返回,因此立刻执行。 final timeoutclusterstateupdatetask timeoutupdatetask = (timeoutclusterstateupdatetask) updatetask; updatetasksexecutor.execute(task, threadpool.scheduler(), timeoutupdatetask.timeout(), new runnable() { @override public void run() { threadpool.generic().execute(new runnable() { @override public void run() { timeoutupdatetask.onfailure(task.source(), new processclustereventtimeoutexception(timeoutupdatetask.timeout(), task.source())); } }); } }); } else {//其它类型,可以延迟执行,则交给线程池来执行。 updatetasksexecutor.execute(task); } } catch (esrejectedexecutionexception e) { // ignore cases where we are shutting down..., there is really nothing interesting // to be done here... if (!lifecycle.stoppedorclosed()) { throw e; } } }
说完它们的执行过程,再来看一下create index的具体逻辑。这个逻辑在matedataservice所提交的ackedclusterstateupdatetask中的execute方法中。总体来说,这一过程就是将request中关于索引的配置mapping等取出来加入到当前的clustermatedata中,构造一个新的matedata的过程。这一过程还是比较复杂,限于篇幅将在下次中进行分析。
总结
创建索引的过程就是master节点更新集群matedata的过程,为了保证数据一致性,需要获取锁。
因此存在单点瓶颈。对于外部调用来说,跟其它功能一样,外部接口调用createindexaction的相关方法,然后通过transportcreateindexaction讲请求发送到集群上,进行索引创建。
以上就是elasticsearch索引创建过程index create的详细内容,更多关于elasticsearch索引创建过程index create的资料请关注mile米乐体育其它相关文章!