elasticsearch索引创建create index集群matedata更新-mile米乐体育
目录
- 创建索引更新集群index metadata
- 首先创建index的create方法
- 从indice中获取对应的indexservice
- 总结
创建索引更新集群index metadata
创建索引时,除了创建索引本身,还需要更新集群的index metadata,这一过程在metadatacreateindexservice的createindex方法中完成。这里会提交一个高优先级、ackedclusterstateupdatetask类型的task。索引创建需要即时得到反馈,因此这个task需要返回确认结果,并且带有超时,同时这个任务的优先级也非常高。
下面具体看一下它的execute方法。这个方法会在master执行任务时调用,方法非常长,主要完成以下三个功能:合并request和template中的mapping与setting;调用indicesservice创建索引;为创建后的索引添加mapping。这一系列操作完成后,会生成新的metadata并更新集群状态,至此完成索引的创建。具体的调用方法参考上一篇。
首先创建index的create方法
代码如下所示:
@override public clusterstate execute(clusterstate currentstate) throws exception { boolean indexcreated = false; string removalreason = null; try { //检查request的合法性,1.5版本主要检查index名字是否合法,如不能含有某些字符,另外就是集群节点版本 validate(request, currentstate); for (alias alias : request.aliases()) {//检查别称是否合法 aliasvalidator.validatealias(alias, request.index(), currentstate.metadata()); } // 查找索引模板 list<indextemplatemetadata> templates = findtemplates(request, currentstate, indextemplatefilter); map<string, custom> customs = maps.newhashmap(); // add the request mapping map<string, map<string, object>http://www.cppcns.com> mappings = maps.newhashmap(); map<string, aliasmetadata> templatesaliases = maps.newhashmap(); list<string> templatenames = lists.newarraylist(); //取出request中的mapping配置,虽然mapping可以后面添加,多数情况创建索引的时候还是会附带着mapping,在request中mapping是一个map for (map.entry<string, string> entry : request.mappings().entryset()) { mappings.put(entry.getkey(), parsemapping(entry.getvalue())); } //一些预设如warm等 for (map.entry<string, custom> entry : request.customs().entryset()) { customs.put(entry.getkey(), entry.getvalue()); } // 将找到的template和request中的mapping合并 for (indextemplatemetadata template : templates) { templatenames.add(template.getname()); for (objectobjectcursor<string, compressedstring> cursor : template.mappings()) { if (mappings.containskey(cursor.key)) { xcontenthelper.mergedefaults(mappings.get(cursor.key), parsemapping(cursor.value.string())); } else { mappings.put(cursor.key, parsemapping(cursor.value.string())); } } // 合并custom for (objectobjectcursor<string, custom> cursor : template.customs()) { string type = cursor.key; indexmetadata.custom custom = cursor.value; indexmetadata.custom existing = customs.get(type); if (existing == null) { customs.put(type, custom); } else { indexmetadata.custom merged = indexmetadata.lookupfactorysafe(type).merge(existing, custom); customs.put(type, merged); } } //处理合并别名 for (objectobjectcursor<string, aliasmetadata> cursor : 
template.aliases()) { aliasmetadata aliasmetadata = cursor.value; //if an alias with same name came with the create index request itself, // ignore this one taken from the index template if (request.aliases().contains(new alias(aliasmetadata.alias()))) { continue; } //if an alias with same name was already processed, ignore this one if (templatesaliases.containskey(cursor.key)) { continue; } //allow templatesaliases to be templated by replacing a token with the name of the index that we are applying it to if (aliasmetadata.alias().contains("{index}")) { string templatedalias = aliasmetadata.alias().replace("{index}", request.index()); aliasmetadata = aliasmetadata.newaliasmetadata(aliasmetadata, templatedalias); } aliasvalidator.validatealiasmetadata(aliasmetadata, request.index(), currentstate.metadata()); templatesaliases.put(aliasmetadata.alias(), aliasmetadata); } } // 合并完template和request,现在开始处理配置基本的mapping,合并逻辑跟之前相同,只是mapping来源不同 file mappingsdir = new file(environment.configfile(), "mappings"); if (mappingsdir.isdirectory()) { // first index level file indexmappingsdir = new file(mappingsdir, request.index()); if (indexmappingsdir.isdirectory()) { addmappings(mappings, indexmappingsdir); } // second is the _default mapping file defaultmappingsdir = new file(mappingsdir, "_default"); if (defaultmappingsdir.isdirectory()) { addmappings(mappings, defaultmappingsdir); } } //处理index的配置(setting) immutablesettings.builder indexsettingsbuilder = settingsbuilder(); //加入模板中的setting for (int i = templates.size() - 1; i >= 0; i--) { indexsettingsbuilder.put(templates.get(i).settings()); } // 加入request中的mapping,request中设置会覆盖模板中的设置 indexsettingsbuilder.put(request.settings()); //处理shard,shard数量不能小于1,因此这里需要特殊处理,如果没有则要使用默认值 if (request.index().equals(scriptservice.script_index)) { indexsettingsbuilder.put(setting_number_of_shards, settings.getasint(setting_number_of_shards, 1)); } else { if (indexsettingsbuilder.get(setting_number_of_shards) == null) { http://www.cppcns.com 
if (request.index().equals(riverindexname)) { indexsettingsbuilder.put(setting_number_of_shards, settings.getasint(setting_number_of_shards, 1)); } else { indexsettingsbuilder.put(setting_number_of_shards, settings.getasint(setting_number_of_shards, 5)); } } } if (request.index().equals(scriptservice.script_index)) { indexsettingsbuilder.put(setting_number_of_replicas, settings.getasint(setting_number_of_replicas, 0)); indexsettingsbuilder.put(setting_auto_expand_replicas, "0-all"); } else { if (indexsettingsbuilder.get(setting_number_of_replicas) == null) { if (request.index().equals(riverindexname)) { indexsettingsbuilder.put(setting_number_of_replicas, settings.getasint(setting_number_of_replicas, 1)); } else { indexsettingsbuilder.put(setting_number_of_replicas, settings.getasint(setting_number_of_replicas, 1)); } } } //处理副本 if (settings.get(setting_auto_expand_replicas) != null && indexsettingsbuilder.get(setting_auto_expand_replicas) == null) { indexsettingsbuilder.put(setting_auto_expand_replicas, settings.get(setting_auto_expand_replicas)); } if (indexsettingsbuilder.get(setting_version_created) == null) { discoverynodes nodes = currentstate.nodes(); final version createdversion = version.smallest(version, nodes.smallestnonclientnodeversion()); indexsettingsbuilder.put(setting_version_created, createdversion); } if (indexsettingsbuilder.get(setting_creation_date) == null) { indexsettingsbuilder.put(setting_creation_date, syshttp://www.cppcns.comtem.currenttimemillis()); } indexsettingsbuilder.put(setting_uuid, strings.randombase64uuid()); //创建setting settings actualindexsettings = indexsettingsbuilder.build(); // 通过indiceservice创建索引 indicesservice.createindex(request.index(), actualindexsettings, clusterservice.localnode().id()); indexcreated = true; //如果创建成功这里就可以获取到对应的indexservice,否则会抛出异常 indexservice indexservice = indicesservice.indexservicesafe(request.index()); //获取mappingservice试图放置mapping mapperservice mapperservice = indexservice.mapperservice(); // 
为索引添加mapping,首先是默认mapping if (mappings.containskey(mapperservice.default_mapping)) { try { mapperservice.merge(mapperservice.default_mapping, new compressedstring(xcontentfactory.jsonbuilder().map(mappings.get(mapperservice.default_mapping)).string()), false); } catch (exception e) { removalreason = "failed on parsing default mapping on index creation"; throw new mapperparsingexception("mapping [" mapperservice.default_mapping "]", e); } } for (map.entry<string, map<string, object>> entry : mappings.entryset()) { if (entry.getkey().equals(mapperservice.default_mapping)) { continue; } try { // apply the default here, its the first time we parse it mapperservice.merge(entry.getkey(), new compressedstring(xcontentfactory.jsonbuilder().map(entry.getvalue()).string()), true); } catch (exception e) { removalreason = "failed on parsing mappings on index creation"; throw new mapperparsingexception("mapping [" entry.getkey() "]", e); } } //添加request中的别称 indexqueryparserservice indexqueryparserservice = indexservice.queryparserservice(); for (alias alias : request.aliases()) { if (strings.haslength(alias.filter())) { aliasvalidator.validatealiasfilter(alias.name(), alias.filter(), indexqueryparserservice); } } for (aliasmetadata aliasmetadata : templatesaliases.values()) { if (aliasmetadata.filter() != null) { aliasvalidator.validatealiasfilter(aliasmetadata.alias(), aliasmetadata.filter().uncompressed(), indexqueryparserservice); } } // 以下更新index的matedata, map<string, mappingmetadata> mappingsmetadata = maps.newhashmap(); for (documentmapper mapper : mapperservice.docmappers(true)) { mappingmetadata mappingmd = new mappingmetadata(mapper); mappingsmetadata.put(mapper.type(), mappingmd); } final indexmetadata.builder indexmetadatabuilder = indexmetadata.builder(request.index()).settings(actualindexsettings); for (mappingmetadata mappingmd : mappingsmetadata.values()) { indexmetadatabuilder.putmapping(mappingmd); } for (aliasmetadata aliasmetadata : templatesaliases.values()) 
{ indexmetadatabuilder.putalias(aliasmetadata); } for (alias alias : request.aliases()) { aliasmetadata aliasmetadata = aliasmetadata.builder(alias.name()).filter(alias.filter()) .indexrouting(alias.indexrouting()).searchrouting(alias.searchrouting()).build(); indexmetadatabuilder.putalias(aliasmetadata); } for (map.entry<string, custom> customentry : customs.entryset()) { indexmetadatabuilder.putcustom(customentry.getkey(), customentry.getvalue()); } indexmetadatabuilder.state(request.state()); //matedata更新完毕,build新的matedata final indexmetadata indexmetadata; try { indexmetadata = indexmetadatabuilder.build(); } catch (exception e) { removalreason = "failed to build index metadata"; throw e; } indexservice.indiceslifecycle().beforeindexaddedtocluster(new index(request.index()), indexmetadata.settings()); //更新集群的matedata,将新build的indexmatadata加入到metadata中 metadata newmetadata = metadata.builder(currentstate.metadata()) .put(indexmetadata, false) .build(); logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templatenames, indexmetadata.numberofshards(), indexmetadata.numberofreplicas(), mappings.keyset()); //阻塞集群,更新matadata clusterblocks.builder blocks = clusterblocks.builder().blocks(currentstate.blocks()); if (!request.blocks().isempty()) { for (clusterblock block : request.blocks()) { blocks.addindexblock(request.index(), block); } } if (request.state() == state.close) { blocks.addindexblock(request.index(), metadataindexstateservice.index_closed_block); } clusterstate updatedstate = clusterstate.builder(currentstate).blocks(blocks).metadata(newmetadata).build(); if (request.state() == state.open) { routingtable.builder routingtablebuilder = routingtable.builder(updatedstate.routingtable()) .addasnew(updatedstate.metadata().index(request.index())); routingallocation.result routingresult = allocationservice.reroute(clusterstate.builder(updatedstate).routingtable(routingtablebuilder).build()); 
updatedstate = clusterstate.builder(updatedstate).routingresult(routingresult).build(); } removalreason = "cleaning up after validating index on master"; return updatedstate; } finally { if (indexcreated) { // index was already partially created - need to clean up indicesservice.removeindex(request.index(), removalreason != null ? removalreason : "failed to create index"); } } } }); }
以上就是创建index的create方法,方法中主要进行了两个动作:合并更新index的metadata和创建index。更新合并metadata的过程都在上面的代码中体现了。
从indice中获取对应的indexservice
创建索引是调用indicesservice构建一个guice的injector,这个injector包含了index的所有功能(如分词、相似度等)。同时会将其存储到indicesservice中,以map的形式保存:key为索引名,value为由indexservice和对应injector组成的tuple。
这一部分代码如下所示:
public synchronized indexservice createindex(string sindexname, @indexsettings settings settings, string localnodeid) throws elasticsearchexception { if (!lifecycle.started()) { throw new elasticsearchillegalstateexception("can't create an index [" sindexname "], node is closed"); } index index = new index(sindexname); //检测index是否已经存在 if (indices.containskey(index.name())) { throw new indexalreadyexistsexception(index); } indiceslifecycle.beforeindexcreated(index, settings); logger.debug("creating index [{}], shards [{}]/[{}]", sindexname, settings.get(setting_number_of_shards), settings.get(setting_number_of_replicas)); settings indexsettings = settingsbuilder() .put(this.settings) .put(settings) .classloader(settings.getclassloader()) .build(); //构建index对应的injector modulesbuilder modules = new modhttp://www.cppcns.comulesbuilder(); modules.add(new indexnamemodule(index)); modules.add(new localnodeidmodule(localnodeid)); modules.add(new indexsettingsmodule(index, indexsettings)); modules.add(new indexpluginsmodule(indexsettings, pluginsservice)); modules.add(new indexstoremodule(indexsettings)); modules.add(new indexenginemodule(indexsettings)); modules.add(new analysismodule(indexsettings, indicesanalysisservice)); modules.add(new similaritymodule(indexsettings)); modules.add(new indexcachemodule(indexsettings)); modules.add(new indexfielddatamodule(indexsettings)); modules.add(new codecmodule(indexsettings)); modules.add(new mapperservicemodule()); modules.add(new indexqueryparsermodule(indexsettings)); modules.add(new indexaliasesservicemodule()); modules.add(new indexgatewaymodule(indexsettings, injector.getinstance(gateway.class))); modules.add(new indexmodule(indexsettings)); injector indexinjector; try { indexinjector = modules.createchildinjector(injector); } catch (creationexception e) { throw new indexcreationexception(index, injectors.getfirsterrorfailure(e)); } catch (throwable e) { throw new indexcreationexception(index, e); } indexservice 
indexservice = indexinjector.getinstance(indexservice.class); indiceslifecycle.afterindexcreated(indexservice); //将indexservice和indexinjector加入到indice map中 indices = newmapbuilder(indices).put(index.name(), new tuple<>(indexservice, indexinjector)).immutablemap(); return indexservice; }
以上方法就是具体创建索引的过程,它是在master上操作的,同时它是同步方法。这样才能保证集群的index创建一致性,因此这也会导致之前所说的大量创建索引时的速度瓶颈。但是创建大量索引的动作是不常见的,需要尽量避免。创建一个索引对于一个集群来说就是开启对于该索引的各种操作,因此这里通过guice将索引的各个功能模块注入,并获得index操作的接口类indexservice。如果这个方法执行成功,则可以合并template及request中的mapping,并且向刚创建的索引添加合并后的mapping,最后构建新的metadata,并将集群新的metadata发送给各个节点完成索引创建。
总结
索引创建的过程包括三步:更新集群metadata,调用indicesservice创建索引,向新创建的索引中放置mapping(合并到index对应的mapperservice中)。这三步都在以上的两个方法中。完成这三步,集群中就保存了新索引的信息,同时索引配置和mapping放置也完成,索引就可以正常使用。
以上就是elasticsearch索引创建create index集群matedata更新的详细内容,更多关于elasticsearch索引create index matedata更新的资料请关注mile米乐体育其它相关文章!