Struggles when migrating from ES 6.8 to ES 7.5

What problems I encounter when migrating from ES 6.8 to ES 7.5 and how to avoid them.

April 26, 2020 - 7 minute read -
F# ElasticSearch

Hi,

in an application that I develop from day to day, I encounter a problem with the migration of data from ES 6.8.2 to ES 7.5.1. The normal attitude in our application would be to run re-indexation, which fetches data from multiple sources and index them in a cluster. But because our application works 24/7 because of clients all around the globe. There were situations that re-indexation fails for some companies, for some reason. Because we want to have an option to react immediately in a situation like that, we decided to write an ad hoc migrator. Which takes data from the old cluster (6.8.2) and “move” them to the new one (7.5.2). Worth noting is also that the new cluster was located on a new server, not the same as the old one.

This isn’t a hard task, but we have to keep in mind that there would be some breaking changes between major releases of ES. Because of that, we decided that the easiest solution would be to create an application that would consist of 2 services. The first one would be responsible for gathering data from old elastic -> mapping them to some DTO -> sending them to the second service. While the second service would receive them, map them, and push them to the new cluster. Sounds easy right? So let’s look at the code.

The structure of a project looks as follows. We have 2 subfolders named ES6 and ES7 which were aligned to the services responsible for gathering/inserting data from/to clusters. We started from a service whose main task was to fetch data. We have here a Fetcher module:

module Fetcher =
  type Type =
    | Init
    | Next of string
    | Last

  let private client conString =
    let url = Uri(conString)
    let pool = new SingleNodeConnectionPool(url)
    let builder = new ConnectionSettings(pool)
    let con = builder
      .ThrowExceptions()
      .RequestTimeout(TimeSpan(0, 0, 30))
      .MaxRetryTimeout(TimeSpan(0, 0, 62))
      .MaximumRetries(2)
      .EnableHttpCompression()
      .EnableHttpPipelining(false)
      .PrettyJson()
      .EnableTcpKeepAlive(TimeSpan(0, 10, 0), TimeSpan(0, 0, 10))
    ElasticClient con

  let fetchFor conString companyId ``type`` =
    let resolveType (docs: IReadOnlyCollection<SomeModel>) scrollId =
      if docs.Count > 0 then
        Next scrollId
      else Last

    match ``type`` with
    | Init ->
      async {
        let! result =
          (client conString)
            .SearchAsync<SomeModel>(
              fun (s:SearchDescriptor<SomeModel>) ->
                new SearchRequest(
                  Size = (Nullable<int> 1000),
                  From = (Nullable<int>0),
                  Scroll = Time(TimeSpan.FromMinutes(1.)),
                  Query = new QueryContainer(query = BoolQuery(Should = [
                    new QueryContainer(query = new TermQuery(
                      Field = new Field("companyId"), Value = companyId))
                    ])
                  )
                ) :> ISearchRequest) |> Async.AwaitTask

        let t = resolveType result.Documents result.ScrollId
        return (t, result.Documents)
      }
      | Next scrollId ->
        async {
          let! result =
            (client conString)
              .ScrollAsync<SomeModel>(Time(TimeSpan.FromMinutes(1.)), scrollId)
              |> Async.AwaitTask

          let t = resolveType result.Documents result.ScrollId
          return (t, result.Documents)
        }
        | Last -> async { return (Last, new List<SomeModel>() :> IReadOnlyCollection<SomeModel>) }

As we can see above. In case of an initial request, we use a SearchAsync method, which would return n first records and a scrollId needed for gathering the next bunch of data.

When we have all the data, we check if we would need another call for data, or it is the last call for them.

If more data needs to be fetched we are using a ScrollAsync method to which we are passing previously gathered scrollId.

That’s all for the Fetcher module. The next one is a module responsible for mapping data to DTO.

But we gonna skip code example here, since it’s a normal mapping without any logic inside. The next block is a Migration module.

let run (client: HttpClient) data =
  let serialized = JsonConvert.SerializeObject(data)
    async {
      let! result = client.PostAsync("http://someaddress/migration", new StringContent(serialized, Text.Encoding.UTF8, "application/json")) |> Async.AwaitTask
      if result.StatusCode = HttpStatusCode.OK then
        return Microsoft.FSharp.Core.Result.Ok ()
      else
        let! content = result.Content.ReadAsStringAsync() |> Async.AwaitTask
        return Microsoft.FSharp.Core.Result.Error (content)
    }

Which send mapped data via HTTP to the second service and return error information if there is some. The whole code is combined in an Application module. As you can see we don’t combine all of the data related to a company, because we could get a couple of thousands/millions records and that could result in some performance issues.

Going to the ES7 service. The first thing we are doing is mapping from a DTO to our domain model in a Mapping module.

As previously, there is no logic here, so we gonna skip the borring stuff. Only assigning values to properties/fields. After we are done with the mapping we “send” them to the Uploader module.

Which is responsible for doing a bulk operation on an ES cluster and based on a result from a bulk operation, return information to an ES6 service.

module Uploader =
  let private client conString =
    let url = Uri(conString)
    use pool = new SingleNodeConnectionPool(url)
    let builder = new ConnectionSettings(pool)
    let con =
      builder
        .ThrowExceptions()
        .RequestTimeout(TimeSpan(0, 0, 30))
        .MaxRetryTimeout(TimeSpan(0, 0, 62))
        .MaximumRetries(2)
        .EnableHttpCompression()
        .EnableHttpPipelining(false)
        .PrettyJson()
        .EnableTcpKeepAlive(TimeSpan(0, 10, 0), TimeSpan(0, 0, 10))
        .DefaultIndex("dashboard")
        .EnableDebugMode()
    ElasticClient con

  let upload conString docs =
    let toUpload =
      docs
        |> Array.map (
          fun (d: ES7.Model.SomeModel) ->
            let b = BulkCreateOperation<ES7.Model.SomeModel>(d)
            b.Id <- Id(d.IndexId)
            b.RetriesOnConflict <- Nullable<int>(1)
            b :> IBulkOperation
        )
        |> fun x -> new BulkOperationsCollection<IBulkOperation>(x)
      let request = BulkRequest()
      request.Operations <- toUpload
      async {
        let! result = (client conString).BulkAsync(request) |> Async.AwaitTask
        if result.Errors then
          return Some (result.ItemsWithErrors |> Seq.map (fun x -> x.Error.Reason))
        else
          return None
      }

To sum up, in a simple way we could write an application that would transmit some data from one to another cluster is fairly time (a couple of seconds for companies with hundreds of thousands of records). Is this a solution I would recommend to anyone? No, there is a build-in snapshot option that would be perfect for such occasion, then we could simply reindex all “companies” only for missing data. Beyond of snapshot option there is also a built-in reindex. Also, the reindex application should be stable and shouldn’t fail. But thanks to that small bastard we were able to move data for 2 companies on a production environment in a couple of seconds because the reindex process fails for them ~3 times and a fix could take up to 10-15 minutes to release it.

Thanks for reading!