优秀的编程知识分享平台

网站首页 > 技术文章 正文

Go 程序利用 ElasticSearch 游标 Scroll 实现海量数据分页查询

nanyue 2024-10-14 11:34:39 技术文章 8 ℃

关注我的头条号:@Wooola,10 年 Java 软件开发及架构设计经验,专注于 Java、Go 语言、微服务架构,致力于每天分享原创文章、快乐编码和开源技术。

环境准备

  • ElasticSearch v6 集群环境
  • ES 第三方包
    1. github.com/olivere/elastic 文档以及测试用例比 go-elasticsearch 丰富
    2. github.com/elastic/go-elasticsearch

    本文 olivere/elastic 包进行 es 开发。

    ElasticSearch 配置读取

    编写 conf.ini 配置文件,添加 elasticsearch 集群地址

    [ES]
    EsAddrs = http://node1:9200,http://node2:9200,http://node3:9200

    定义 EsConf 结构体

    EsConf struct {
    		EsAddrs string
    }

    init 方法中,利用 goconfig 读取 confPath 参数指定的 conf.ini 配置文件路径。

    func init() {
    	var (
    		iniF string
    	)
    	flag.StringVar(&iniF, "confPath", "d:/conf.ini", "Set Configuration File")
    	flag.Parse()
    	var err error
    	IniF, err = goconfig.LoadConfigFile(iniF)
    	if err != nil {
    		os.Exit(1)
    	}
    	log.Println("Load conf.ini Success!")
    } 

    main 方法中读取 EsAddrs 参数值。

    EsConf := &conf.EsConf{
    		EsAddrs: GetStringValue(EsSection, EsAddrs, ""),
    	}
    
    func GetStringValue(section string, key string, defValue string) (r string) {
    	val, err := IniF.GetValue(section, key)
    	if err != nil {
    		return defValue
    	}
    	return val
    }

    初始化 elasticsearch 客户端实例。

    EsClient, err = NewClient()
    if err != nil {
    		panic(err)
    }
    
    func NewClient() (*elastic.Client, error) {
    	esConf := EsConf
    	if len(esConf.EsAddrs) == 0 {
    		panic("EsAddrs is Empty!")
    	}
    	esAddrs := strings.Split(esConf.EsAddrs, ",")
    	client, err := elastic.NewClient(elastic.SetURL(esAddrs...))
    	log.Println("ES initial successful!")
    	return client, err
    }

    ElasticSearch 游标(Scroll)分页

    时间查询需求:

    例如,查询  到 

    对应 es 时间格式为:

    # 开始时间
    st := 20171027000000
    # 结束时间
    et :=20200328235959

    初始化游标查询,设置游标每次查询 size 大小:5000 条,注意 sort 排序时,必须是 字段 + .keyword,否则查询会报错。

    boolQry := elastic.NewBoolQuery()
    boolQry.Must(elastic.NewRangeQuery("updateDate").From(st).To(et))
    res, err := EsClient.Scroll().Index(EsHisPendDB).Type(EsHisPendType).
    			Query(boolQry).
    			Sort("updateDate.keyword", false).
    			Scroll("5m").
    			Size(Conf.ExpConf.ExpPageSize).
    			Do(context.Background())
    
    # 抽取数据
    
    
    pendingList := extractPendList(*res.Hits)

    使用 for 循环读取所有数据,直到 len(res.Hits.Hits) 大小为 0,说明 elasticsearch 查询读取完毕,停止 for 循环。

    for {
    			res, err := EsClient.Scroll("1m").ScrollId(scrollId).Do(context.TODO())		 
    			pendingList := extractPendList(*res.Hits)			 
    }
    if len(res.Hits.Hits) <= 0 {
    				break
    			}

    输出效果

    其中使用 Scroll 游标第一次查询会产生 SrcollID 值。

    DnF1ZXJ5VGhlbkZldGNoBQAAAAAABDKbFldaYi1nOUhBVF8yZHNfZkJISllSMEEAAAAAAAMrlhZMRWZNdVlNaFFiQ1pjdkFiNHBrMW1BAAAAAAAEMpwWV1piLWc5SEFUXzJkc19mQkhKWVIwQQAAAAAABDKdFldaYi1nOUhBVF8yZHNfZkJISllSMEEAAAAAAAMwQhY4YUFDNVNVNlJsRzc4NFVYRnRyZ2xB
    最近发表
    标签列表