侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 104 篇文章
  • 累计创建 48 个标签
  • 累计收到 1 条评论

分布式搜索引擎 Elasticsearch

qingtian
2022-09-17 / 0 评论 / 0 点赞 / 482 阅读 / 23,710 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-09-27,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

分布式搜索引擎 Elasticsearch

ES 核心术语

  • 索引 index ---------------------------------------表
  • 类型 type -----------------------------------------表逻辑类型
  • 文档 document -----------------------------------行
  • 字段 fields ----------------------------------------列
  • 映射 mapping ------------------------------------表结构定义
  • 近实时 NRT ---------------------------------------Near real time
  • 节点 node ----------------------------------------每一个服务器
  • shard replica ------------------------------------数据分片和备份

ES集群架构原理

shard replica

倒排索引

Elasticsearch 的核心配置文件

image-20220907080444347

cd config
// 核心配置文件
ll
elasticsearch.yml
vim elasticsearch.yml
cluster.name------集群名称
node.name-------节点名称
path.data-------数据存储位置
path.logs-------日志存储位置
network.host------本机地址 0.0.0.0
http.port------对外的http端口号
cluster.initial_master_nodes:[""]---------------设置初始的master节点 填node.name

vim jvm.options
-Xms
-Xmx
// 实体机搭建es 不允许root用户启动
useradd esuser
chown -R esuser

一些命令

1. 创建索引
PUT index_temp
{
  "settings": {
    "index": {
      "number_of_shards": "3",
      "number_of_replicas": "0"
    }
  }
}

2. 查看es健康
GET _cluster/health

3. 获取索引信息
GET index_temp

4. 创建索引的同时创建结构
PUT index_mapping
{
  "mappings": {
    "properties": {
      "realname" : {
        "type": "text",
        "index": true
      },
      "username" : {
        "type": "keyword",
        "index": false
      }
    }
  }
}
5. 分词分析
GET index_mapping/_analyze
{
  "field": "realname",
  "text": "guank is very good"
}
--------------------------
分词响应格式
{
  "tokens" : [
    {
      "token" : "guank",
      "start_offset" : 0,
      "end_offset" : 5,
      "type" : "<ALPHANUM>",
      "position" : 0
    },
    {
      "token" : "is",
      "start_offset" : 6,
      "end_offset" : 8,
      "type" : "<ALPHANUM>",
      "position" : 1
    },
    {
      "token" : "very",
      "start_offset" : 9,
      "end_offset" : 13,
      "type" : "<ALPHANUM>",
      "position" : 2
    },
    {
      "token" : "good",
      "start_offset" : 14,
      "end_offset" : 18,
      "type" : "<ALPHANUM>",
      "position" : 3
    }
  ]
}
一旦索引中的属性的数据结构被定义好就不允许修改
6. 为已存在的索引添加 mapping 映射
POST index_mapping/_mapping
{
  
  "properties": {
    "id" : {
      "type": "long"
    },
    "age" : {
      "type": "integer"
    }
  }
  
}

文档的基本操作

1. 在索引下创建一个文档
POST my_doc/_doc/1
{
  "id": 1001,
  "name" : "guank",
  "desc": "guank is very good",
  "ceate_date":"2022-09-08"
}

2. 删除文档
DELETE my_doc/_doc/1

3. 修改文档
POST my_doc/_update/2
{
  "doc": {
    "name" : "guank02"
  }
}

4. 根据 _id 查询
GET my_doc/_doc/2
------------------------
返回参数
{
  "_index" : "my_doc",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 3,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1002,
    "name" : "guank02",
    "desc" : "guank is very good",
    "ceate_date" : "2022-09-08"
  }
}

5. 查询索引下所有数据
GET my_doc/_search
------------------------
返回参数
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_doc",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "name" : "guank02",
          "desc" : "guank is very good",
          "ceate_date" : "2022-09-08"
        }
      }
    ]
  }
}

6. 定制返回结果集中包含的属性
GET my_doc/_doc/2?_source=id,name
--------------------------
返回参数
{
  "_index" : "my_doc",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 3,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "name" : "guank02",
    "id" : 1002
  }
}

文档乐观锁控制

// 在更新的时候使用 if_seq_no if_primary_term 通过乐观锁控制

POST my_doc/_doc/1002?if_seq_no=5&if_primary_term=1
{
  "doc" : {
    "name" : "guank04"
  }
}

分词器

POST _analyze
{
  "analyzer": "standard",
  "text": "i study in guank.com"
}

// es 内置的分词器
simple
standard
whitespace
keyword
stop

使用 ik 分词器

使用自定义的词典

cd elasticsearch/plugins/ik/config
// 文件名
IKAnalyzer.cfg.xml

DSL语法入门

1.根据条件查询

QueryString 方式

GET my_doc/_search?q=desc:guank&q=name:guank

JSON 方式

POST my_doc/_search
{
  "query":{
    "match":{
      "desc":"guank"
    }
  }
}

POST my_doc/_search
{
  "query": {
    "exists": {
      "field": "name"
    }
  }
}

查询所有

POST my_doc/_search
{
  "query": {
    "match_all": {}
  }
}

// 查询
POST my_doc/_search
{
  "query": {
    "match_all": {}
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
    , "from": 0
    , "size": 20
}

POST my_doc/_search
{
  "query": {
    //"term": {
    //	"desc":"guank"  term关键字作用搜索条件不分词  match关键字作用搜索条件是分词的 
    //}
    //"terms":{
    //	"desc":["guank","is"]  terms关键字作用可以多个关键词匹配
    //}
    //"match_phrase": {
    //  "desc": {
    //    "query": "guank is"  match_phrase 关键字作用 先将查询条件进行分词 查询的结果必须包含查询条件 并且有顺序之分 guank 后面必须是 is 否则不会被查询出来
    //    "slop":3 slop 关键字的作用 在使用 match_phrase 下 词之间能跳过的词的数量 如 guank 和 is 之间存在3个其他词 
    //  }
    //"match":{
    //	"desc":{
    //		"query":"guank is",
    //		//"operator":"or" // operator 关键字 or: 或 and:guank 和 is 两者必须都满足
    //      "minimum_should_match":"60%" minimum_should_match 关键字 满足匹配度 60% 可以为整数 1 2 代表匹配其中的一个词语即可
    //	}	
    //}
    //{
    //	"ids":{
    //		"type":"_doc",
    //		"values":["1001","1002","1003"] // 批量查询 Ids
    //	}
    //}
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
    , "from": 0
    , "size": 20
}

条件查询

POST my_doc/_search
{
  "query": {
    "multi_match": {
      "query": "guank",
      "fields": [
        "desc" ,"nickname^10"  // 多个字段匹配 desc 或 nickname 关键字 ^10 提升相应字段的权重
      ]
    }
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
}

bool查询

POST my_doc/_search
{
  "query": {
    "bool": {
      "must": [ // must 关键字 must 中的多个条件都必须同时满足 should 关键字 should 中的条件是或关系  must_not 关键字条件中都不满足
        {
          "multi_match": { 
            "query": "guank",
            "fields": [
              "desc" ,"nickname" 
            ]
          }
        }
      ]
    }
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
}

为查询的条件加权重

POST my_doc/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "desc": {
              "query": "guank",
              "boost": 1
            }
          }
          
        },
        {
          "match": {
            "name": {
              "query": "guank"
            }
          }
        }
        
      ]
    }
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
}

过滤器

POST my_doc/_search
{
  "query": {
    "match": {
      "desc": "guank"
    }
  }
  , "post_filter": {
    "range": {
      "id": {
        "gte": 10,
        "lte": 2000
      }
    }
  }
}

排序

POST my_doc/_search
{
  "query": {
    "match": {
      "desc": "guank"
    }
  }
  , 
  "sort": [
    {
      "id": {
        "order": "desc"
      }
    }
  ]
  
}

关键字高亮

POST my_doc/_search
{
  "query": {
    "match": {
      "desc": "guank"
    }
  }
  , 
  "sort": [
    {
      "id": {
        "order": "desc"
      }
    }
  ],
  "highlight": {
    "pre_tags": ["<span>"], //自定义前缀标签
    "post_tags": ["</span>"], //自定义后缀标签
    "fields": {
      "desc": {} // 高亮属性
    } 
  }
}

ES 深度分页

在 es 中请求深度分页存在问题

POST my_doc/_search
{
  "query": {
    "match_all": {}
  },
  "_source": [
      "id",
      "name",
      "desc"
    ]
    , "from": 9999
    , "size": 20
}

9999 向后搜索 20 会发生报错信息,这种情况会在数据太过多时出现分页数太多

返回的信息如下:

{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10019]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
      }
    ],
    "type": "search_phase_execution_exception",
    "reason": "all shards failed",
    "phase": "query",
    "grouped": true,
    "failed_shards": [
      {
        "shard": 0,
        "index": "my_doc",
        "node": "WzLANTsAS5Ke_04rsfTqMw",
        "reason": {
          "type": "illegal_argument_exception",
          "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10019]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
        }
      }
    ],
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10019]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.",
      "caused_by": {
        "type": "illegal_argument_exception",
        "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10019]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
      }
    }
  },
  "status": 400
}

image-20220912141951144

es 的分页查询机制:

在所有的 shard 中查询出 10009 条数据并且合并成 30027 条数据 然后进行排序 返回 10 条数据

image-20220912145437338

淘宝解决 es深度分页的措施 :每页查询结果60条 最大分页为 80页 用来限制记录数

scroll 滚动搜索

对于大数据量的查询 es 推荐使用 scroll api 进行滚动搜索

注意 : 使用 scroll 进行搜索的时候 都是基于本次的快照数据

POST my_doc/_search?scroll=1m  // scroll=1m 表示此次 scroll 搜索持续时间为 1 min
{
  "query": {
    "match_all": {}
  },
  "_source": [
      "id",
      "name",
      "desc"
    ],
    "sort": [
      "_doc"
    ],
    "size": 5
}
--------------------------------------------------
//返回数据
{
  "_scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAABW0_cWV3pMQU5Uc0FTNUtlXzA0cnNmVHFNdw==",
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [
      {
        "_index" : "my_doc",
        "_type" : "_doc",
        "_id" : "1002",
        "_score" : null,
        "_source" : { },
        "sort" : [
          0
        ]
      },
      {
        "_index" : "my_doc",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : null,
        "_source" : { },
        "sort" : [
          1
        ]
      },
      {
        "_index" : "my_doc",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : null,
        "_source" : {
          "name" : "guank",
          "id" : 1001,
          "desc" : "guank is very good"
        },
        "sort" : [
          2
        ]
      }
    ]
  }
}

// 使用 scroll_id 和 scroll 继续进行本次搜索
POST _search/scroll
{
  "scroll_id" :"DXF1ZXJ5QW5kRmV0Y2gBAAAAAABW2ewWV3pMQU5Uc0FTNUtlXzA0cnNmVHFNdw==",
  "scroll":"3m"
}

批量查询

POST /my_doc/_mget
{
  "ids":["1002","1","2"]
}

批量操作 bulk

bulk操作和以往的普通请求格式有区别。不要格式化 json ,不然不在同一行了

{action: {metadata}}\n
{request body}\n
{action: {metadata}}\n
{request body}\n
  • {action: {metadata}}代表批量操作的类型,可以是新增、删除或修改
  • \n 是每行结尾必须填写的一个规范,每行包括最后一行都要填写,用于 es 的解析
  • {request body} 是请求 Body,增加和修改操作需要,删除操作则不需要
POST /_bulk
{"create":{"_index":"shop2","_type":"_doc","_id":"2001"}}
{"id":"2001","nickname":"guank-2001"}
{"create":{"_index":"shop2","_type":"_doc","_id":"2002"}}
{"id":"2002","nickname":"guank-2002"}
{"create":{"_index":"shop2","_type":"_doc","_id":"2003"}}
{"id":"2003","nickname":"guank-2003"}

注意最后必须用回车结尾

  • create 创建已有id文档在 url 中指定 index 和 type

    POST my_doc/_bulk
    {"create":{"_id":"2003"}}
    {"id":"2003","nickname":"name2003"}
    
  • index 创建文档 已有文档会被覆盖 没有则会新增

    POST my_doc/_bulk
    {"index":{"_id":"2004"}}
    {"id":"2004","nickname":"name2004"}
    
  • update 更新

    POST my_doc/_bulk
    {"update":{"_id":"2004"}}
    {"doc":{"id":"3004"}}
    
  • 批量删除

    POST my_doc/_bulk
    {"delete":{"_id":"2004"}}
    

bulk 批量请求的性能限制

整个批量请求都需要由接收到请求的节点加载到内存中,因此该请求越大,其他请求所能获得的内存就越少。 批量请求的大小有一个最佳值,大于这个值,性能将不再提升,甚至会下降。 但是最佳值不是一个固定的值。它完全取决于硬件、文档的大小和复杂度、索引和搜索的负载的整体情况。

幸运的是,很容易找到这个 最佳点 :通过批量索引典型文档,并不断增加批量大小进行尝试。 当性能开始下降,那么你的批量大小就太大了。一个好的办法是开始时将 1,000 到 5,000 个文档作为一个批次, 如果你的文档非常大,那么就减少批量的文档个数。

密切关注你的批量请求的物理大小往往非常有用,一千个 1KB 的文档是完全不同于一千个 1MB 文档所占的物理大小。 一个好的批量大小在开始处理后所占用的物理大小约为 5-15 MB。

Elasticsearch 集群

image-20220912204128405

同一分片的主分片和副分片不能处在同一个服务器上

配置文件

node.master: true  // 在节点的配置文件里设置为 true 代表该节点有被选举为 master 节点的资格
node.data: true  // 数据节点
discovery.seed_hosts: ["所有节点的 ip 地址","",""]
// 配置文件完成启动 es

Elasticsearch 集群分片

在 Elasticsearch 集群中 一主两从模式 从节点宕机后 该从节点的数据分片丢失 原本在主节点和另一从节点的数据分片会重新分配 ,在该宕机的从节点恢复之后重新加入集群,该从节点的数据分片则全是副本分片。

在主节点宕机的情况下,两个从节点经过选举出一个作为主节点,当宕机的主节点恢复后加入集群后,这个主节点会作为一个从节点而非重新成为主节点

Elasticsearch 集群脑裂问题

所谓脑裂问题(类似于精神分裂),就是同一个集群中的不同节点,对于集群的状态有了不一样的理解。将请求发向不同的节点之后,我却发现即使是总体状态是red的,但是可用的节点数量却不一致。

正常情况下,集群中的所有的节点,应该对集群中master的选择是一致的,这样获得的状态信息也应该是一致的,不一致的状态信息,说明不同的节点对master节点的选择出现了异常——也就是所谓的脑裂问题。这样的脑裂状态直接让节点失去了集群的正确状态,导致集群不能正常工作。

可能导致的原因:

1.网络原因:

由于是内网通信,网络通信问题造成某些节点认为master死掉,而另选master的可能性较小;进而检查Ganglia集群监控,也没有发现异常的内网流量,故此原因可以排除。

2. 节点负载:

由于master节点与data节点都是混合在一起的,所以当工作节点的负载较大(确实也较大)时,导致对应的ES实例停止响应,而这台服务器如果正充当着master节点的身份,那么一部分节点就会认为这个master节点失效了,故重新选举新的节点,这时就出现了脑裂;同时由于data节点上ES进程占用的内存较大,较大规模的内存回收操作也能造成ES进程失去响应。所以,这个原因的可能性应该是最大的。

应对问题的办法:

方法1:

对应于上面的分析,推测出原因应该是由于节点负载导致了master进程停止响应,继而导致了部分节点对于master的选择出现了分歧。为此,一个直观的解决方案便是将master节点与data节点分离。为此,我们添加了三台服务器进入ES集群,不过它们的角色只是master节点,不担任存储和搜索的角色,故它们是相对轻量级的进程。可以通过以下配置来限制其角色:

node.master: true  
node.data: false

当然,其它的节点就不能再担任master了,把上面的配置反过来即可。这样就做到了将master节点与data节点分离。当然,为了使新加入的节点快速确定master位置,可以将data节点的默认的master发现方式有multicast修改为unicast:

discovery.zen.ping.multicast.enabled: false  
discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"]
还有两个直观的参数可以减缓脑裂问题的出现:

discovery.zen.ping_timeout(默认值是3秒):默认情况下,一个节点会认为,如果master节点在3秒之内没有应答,那么这个节点就是死掉了,而增加这个值,会增加节点等待响应的时间,从一定程度上会减少误判。

discovery.zen.minimum_master_nodes(默认是1):这个参数控制的是,一个节点需要看到的具有master节点资格的最小数量,然后才能在集群中做操作。官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量(我们的情况是3,因此这个参数设置为2,但对于只有2个节点的情况,设置为2就有些问题了,一个节点DOWN掉后,你肯定连不上2台服务器了,这点需要注意)。

其中,参数 discovery.zen.minimum_master_nodes 在 Elasticsearch 版本 7.x 中被舍弃,这部分配置由 Elasticsearch 自己去实现和使用。

Elasticsearch 集群的文档读写原理

image-20220914002521494

image-20220914002709763

通过 elasticsearchRestTemplateElasticsearchRepository 使用Springboot 操作 Elasticsearch

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "student", shards = 5, replicas = 1)
public class Student {

    @Id
    private String id;

    @Field(store = true, type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String name;

}
----------------------------------------------------------------------------------------------------------
@Repository
public interface StudentESDao extends ElasticsearchRepository<Student,String> {
}

----------------------------------------------------------------------------------------------------------
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class ESTest {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private StudentESDao studentESDao;
    @Test
    public void createIndex() {
        Student student = new Student();
        studentESDao.index(student);
        elasticsearchRestTemplate.createIndex(Student.class, student);
    }

    @Test
    public void deleteIndex() {
        elasticsearchRestTemplate.deleteIndex(Student.class);
    }

    @Test
    public void updateDoc() {
        Student student = new Student();
        student.setId("1002");
        UpdateRequest request = new UpdateRequest();
        request.index("student").id(student.getId());
        request.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateQuery query = new UpdateQueryBuilder().withClass(Student.class)
                .withId(student.getId())
                .withUpdateRequest(request)
                .build();
        elasticsearchRestTemplate.update(query);
    }

    @Test
    public void query() {
        Student student = new Student();
        student.setId("1002");
        Optional<Student> byId =
                studentESDao.findById(student.getId());
        GetQuery query = new GetQuery();
        query.setId(student.getId());
        Student student1 = elasticsearchRestTemplate.queryForObject(query, Student.class);

        byId.ifPresent(o -> log.info(JSON.toJSONString(o)));
    }

    @Test
    public void delete() {
        Student student = new Student();
        student.setId("1002");

        studentESDao.delete(student);
        elasticsearchRestTemplate.delete(Student.class, student.getId());
    }

    @Test
    public void search() {
        QueryBuilder queryBuilder = QueryBuilders.matchQuery("id","1001");
        //查询全部数据
//        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();

//        精确查询 =
//        QueryBuilder queryBuilder = QueryBuilders.termQuery("name", "lisi");

//        精确查询 多个 in
//        QueryBuilder queryBuilder = QueryBuilders.termsQuery("name", "张三", "lisi");

//        match匹配,会把查询条件进行分词,然后进行查询,多个词条之间是 or 的关系,可以指定分词
//        QueryBuilder queryBuilder = QueryBuilders.matchQuery("name", "张三");
//        QueryBuilder queryBuilder = QueryBuilders.matchQuery("name", "张三").analyzer("ik_max_word");

//        match匹配 查询多个字段
//        QueryBuilder queryBuilder = QueryBuilders.multiMatchQuery("男", "name", "sex");

//        fuzzy 模糊查询,返回包含与搜索字词相似的字词的文档。
//        QueryBuilder  queryBuilder = QueryBuilders.fuzzyQuery("name","lisx");

//        prefix 前缀检索
//        QueryBuilder  queryBuilder = QueryBuilders.prefixQuery("name","张");

//        wildcard 通配符检索
//        QueryBuilder  queryBuilder = QueryBuilders.wildcardQuery("name","张*");

//        regexp 正则查询
//        QueryBuilder queryBuilder = QueryBuilders.regexpQuery("name", "(张三)|(lisi)");

//        boost 评分权重,令满足某个条件的文档的得分更高,从而使得其排名更靠前。
//        queryBuilder.boost(2);
//        多条件构建
//        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
//        并且 and
//        queryBuilder.must(QueryBuilders.termQuery("name", "张三"));
//        queryBuilder.must(QueryBuilders.termQuery("sex", "女"));

//        或者 or
//        queryBuilder.should(QueryBuilders.termQuery("name", "张三"));
//        queryBuilder.should(QueryBuilders.termQuery("name", "lisi"));

//        不等于,去除
//        queryBuilder.mustNot(QueryBuilders.termQuery("name", "lisi"));

//        过滤数据
//        queryBuilder.filter(QueryBuilders.matchQuery("name", "张三"));

//        范围查询
        /*
            gt 大于 >
            gte 大于等于 >=
            lt 小于 <
            lte 小于等于 <=
        */
//        queryBuilder.filter(new RangeQueryBuilder("age").gt(10).lte(50));

//        构建分页,page 从0开始
        Pageable pageable = PageRequest.of(0, 3);
        SearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(queryBuilder)
                .withPageable(pageable)
                //排序
                .withSort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
                //投影
                .withFields("name")
                .build();
        Page<Student> search = studentESDao.search(searchQuery);
        
    }

    @Test
    public void highLight() {
        String preTag = "<font color='red'>";
        String postTag = "</font>";
        QueryBuilder queryBuilder = QueryBuilders.matchQuery("id","1001");
        HighlightBuilder.Field field = new HighlightBuilder.Field("name").preTags(preTag).postTags(postTag);
        SearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(queryBuilder)
                .withHighlightFields(field)
                //排序
                .withSort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
                //投影
                .withFields("name")
                .build();
        SearchResultMapper searchResultMapper = new SearchResultMapper() {
            @Override
            public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {

                List<Student> list = new ArrayList<>();

                SearchHits hits = response.getHits();
                for (SearchHit hit : hits) {
                    HighlightField highlightField = hit.getHighlightFields().get("name");
                    String name = Optional.ofNullable(highlightField.getFragments())
                            .map(texts -> Arrays.stream(texts).map(Text::toString)
                                    .collect(Collectors.toList()))
                            .orElse(Collections.emptyList()).stream().findFirst().orElse("");
                    Student stuHL = new Student();
                    stuHL.setName(name);
                    String id = (String) hit.getSourceAsMap().get("id");
                    stuHL.setId(id);

                    list.add(stuHL);
                }
                if (!CollectionUtils.isEmpty(list)) {
                    return new AggregatedPageImpl<>((List<T>) list);
                }
                return null;
            }

            @Override
            public <T> T mapSearchHit(SearchHit searchHit, Class<T> type) {
                return null;
            }
        };

        AggregatedPage<Student> list = elasticsearchRestTemplate.queryForPage(searchQuery, Student.class, searchResultMapper);
    }

}

Logstash 数据同步

  • 数据采集
  • 以 id 或 update_time 作为同步边界
  • logstash-input-jdbc 插件
  1. 自定义分词模板

    logstash-ik.json

    {
        "order": 0,
        "version": 1,
        "index_patterns": ["*"],
        "settings": {
            "index": {
                "refresh_interval": "5s"
            }
        },
        "mappings": {
            "_default_": {
                "dynamic_templates": [
                    {
                        "message_field": {
                            "path_match": "message",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false
                            }
                        }
                    },
                    {
                        "string_fields": {
                            "match": "*",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false,
                                "analyzer": "ik_max_word",
                                "fields": {
                                    "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                    }
                                }
                            }
                        }
                    }
                ],
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "@version": {
                        "type": "keyword"
                    },
                    "geoip": {
                        "dynamic": true,
                        "properties": {
                            "ip": {
                                "type": "ip"
                            },
                            "location": {
                                "type": "geo_point"
                            },
                            "latitude": {
                                "type": "half_float"
                            },
                            "longitude": {
                                "type": "half_float"
                            }
                        }
                    }
                }
            }
        },
        "aliases": {}
    }
    
    
  2. 配置 Logstash

    logstash-db-sync.conf

    input {
        jdbc {
            # 设置 MySql/MariaDB 数据库url以及数据库名称
            jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"
            # 用户名和密码
            jdbc_user => "root"
            jdbc_password => "root"
            # 数据库驱动所在位置,可以是绝对路径或者相对路径
            jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.41.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            # 开启分页
            jdbc_paging_enabled => "true"
            # 分页每页数量,可以自定义
            jdbc_page_size => "1000"
            # 执行的sql文件路径
            statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql"
            # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
            schedule => "* * * * *"
            # 索引类型
            type => "_doc"
            # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
            use_column_value => true
            # 记录上一次追踪的结果值
            last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
            # 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
            tracking_column => "updated_time"
            # tracking_column 对应字段的类型
            tracking_column_type => "timestamp"
            # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
            clean_run => false
            # 数据库字段名称大写转小写
            lowercase_column_names => false
        }
    }
    output {
        elasticsearch {
            # es地址
            hosts => ["192.168.1.187:9200"]
            # 同步的索引名
            index => "foodie-items"
            # 设置_docID和数据相同
            document_id => "%{id}"
            # document_id => "%{itemId}"
    
            # 定义模板名称
            template_name => "myik"
            # 模板所在位置
            template => "/usr/local/logstash/sync/logstash-ik.json"
            # 重写模板
            template_overwirte => true
            # 默认为 true, false 关闭 logstash 自动管理模板功能,如果自定义模板,则设置为 false
            manage_template => false
        }
        # 日志输出
        stdout {
            codec => json_lines
        }
    }
    
  3. 启动Logstash

    cd logstash/bin
    ./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
    
0

评论区