`

ElasticSearch2.4.0基于Java API或Java Rest API进行CRUD

阅读更多
 编写不易,转载请注明(http://shihlei.iteye.com/blog/2328656)!
 

规划:

Indices:索引库,相当于RDBMS的数据库,整体控制分片(shard)和副本(Replica),一旦创建,分片数不能改变。
Document Type:索引类型
Document:索引记录,由唯一ID区分,ID决定Shard位置
Field:一条索引记录中的组成字段,有类型概念,通过Mapping控制类型、是否分词、是否索引等
这里规划如下
1)Indices:indexdb
2)Type:docs
3)Document:
     其中tags是文档标签,需要进行分词索引,且检索时占权重的50%


 

一 创建Indices

 
# Create the index "indexdb" on the local node (HTTP port 9200),
# with number_of_shards = 2 and number_of_replicas = 1.
curl -XPUT 'http://localhost:9200/indexdb' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 2,
            "number_of_replicas" : 1
        }
    }
}'
 

二 创建Mapping

 
# Put the mapping for type "docs" into index "indexdb":
#   id          - long, "index": "no"
#   title       - string, not_analyzed, index_options docs
#   author      - string, not_analyzed, index_options docs
#   tags        - string with boost 3.0, index_options docs (no "index" setting given)
#   publishTime - date, format "yyyy-MM-dd HH:mm:ss"
curl -XPUT 'http://localhost:9200/indexdb/_mapping/docs' -d '{
   "properties": {
       "id": {
            "type": "long",
            "index": "no"
       },
       "title": {
            "type": "string",
            "index": "not_analyzed",
            "index_options": "docs"
       },
       "author": {
            "type": "string",
            "index": "not_analyzed",
            "index_options": "docs"
       },
       "tags": {
            "type": "string",
            "boost" : 3.0,
            "index_options": "docs"
       },
       "publishTime": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
       }
   }
}'
 

三 基本类

Document:
package x.search.es.simple;

import java.util.Arrays;
import java.util.Date;

/**
 * Created by shilei on 2016/10/4.
 *
 * <p>Mutable data holder for one indexed record: a numeric id, a title, an
 * author, a list of tags and a publish time. All properties are exposed
 * through plain getters and setters; the {@code tags} array and the
 * {@code publishTime} date are stored and returned by reference, without
 * defensive copies.
 */
public class Document {
    private long id;
    private String title;
    private String author;
    private String[] tags;
    private Date publishTime;

    /** @return the record id (0 until set) */
    public long getId() {
        return this.id;
    }

    /** @param id the record id */
    public void setId(long id) {
        this.id = id;
    }

    /** @return the title, or {@code null} if never set */
    public String getTitle() {
        return this.title;
    }

    /** @param title the title */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return the author, or {@code null} if never set */
    public String getAuthor() {
        return this.author;
    }

    /** @param author the author */
    public void setAuthor(String author) {
        this.author = author;
    }

    /** @return the tag array itself (not a copy), or {@code null} if never set */
    public String[] getTags() {
        return this.tags;
    }

    /** @param tags the tag array; stored as-is */
    public void setTags(String[] tags) {
        this.tags = tags;
    }

    /** @return the publish time, or {@code null} if never set */
    public Date getPublishTime() {
        return this.publishTime;
    }

    /** @param publishTime the publish time; stored as-is */
    public void setPublishTime(Date publishTime) {
        this.publishTime = publishTime;
    }

    /**
     * Renders every field in the form
     * {@code Document{id=.., title='..', author='..', tags=[..], publishTime=..}}.
     * Unset reference fields are rendered as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Document{");
        text.append("id=").append(id);
        text.append(", title='").append(title).append('\'');
        text.append(", author='").append(author).append('\'');
        text.append(", tags=").append(Arrays.toString(tags));
        text.append(", publishTime=").append(publishTime);
        text.append('}');
        return text.toString();
    }
}
 
IDocumentDao:
package x.search.es.simple;

/**
 * Created by shilei on 2016/10/5.
 *
 * <p>CRUD contract for {@link Document} records stored under a fixed index
 * and type. The exact meaning of each boolean result is defined by the
 * implementing class.
 */
public interface IDocumentDao {
    /** Index name used by all operations; index names must be lower case. */
    String INDICES = "indexdb";

    /** Type name used by all operations; type names must be lower case. */
    String TYPE = "docs";

    /**
     * Stores {@code doc} under its id.
     *
     * @param doc the record to store
     * @return implementation-defined success flag
     */
    boolean insert(Document doc);

    /**
     * Replaces the stored record that has the same id as {@code doc}.
     *
     * @param doc the new record content
     * @return implementation-defined success flag
     */
    boolean replace(Document doc);

    /**
     * Updates the stored record that has the same id as {@code doc}.
     *
     * @param doc the record carrying the new field values
     * @return implementation-defined success flag
     */
    boolean update(Document doc);

    /**
     * Deletes the record with the given id.
     *
     * @param id the record id
     * @return implementation-defined success flag
     */
    boolean delete(long id);

    /**
     * Looks up a single record by id.
     *
     * @param id the record id
     * @return the record; implementations in this article return {@code null} when nothing is found
     */
    Document searchById(long id);
}
 

四 Java Client API CRUD

依赖如下:
        <!--TCP client-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
        </dependency>
 
TCP 实现类:
package x.search.es.simple.tcp;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import x.search.es.simple.Document;
import x.search.es.simple.IDocumentDao;
import x.search.util.GsonHolder;

/**
 * Created by shilei on 2016/10/5.
 *
 * <p>{@link IDocumentDao} implementation that uses the Elasticsearch
 * transport (TCP) client. Documents are converted to and from JSON with
 * {@code GsonHolder.gson} and addressed by {@code String.valueOf(id)}.
 */
public class DocumentTcpDaoImpl implements IDocumentDao {

    /** TCP transport client; supplied by the caller and never closed by this class. */
    private final TransportClient client;

    /**
     * @param client the transport client used for every request
     */
    public DocumentTcpDaoImpl(TransportClient client) {
        this.client = client;
    }

    /**
     * Indexes {@code doc} under its id.
     *
     * @param doc the record to store
     * @return {@code true} if the index response reports the document as newly created
     */
    @Override
    public boolean insert(Document doc) {
        return index(doc).isCreated();
    }

    /**
     * Overwrites the stored document with the full content of {@code doc}.
     *
     * <p>This re-indexes the whole document instead of delegating to
     * {@link #update(Document)}: an update request only merges the supplied
     * fields into the existing source, which is not a replacement, and it
     * does not match the REST implementation, which issues a full PUT.
     *
     * @param doc the new record content
     * @return {@code true} if an existing document was overwritten,
     *         {@code false} if the request created a new document
     */
    @Override
    public boolean replace(Document doc) {
        return !index(doc).isCreated();
    }

    /**
     * Sends an update request whose partial document is the JSON form of {@code doc}.
     *
     * @param doc the record carrying the new field values
     * @return {@code true} unless the update response reports that a document was created
     */
    @Override
    public boolean update(Document doc) {
        String json = GsonHolder.gson.toJson(doc);

        UpdateResponse response = client.prepareUpdate(INDICES, TYPE, String.valueOf(doc.getId()))
                .setDoc(json)
                .get();

        return !response.isCreated();
    }

    /**
     * Deletes the document with the given id.
     *
     * @param id the record id
     * @return {@code true} if the delete response reports the document as found
     */
    @Override
    public boolean delete(long id) {
        DeleteResponse response = client.prepareDelete(INDICES, TYPE, String.valueOf(id)).get();
        return response.isFound();
    }

    /**
     * Fetches the document with the given id.
     *
     * @param id the record id
     * @return the deserialized document, or {@code null} if the get response says it does not exist
     */
    @Override
    public Document searchById(long id) {
        GetResponse response = client.prepareGet(INDICES, TYPE, String.valueOf(id)).get();
        if (!response.isExists()) {
            return null;
        }
        return GsonHolder.gson.fromJson(response.getSourceAsString(), Document.class);
    }

    /** Sends an index request carrying the complete JSON form of {@code doc}. */
    private IndexResponse index(Document doc) {
        String json = GsonHolder.gson.toJson(doc);
        return client.prepareIndex(INDICES, TYPE, String.valueOf(doc.getId())).setSource(json).get();
    }
}
 
连接类:
package x.search.es.simple.tcp;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Created by shilei on 16/9/23.
 *
 * <p>Holder for a lazily created, process-wide {@link TransportClient}
 * connected to {@code localhost:9300}.
 */
public class EsTcpClient {

    private static final String ES_HOST = "localhost";
    private static final int ES_TCP_PORT = 9300;

    /** Cached client; {@code null} until first use and again after it has been closed. */
    private static TransportClient client;

    /**
     * Returns the shared TCP client, building it on first use (and again
     * after the shared client has been closed through {@link #close}).
     *
     * @return the shared transport client
     */
    public static synchronized TransportClient getClient() {
        if (client == null) {
            build();
        }
        return client;
    }

    /**
     * Closes the given client; a {@code null} argument is ignored.
     *
     * <p>When the argument is the cached shared client, the cache is cleared
     * as well. Without that, {@link #getClient()} would keep handing out the
     * already-closed instance.
     *
     * @param client the client to close, may be {@code null}
     */
    public static synchronized void close(TransportClient client) {
        if (client == null) {
            return;
        }
        if (client == EsTcpClient.client) {
            EsTcpClient.client = null;
        }
        client.close();
    }

    /**
     * Creates the transport client and stores it in the static field.
     *
     * @throws RuntimeException wrapping {@link UnknownHostException} if the host name cannot be resolved
     */
    private static void build() {
        try {
            // Important: if the cluster has a custom name, the same cluster.name must be
            // passed here. Otherwise the client assumes the default cluster "elasticsearch",
            // ignores the node as "not part of the cluster", and no node can be found.
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "singlenode_cluster").build();
            // More nodes can be registered with further addTransportAddress(...) calls.
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

}
 
特别注意:
异常:
NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{127.0.0.1:9300}]
]
	at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290)
	at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207)
	at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
	at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:288)
	at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:359)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:86)
	at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:56)
	at org.elasticsearch.action.ActionRequestBuilder.get(ActionRequestBuilder.java:64)
	at x.search.es.simple.tcp.DocumentTcpDaoImpl.update(DocumentTcpDaoImpl.java:42)
	at x.search.es.simple.tcp.DocumentTcpDaoImpl.replace(DocumentTcpDaoImpl.java:33)
	at x.search.es.simple.DocumentDaoTest.testReplace(DocumentDaoTest.java:68)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
 
解决方法:
1)Java Client API 连接的是TCP 端口,请保证服务器可访问,TCP 9300端口可访问(非9200):
包括:
(1)绑定任意IP:修改配置文件 :elasticsearch.yml  添加 network.host: 0.0.0.0
(2)指定端口,修改配置文件 :elasticsearch.yml  添加  transport.tcp.port: 9300
确保:telnet 127.0.0.1 9300 可行,远程访问,防火墙要做放行。
2)如果配置时指定了cluster.name,需要在创建TransportClient时也传递cluster.name(这个坑了我好久,没充分看文档)
cluster.name: singlenode_cluster
  //特别注意:如果cluster 起了名字,需要在连接时指定名字,否则验证客户端连接的不是默认集群elasticsearch,会忽略,则无法找到节点
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "singlenode_cluster").build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT));
 要不是单独创建,看到的如下日志,我也不会解决该问题:
13:41:47.471 [main] INFO  org.elasticsearch.plugins - [Hybrid] modules [], plugins [], sites []
13:41:48.058 [main] WARN  org.elasticsearch.client.transport - [Hybrid] node {#transport#-1}{127.0.0.1}{127.0.0.1:9300} not part of the cluster Cluster [elasticsearch], ignoring...
 
文档也说明了:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
Note that you have to set the cluster name if you use one different than "elasticsearch":
 

五 Java RestAPI CRUD

基于HttpClient实现的依赖如下:
        <!-- rest -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>5.0.0-beta1</version>
        </dependency>
 
RestAPI 实现:
package x.search.es.simple.rest;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.util.Strings;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import x.search.es.simple.Document;
import x.search.es.simple.IDocumentDao;
import x.search.util.GsonHolder;

import java.io.IOException;
import java.util.Collections;

/**
 * Created by shilei on 2016/9/26.
 */
public class DocumentRestDaoImpl implements IDocumentDao {

    private RestClient client;

    public DocumentRestDaoImpl(RestClient client) {
        this.client = client;
    }


    @Override
    public boolean insert(Document doc) {
        String json = GsonHolder.gson.toJson(doc);
        HttpEntity entity = new NStringEntity(
                json, ContentType.APPLICATION_JSON);

        try {
            Response response = client.performRequest(
                    "PUT",
                    buildEndPoint(doc.getId()),
                    Collections.<String, String>emptyMap(),
                    entity);
            String responseBody = consumeResponse(response);
            if (Strings.isBlank(responseBody)) {
                return false;
            }

            return GsonHolder.getTopLevelElement(responseBody, "created").getAsBoolean();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }



    //读取json 内容
    private String consumeResponse(Response response) throws IOException {
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK 
 && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED)) {
            return null;
        }
        HttpEntity httpClientEntity = response.getEntity();
        String responseContent = EntityUtils.toString(httpClientEntity);
        EntityUtils.consume(httpClientEntity);

        return responseContent;
    }

    @Override
    public boolean replace(Document doc) {
        return !insert(doc);
    }

    private static class UpdateMessage<T>{
        private T doc;

        public UpdateMessage(T doc){
            this.doc = doc;
        }

        public T getDoc() {
            return doc;
        }

        public void setDoc(T doc) {
            this.doc = doc;
        }
    }
    /**
     * 通过api的方式更新指定数据
     *
     * @param doc
     * @return
     */
    @Override
    public boolean update(Document doc) {
        String json = GsonHolder.gson.toJson(new UpdateMessage<Document>(doc));
        HttpEntity entity = new NStringEntity(
                json, ContentType.APPLICATION_JSON);

        try {
            Response response = client.performRequest(
                    "POST",
                    buildEndPoint(doc.getId())+"/_update",
                    Collections.<String, String>emptyMap(),
                    entity);
            String responseBody = consumeResponse(response);
            if (Strings.isBlank(responseBody)) {
                return false;
            }

            return GsonHolder.getTopLevelElement(responseBody, "error")==null;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean delete(long id) {
        try {
            Response response = client.performRequest(
                    "DELETE",
                    buildEndPoint(id),
                    Collections.<String, String>emptyMap()
            );
            String responseBody = consumeResponse(response);
            if (Strings.isBlank(responseBody)) {
                return false;
            }

            return GsonHolder.getTopLevelElement(responseBody, "found").getAsBoolean();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Document searchById(long id) {
        try {
            Response response = client.performRequest(
                    "GET",
                    buildEndPoint(id),
                    Collections.<String, String>emptyMap()
            );
            String responseBody = consumeResponse(response);

            return toCampaign(responseBody);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    //解析 mapping
    private Document toCampaign(String json){
        JsonObject topObject = GsonHolder.jsonParser.parse(json).getAsJsonObject();
        JsonElement foundObject = topObject.get("found");
        if(foundObject==null && !foundObject.getAsBoolean()){
            return null;
        }

        JsonElement sourceElement = topObject.get("_source");
        if(sourceElement==null){
            return null;
        }
        return GsonHolder.gson.fromJson(sourceElement,Document.class);
    }


    //创建请求地址
    private static String endPointTemplate = "/" + INDICES + "/" + TYPE + "/";

    private String buildEndPoint(long id) {
        return endPointTemplate + id;
    }
}
 
连接类:
package x.search.es.simple.rest;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

/**
 * Created by shilei on 2016/9/26.
 *
 * <p>Holder for a lazily created, process-wide {@link RestClient} pointing
 * at {@code 127.0.0.1:9200}.
 */
public class EsRestClient {
    private static final String ES_HOST = "127.0.0.1";
    private static final int ES_HTTP_PORT = 9200;

    /** Cached client; {@code null} until first use and again after it has been closed. */
    private static RestClient client;

    /**
     * Returns the shared REST (HTTP) client, building it on first use (and
     * again after the shared client has been closed through {@link #close}).
     *
     * @return the shared REST client
     */
    public static synchronized RestClient getClient() {
        if (client == null) {
            build();
        }
        return client;
    }

    /**
     * Closes the given client; a {@code null} argument is ignored.
     *
     * <p>When the argument is the cached shared client, the cache is cleared
     * as well. Without that, {@link #getClient()} would keep handing out the
     * already-closed instance.
     *
     * @param client the client to close, may be {@code null}
     * @throws IOException if closing the client fails
     */
    public static synchronized void close(RestClient client) throws IOException {
        if (client == null) {
            return;
        }
        // Clear the cache first so that a failing close() cannot leave a half-closed client cached.
        if (client == EsRestClient.client) {
            EsRestClient.client = null;
        }
        client.close();
    }

    /**
     * Creates the REST client for the single configured host and stores it in the static field.
     */
    private static void build() {
        // Further HttpHost instances can be passed to RestClient.builder(...) to register more nodes.
        client = RestClient.builder(
                new HttpHost(ES_HOST, ES_HTTP_PORT)).build();
    }
}
 

六 其他工具:

Gson:
package x.search.util;

import com.google.gson.*;

/**
 * Created by shilei on 2016/9/27.
 *
 * <p>Shared Gson instances used for JSON conversion: a {@link Gson}
 * configured with the date format {@code yyyy-MM-dd HH:mm:ss} and a
 * {@link JsonParser}.
 */
public class GsonHolder {
    /** Shared serializer/deserializer; dates use the pattern {@code yyyy-MM-dd HH:mm:ss}. */
    public static Gson gson = new GsonBuilder()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();

    /** Shared parser for turning JSON text into a {@link JsonElement} tree. */
    public static JsonParser jsonParser = new JsonParser();

    /**
     * Parses {@code json} as a JSON object and returns one of its top-level members.
     *
     * @param json      JSON text whose root is expected to be an object
     * @param fieldName name of the top-level member to read
     * @return the member's value, or {@code null} if the object has no such member
     */
    public static JsonElement getTopLevelElement(String json, String fieldName) {
        JsonElement root = GsonHolder.jsonParser.parse(json);
        JsonObject topLevel = root.getAsJsonObject();
        return topLevel.get(fieldName);
    }
}
 
附录:
(一)数据类型
主要数据类型:
 
分组
类型名
类型
说明
核心类型
字符串类型
string
可以整个文本作为索引词,也可以经过分析器分析,分词后每个词以词建立索引
数字类型
long, integer, short, byte, double, float
 
 
日期类型
date
json没有日期类型,传递的都是字符串,需要通过参数format指定日期格式
 
布尔类型
boolean
布尔,false可用值 false,0,0.0,no,off,””
 
二进制类型
binary
默认不存储,不可对该字段条件检索
 
复杂类型
数组类型
无专门的类型
一组相同类型的值集合,可以检索某个词是否在数组中
对象类型
json
内嵌对象
 
嵌套类型
json数组
内嵌对象数组
 
 
其他数据类型:
     Geo,IPV4,Token等,专门场景使用
 
(二)Mapping 参数
 
整体:
参数名称
描述
默认
可选值
dynamic
是否动态创建字段mapping
true
true:检测到新字段,加入到mapping;false:检测到新字段,忽略;strict:检测到新字段抛异常
 
重要的如下:

 

参数名称
描述
默认
可选值
针对类型
index
控制属性值如何被索引和搜索
string类型analyzed
数字类型not_analyzed
analyzed:分析且索引;not_analyzed:不分析但索引;no:不索引,不能搜索该字段,但结果可以有
string
enabled
只存储不建立索引,用于精确查找
true
true/false
object
index_options
控制什么样的信息会加入倒排索引中,用于搜索和高亮
string类型且analyzed默认 positions,其他类型默认 docs
docs:索引文档编号;freqs:索引文档编号,词频;positions:索引文档编号,词频,词的位置;offsets:索引文档编号,词频,词的位置,开始结束偏移量,用于高亮
all
analyzer
指定该字段索引和搜索时使用的分析器,用于对文本进行分词
标准分析器
 
string 类型且index 参数值为analyzed

store
默认索引内容存储在_source中,这里用于指定是否独立存储某个字段,当只查询某些字段且_source特别大时有好处
false
true:存储;false:不存储
all
boost
字段权值,查询打分使用,接受float类型值
1.0
float数值
 
format
指定日期类型格式
 
 
date
ignore_above
指定String类型字段上限,超过则该字段不会被分析器分析和索引,即忽略
 
字符长度
string
ignore_malformed
插入时是否忽略错误的数据类型;为false时,遇到错误的数据类型会抛异常
false
true;忽略错误数据类型
 
include_in_all
ES为每个文档定义一个_all的域,调用该域可以检索所有字段,想在_all中排除某个字段,可用该参数
true
true/false
all
 
 
 
其他还有很多,不细列了
coerce,copy_to,doc_values,fielddata,geohash,geohash_precision,geohash_prefix,ignore_malformed,include_in_all,lat_lon
fields,norms,null_value,position_increment_gap,properties,search_analyzer,similarity,store,term_vector
 
  • 大小: 131.9 KB
分享到:
评论
3 楼 ShihLei 2017-06-18  
东辉VS风行 写道
发现一个bug:当使用rest接口的时候,第一次调用put返回的状态码是201。
//httpClient读取json 内容 
    private String consumeResponse(Response response) throws IOException { 
    logger.info("status code:"+response.getStatusLine().getStatusCode() );
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK  && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { 
            return null; 
        }
        HttpEntity httpClientEntity = response.getEntity(); 
        String responseContent = EntityUtils.toString(httpClientEntity); 
        EntityUtils.consume(httpClientEntity); 
        return responseContent; 
    }
   


  document 在es中不存在的时候,http返回 201 ,引用下你这段,更新了
2 楼 东辉VS风行 2017-05-08  
1 楼 东辉VS风行 2017-05-08  
发现一个bug:当使用rest接口的时候,第一次调用put返回的状态码是201。
//httpClient读取json 内容 
    private String consumeResponse(Response response) throws IOException { 
    logger.info("status code:"+response.getStatusLine().getStatusCode() );
        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK  && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { 
            return null; 
        }
        HttpEntity httpClientEntity = response.getEntity(); 
        String responseContent = EntityUtils.toString(httpClientEntity); 
        EntityUtils.consume(httpClientEntity); 
        return responseContent; 
    }
   

相关推荐

Global site tag (gtag.js) - Google Analytics