 # frozen_string_literal: true

+require 'json'
 require 'rsolr'
+require 'faraday/retry'
 require 'faraday/net_http_persistent'
+require 'geo_combine/logger'

 module GeoCombine
   # Indexes Geoblacklight documents into Solr
   class Indexer
     attr_reader :solr

-    def self.solr(url: ENV.fetch('SOLR_URL', 'http://127.0.0.1:8983/solr/blacklight-core'))
-      RSolr.connect url: url, adapter: :net_http_persistent
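+    # The Solr URL is resolved from SOLR_URL, a host Blacklight app, or a
+    # localhost default; a preconfigured client can also be injected, e.g.
+    # (hypothetical URL):
+    #   GeoCombine::Indexer.new(solr: RSolr.connect(url: 'http://solr.example.com:8983/solr/my-core'))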
+    def initialize(solr: nil, logger: GeoCombine::Logger.logger)
+      @logger = logger
+      @batch_size = ENV.fetch('SOLR_BATCH_SIZE', 100).to_i
+
+      # If SOLR_URL is set, use it; if in a Geoblacklight app, use its solr core
+      solr_url = ENV.fetch('SOLR_URL', nil)
+      solr_url ||= Blacklight.default_index.connection.base_uri.to_s if defined? Blacklight
+
+      # If neither is set, warn and fall back to the local Blacklight default solr core
+      if solr_url.nil?
+        @logger.warn 'SOLR_URL not set; using Blacklight default'
+        solr_url = 'http://localhost:8983/solr/blacklight-core'
+      end
+
+      @solr = solr || RSolr.connect(client, url: solr_url)
     end
 
-    def initialize(solr: GeoCombine::Indexer.solr)
-      @solr = solr
+    # Index everything and return the number of docs successfully indexed
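+    # Documents arrive as [document_hash, source_path] pairs, e.g. built as
+    # (hypothetical paths):
+    #   docs = Dir['tmp/docs/*.json'].map { |p| [JSON.parse(File.read(p)), p] }
+    #   GeoCombine::Indexer.new.index(docs)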
+    def index(docs)
+      # Track total indexed and time spent
+      @logger.info "indexing into #{solr_url}"
+      total_indexed = 0
+      start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+
+      # Index in batches; set batch size via SOLR_BATCH_SIZE
+      batch = []
+      docs.each do |doc, path|
+        batch << [doc, path]
+        next if batch.size < @batch_size
+
+        total_indexed += index_batch(batch)
+        batch = []
+      end
+      total_indexed += index_batch(batch) unless batch.empty?
+
+      # Issue a commit to make sure all documents are indexed
+      @solr.commit
+      end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+      sec = end_time - start_time
+      @logger.info format('indexed %<total_indexed>d documents in %<sec>.2f seconds', total_indexed:, sec:)
+      total_indexed
     end
 
+    # URL to the solr instance being used
     def solr_url
       @solr.options[:url]
     end
 
-    # Index everything and return the number of docs successfully indexed
-    def index(docs, commit_within: ENV.fetch('SOLR_COMMIT_WITHIN', 5000).to_i)
-      indexed_count = 0
-
-      docs.each do |record, path|
-        # log the unique identifier for the record for debugging
-        id = record['id'] || record['dc_identifier_s']
-        puts "Indexing #{id}: #{path}" if $DEBUG
-
-        # index the record into solr
-        @solr.update params: { commitWithin: commit_within, overwrite: true },
-                     data: [record].to_json,
-                     headers: { 'Content-Type' => 'application/json' }
-
-        # count the number of records successfully indexed
-        indexed_count += 1
-      rescue RSolr::Error::Http => e
-        puts e
-      end
+    private
 
-      @solr.commit
-      indexed_count
+    # Index a batch of documents; if we fail, index them all individually
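+    # Returns the number of docs indexed: the whole batch on success, or
+    # however many individual retries succeeded after a failure.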
+    def index_batch(batch)
+      docs = batch.map(&:first)
+      @solr.update(data: batch_json(docs), params:, headers:)
+      @logger.debug "indexed batch (#{batch.size} docs)"
+      batch.size
+    rescue RSolr::Error::Http => e
+      @logger.error "error indexing batch (#{batch.size} docs): #{format_error(e)}"
+      @logger.warn 'retrying documents individually'
+      batch.map { |doc, path| index_single(doc, path) }.compact.size
+    end
+
+    # Index a single document; if it fails, log the error and continue
+    def index_single(doc, path)
+      @solr.add(doc, params:, headers:)
+      @logger.debug "indexed #{path}"
+      doc
+    rescue RSolr::Error::Http => e
+      @logger.error "error indexing #{path}: #{format_error(e)}"
+      nil
+    end
+
+    # Generate a JSON string to send to the solr update API for a batch of documents.
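+    # Solr's update handler accepts repeated "add" keys, so two docs become:
+    #   { "add": { "doc": {"id":"a"} },
+    #     "add": { "doc": {"id":"b"} } }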
+    def batch_json(docs)
+      docs.map { |doc| %("add": { "doc": #{doc.to_json} }) }.join(",\n").prepend('{ ').concat(' }')
+    end
+
+    # Generate a friendly error message for logging, including status code and message
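+    # e.g. "400 Bad Request - undefined field foo" (hypothetical Solr response)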
+    def format_error(error)
+      code = error.response[:status]
+      status_info = "#{code} #{RSolr::Error::Http::STATUS_CODES[code.to_i]}"
+      error_info = parse_solr_error(error)
+      [status_info, error_info].compact.join(' - ')
+    end
+
+    # Extract the specific error message from a solr JSON error response, if any
+    def parse_solr_error(error)
+      JSON.parse(error.response[:body]).dig('error', 'msg')
+    rescue StandardError
+      nil
+    end
+
+    def headers
+      { 'Content-Type' => 'application/json' }
+    end
+
+    def params
+      { overwrite: true }
+    end
+
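+    # Shared persistent Faraday connection; transient errors are retried up
+    # to 3 times with exponential backoff (roughly 1s, 2s, then 4s, given
+    # interval: 1 and backoff_factor: 2).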
+    def client
+      @client ||= Faraday.new do |conn|
+        conn.request :retry, max: 3, interval: 1, backoff_factor: 2, exceptions: [
+          Faraday::TimeoutError,
+          Faraday::ConnectionFailed,
+          Faraday::TooManyRequestsError
+        ]
+        conn.response :raise_error
+        conn.adapter :net_http_persistent
+      end
     end
   end
 end