Filename | /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm |
Statements | Executed 16 statements in 1.07ms |
Calls | P | F | Exclusive Time |
Inclusive Time |
Subroutine |
---|---|---|---|---|---|
1 | 1 | 1 | 18µs | 351µs | BEGIN@3 | Search::Elasticsearch::Bulk::
1 | 1 | 1 | 12µs | 14µs | flush | Search::Elasticsearch::Bulk::
1 | 1 | 1 | 11µs | 49µs | BEGIN@7 | Search::Elasticsearch::Bulk::
1 | 1 | 1 | 10µs | 235µs | BEGIN@6 | Search::Elasticsearch::Bulk::
1 | 1 | 1 | 10µs | 255µs | BEGIN@8 | Search::Elasticsearch::Bulk::
1 | 1 | 1 | 7µs | 48µs | __ANON__[:3] | Search::Elasticsearch::Bulk::
0 | 0 | 0 | 0s | 0s | __ANON__[:56] | Search::Elasticsearch::Bulk::
0 | 0 | 0 | 0s | 0s | __ANON__[:63] | Search::Elasticsearch::Bulk::
0 | 0 | 0 | 0s | 0s | __ANON__[:89] | Search::Elasticsearch::Bulk::
0 | 0 | 0 | 0s | 0s | add_action | Search::Elasticsearch::Bulk::
0 | 0 | 0 | 0s | 0s | reindex | Search::Elasticsearch::Bulk::
Line | Statements |
Time on line |
Calls | Time in subs |
Code |
---|---|---|---|---|---|
1 | package Search::Elasticsearch::Bulk; | ||||
2 | 1 | 400ns | $Search::Elasticsearch::Bulk::VERSION = '2.02'; | ||
3 | 4 | 59µs | 3 | 726µs | # spent 48µs (7+41) within Search::Elasticsearch::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm:3] which was called:
# once (7µs+41µs) by import::into at line 34 of Import/Into.pm
# spent 351µs (18+334) within Search::Elasticsearch::Bulk::BEGIN@3 which was called:
# once (18µs+334µs) by Module::Runtime::require_module at line 3 # spent 351µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@3
# spent 334µs making 1 call to Moo::import
# spent 41µs making 1 call to strictures::import |
4 | 1 | 1µs | 1 | 5.89ms | with 'Search::Elasticsearch::Role::Bulk', # spent 5.89ms making 1 call to Moo::with |
5 | 'Search::Elasticsearch::Role::Is_Sync'; | ||||
6 | 2 | 38µs | 2 | 459µs | # spent 235µs (10+224) within Search::Elasticsearch::Bulk::BEGIN@6 which was called:
# once (10µs+224µs) by Module::Runtime::require_module at line 6 # spent 235µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@6
# spent 224µs making 1 call to Sub::Exporter::__ANON__[/opt/flows/lib/lib/perl5/Sub/Exporter.pm:337] |
7 | 2 | 31µs | 2 | 87µs | # spent 49µs (11+38) within Search::Elasticsearch::Bulk::BEGIN@7 which was called:
# once (11µs+38µs) by Module::Runtime::require_module at line 7 # spent 49µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@7
# spent 38µs making 1 call to Exporter::import |
8 | 2 | 911µs | 2 | 501µs | # spent 255µs (10+246) within Search::Elasticsearch::Bulk::BEGIN@8 which was called:
# once (10µs+246µs) by Module::Runtime::require_module at line 8 # spent 255µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@8
# spent 246µs making 1 call to namespace::clean::import |
9 | |||||
10 | #=================================== | ||||
11 | sub add_action { | ||||
12 | #===================================
# Queues one or more bulk actions and auto-flushes when a threshold is hit.
#
# Arguments after $self are consumed two at a time (per the POD below: an
# action name followed by the params describing it) and handed to
# _encode_action(). The strings it returns are appended to the array
# referenced by _buffer.
#
# For every pair, _buffer_size grows by length + 1 for each encoded string
# (presumably the +1 accounts for the newline ending each bulk line - TODO
# confirm) and _buffer_count grows by one.
#
# flush() is then called as soon as the size has reached max_size, the count
# has reached max_count, or max_time seconds have passed since _last_flush.
# A false (0) threshold switches that particular check off.
#
# Nothing here traps exceptions, so an error thrown by flush() propagates to
# the caller. Returns 1. | ||||
13 | my $self = shift; | ||||
14 | my $buffer = $self->_buffer; | ||||
15 | my $max_size = $self->max_size; | ||||
16 | my $max_count = $self->max_count; | ||||
17 | my $max_time = $self->max_time; | ||||
18 | |||||
19 | while (@_) { | ||||
20 | my @json = $self->_encode_action( splice( @_, 0, 2 ) ); | ||||
21 | |||||
22 | push @$buffer, @json; | ||||
23 | |||||
24 | my $size = $self->_buffer_size; | ||||
25 | $size += length($_) + 1 for @json; | ||||
26 | $self->_buffer_size($size); | ||||
27 | |||||
28 | my $count = $self->_buffer_count( $self->_buffer_count + 1 ); | ||||
29 | |||||
30 | $self->flush | ||||
31 | if ( $max_size and $size >= $max_size ) | ||||
32 | || ( $max_count and $count >= $max_count ) | ||||
33 | || ( $max_time and time >= $self->_last_flush + $max_time ); | ||||
34 | } | ||||
35 | return 1; | ||||
36 | } | ||||
37 | |||||
38 | #=================================== | ||||
39 | # spent 14µs (12+2) within Search::Elasticsearch::Bulk::flush which was called:
# once (12µs+2µs) by main::RUNTIME at line 116 of flows_to_es.pl | ||||
40 | #===================================
# flush: sends everything currently buffered in a single bulk() request.
#
# NOTE(review): the "sub flush {" declaration that belongs to source line 39
# is not shown in this rendering; only the profiler annotation appears there.
#
# Steps, in order:
#   * _last_flush is stamped with the current time first, so the max_time
#     window used by add_action() restarts even when nothing is sent.
#   * If _buffer_size is false, { items => [] } is returned immediately and
#     no request is made.
#   * With verbose set, a "." is printed with output autoflush turned on
#     locally.
#   * es->bulk() is called with _bulk_args plus body => the buffer. On
#     success the buffer is cleared.
#   * On failure the buffer is cleared unless the error matches
#     is('Cxn', 'NoNodes') (the POD describes these as retryable), and the
#     error is re-thrown in either case.
#   * After a successful request, _report() receives the buffer and the
#     results.
#
# Returns the bulk() results, or undef when called in void context. | ||||
41 | 1 | 300ns | my $self = shift; | ||
42 | 1 | 7µs | 1 | 1µs | $self->_last_flush(time); # spent 1µs making 1 call to Search::Elasticsearch::Role::Bulk::_last_flush |
43 | |||||
44 | 1 | 8µs | 1 | 600ns | return { items => [] } # spent 600ns making 1 call to Search::Elasticsearch::Role::Bulk::_buffer_size |
45 | unless $self->_buffer_size; | ||||
46 | |||||
47 | if ( $self->verbose ) { | ||||
48 | local $| = 1; | ||||
49 | print "."; | ||||
50 | } | ||||
51 | my $buffer = $self->_buffer; | ||||
52 | my $results = try { | ||||
53 | my $res = $self->es->bulk( %{ $self->_bulk_args }, body => $buffer ); | ||||
54 | $self->clear_buffer; | ||||
55 | return $res; | ||||
56 | } | ||||
57 | catch { | ||||
58 | my $error = $_; | ||||
59 | $self->clear_buffer | ||||
60 | unless $error->is( 'Cxn', 'NoNodes' ); | ||||
61 | |||||
62 | die $error; | ||||
63 | }; | ||||
64 | $self->_report( $buffer, $results ); | ||||
65 | return defined wantarray ? $results : undef; | ||||
66 | } | ||||
67 | |||||
68 | #=================================== | ||||
69 | sub reindex { | ||||
70 | #===================================
# Pulls documents from a source and feeds them to index() on this helper.
#
# Params (unpacked by parse_params):
#   source - required; a 'Param' error is thrown when it is missing or false.
#   The whole params hash is also passed to _doc_transformer(), which
#   supplies the per-document transform callback.
#
# When source is a plain HASH ref it is shallow-copied, its optional "es"
# entry is removed and used as the client (falling back to $self->es), and
# the remaining entries are passed to scroll_helper() after the defaults
# search_type 'scan' and size 500, so caller-supplied values win. The source
# is then replaced by a closure that refills and drains the scroll buffer.
# With verbose set, the total reported by the scroll is printed.
# Any other source is invoked as a code ref.
#
# The source is called repeatedly until it yields no defined values. Each
# batch is run through the transform, false results are dropped, and the
# rest is handed to index(). A final flush() sends whatever is left.
#
# Returns 1. | ||||
71 | my ( $self, $params ) = parse_params(@_); | ||||
72 | my $src = $params->{source} | ||||
73 | or throw( 'Param', "Missing required param <source>" ); | ||||
74 | |||||
75 | my $transform = $self->_doc_transformer($params); | ||||
76 | |||||
77 | if ( ref $src eq 'HASH' ) { | ||||
78 | $src = {%$src}; | ||||
79 | my $es = delete $src->{es} || $self->es; | ||||
80 | my $scroll = $es->scroll_helper( | ||||
81 | search_type => 'scan', | ||||
82 | size => 500, | ||||
83 | %$src | ||||
84 | ); | ||||
85 | |||||
86 | $src = sub { | ||||
87 | $scroll->refill_buffer; | ||||
88 | $scroll->drain_buffer; | ||||
89 | }; | ||||
90 | |||||
91 | print "Reindexing " . $scroll->total . " docs\n" | ||||
92 | if $self->verbose; | ||||
93 | } | ||||
94 | |||||
95 | while ( my @docs = grep {defined} $src->() ) { | ||||
96 | $self->index( grep {$_} map { $transform->($_) } @docs ); | ||||
97 | } | ||||
98 | $self->flush; | ||||
99 | return 1; | ||||
100 | } | ||||
101 | |||||
102 | 1 | 5µs | 1; | ||
103 | |||||
104 | =pod | ||||
105 | |||||
106 | =encoding UTF-8 | ||||
107 | |||||
108 | =head1 NAME | ||||
109 | |||||
110 | Search::Elasticsearch::Bulk - A helper module for the Bulk API and for reindexing | ||||
111 | |||||
112 | =head1 VERSION | ||||
113 | |||||
114 | version 2.02 | ||||
115 | |||||
116 | =head1 SYNOPSIS | ||||
117 | |||||
118 | use Search::Elasticsearch; | ||||
119 | |||||
120 | my $es = Search::Elasticsearch->new; | ||||
121 | my $bulk = $es->bulk_helper( | ||||
122 | index => 'my_index', | ||||
123 | type => 'my_type' | ||||
124 | ); | ||||
125 | |||||
126 | # Index docs: | ||||
127 | $bulk->index({ id => 1, source => { foo => 'bar' }}); | ||||
128 | $bulk->add_action( index => { id => 1, source => { foo=> 'bar' }}); | ||||
129 | |||||
130 | # Create docs: | ||||
131 | $bulk->create({ id => 1, source => { foo => 'bar' }}); | ||||
132 | $bulk->add_action( create => { id => 1, source => { foo=> 'bar' }}); | ||||
133 | $bulk->create_docs({ foo => 'bar' }); | ||||
134 | |||||
135 | # Delete docs: | ||||
136 | $bulk->delete({ id => 1}); | ||||
137 | $bulk->add_action( delete => { id => 1 }); | ||||
138 | $bulk->delete_ids(1,2,3); | ||||
139 | |||||
140 | # Update docs: | ||||
141 | $bulk->update({ id => 1, script => '...' }); | ||||
142 | $bulk->add_action( update => { id => 1, script => '...' }); | ||||
143 | |||||
144 | # Manual flush | ||||
145 | $bulk->flush; | ||||
146 | |||||
147 | # Reindex docs: | ||||
148 | my $bulk = $es->bulk_helper( | ||||
149 | index => 'new_index', | ||||
150 | verbose => 1 | ||||
151 | ); | ||||
152 | |||||
153 | $bulk->reindex( source => { index => 'old_index' }); | ||||
154 | |||||
155 | =head1 DESCRIPTION | ||||
156 | |||||
157 | This module provides a wrapper for the L<Search::Elasticsearch::Client::2_0::Direct/bulk()> | ||||
158 | method which makes it easier to run multiple create, index, update or delete | ||||
159 | actions in a single request. It also provides a simple interface | ||||
160 | for L<reindexing documents|/REINDEXING DOCUMENTS>. | ||||
161 | |||||
162 | The L<Search::Elasticsearch::Bulk> module acts as a queue, buffering up actions | ||||
163 | until it reaches a maximum count of actions, a maximum size of JSON request | ||||
164 | body, or a maximum time since the last flush (C<max_time>), at which point it issues a C<bulk()> request. | ||||
165 | |||||
166 | Once you have finished adding actions, call L</flush()> to force the final | ||||
167 | C<bulk()> request on the items left in the queue. | ||||
168 | |||||
169 | This class does L<Search::Elasticsearch::Role::Bulk> and | ||||
170 | L<Search::Elasticsearch::Role::Is_Sync>. | ||||
171 | |||||
172 | =head1 CREATING A NEW INSTANCE | ||||
173 | |||||
174 | =head2 C<new()> | ||||
175 | |||||
176 | my $bulk = $es->bulk_helper( | ||||
177 | |||||
178 | index => 'default_index', # optional | ||||
179 | type => 'default_type', # optional | ||||
180 | %other_bulk_params, # optional | ||||
181 | |||||
182 | max_count => 1_000, # optional | ||||
183 | max_size => 1_000_000, # optional | ||||
184 | max_time => 5, # optional | ||||
185 | |||||
186 | verbose => 0 | 1, # optional | ||||
187 | |||||
188 | on_success => sub {...}, # optional | ||||
189 | on_error => sub {...}, # optional | ||||
190 | on_conflict => sub {...}, # optional | ||||
191 | |||||
192 | |||||
193 | ); | ||||
194 | |||||
195 | The C<new()> method returns a new C<$bulk> object. You must pass your | ||||
196 | Search::Elasticsearch client as the C<es> argument. | ||||
197 | |||||
198 | The C<index> and C<type> parameters provide default values for | ||||
199 | C<index> and C<type>, which can be overridden in each action. | ||||
200 | You can also pass any other values which are accepted | ||||
201 | by the L<bulk()|Search::Elasticsearch::Client::2_0::Direct/bulk()> method. | ||||
202 | |||||
203 | See L</flush()> for more information about the other parameters. | ||||
204 | |||||
205 | =head1 FLUSHING THE BUFFER | ||||
206 | |||||
207 | =head2 C<flush()> | ||||
208 | |||||
209 | $result = $bulk->flush; | ||||
210 | |||||
211 | The C<flush()> method sends all buffered actions to Elasticsearch using | ||||
212 | a L<bulk()|Search::Elasticsearch::Client::2_0::Direct/bulk()> request. | ||||
213 | |||||
214 | =head2 Auto-flushing | ||||
215 | |||||
216 | An automatic L</flush()> is triggered whenever the C<max_count>, C<max_size>, | ||||
217 | or C<max_time> threshold is breached. This causes all actions in the buffer to be | ||||
218 | sent to Elasticsearch. | ||||
219 | |||||
220 | =over | ||||
221 | |||||
222 | =item * C<max_count> | ||||
223 | |||||
224 | The maximum number of actions to allow before triggering a L</flush()>. | ||||
225 | This can be disabled by setting C<max_count> to C<0>. Defaults to | ||||
226 | C<1,000>. | ||||
227 | |||||
228 | =item * C<max_size> | ||||
229 | |||||
230 | The maximum size of JSON request body to allow before triggering a | ||||
231 | L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults | ||||
232 | to C<1_000_000> bytes. | ||||
233 | |||||
234 | =item * C<max_time> | ||||
235 | |||||
236 | The maximum number of seconds to wait before triggering a flush. Defaults | ||||
237 | to C<0> seconds, which means that it is disabled. B<Note:> This timeout | ||||
238 | is only triggered when new items are added to the queue, not in the background. | ||||
239 | |||||
240 | =back | ||||
241 | |||||
242 | =head2 Errors when flushing | ||||
243 | |||||
244 | There are two types of error which can be thrown when L</flush()> | ||||
245 | is called, either manually or automatically. | ||||
246 | |||||
247 | =over | ||||
248 | |||||
249 | =item * Temporary Elasticsearch errors | ||||
250 | |||||
251 | A C<Cxn> error like a C<NoNodes> error which indicates that your cluster is down. | ||||
252 | These errors do not clear the buffer, as they can be retried later on. | ||||
253 | |||||
254 | =item * Action errors | ||||
255 | |||||
256 | Individual actions may fail. For instance, a C<create> action will fail | ||||
257 | if a document with the same C<index>, C<type> and C<id> already exists. | ||||
258 | These action errors are reported via L<callbacks|/Using callbacks>. | ||||
259 | |||||
260 | =back | ||||
261 | |||||
262 | =head2 Using callbacks | ||||
263 | |||||
264 | By default, any I<Action errors> (see above) cause warnings to be | ||||
265 | written to C<STDERR>. However, you can use the C<on_error>, C<on_conflict> | ||||
266 | and C<on_success> callbacks for more fine-grained control. | ||||
267 | |||||
268 | All callbacks receive the following arguments: | ||||
269 | |||||
270 | =over | ||||
271 | |||||
272 | =item C<$action> | ||||
273 | |||||
274 | The name of the action, ie C<index>, C<create>, C<update> or C<delete>. | ||||
275 | |||||
276 | =item C<$response> | ||||
277 | |||||
278 | The response that Elasticsearch returned for this action. | ||||
279 | |||||
280 | =item C<$i> | ||||
281 | |||||
282 | The index of the action, ie the first action in the flush request | ||||
283 | will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc. | ||||
284 | |||||
285 | =back | ||||
286 | |||||
287 | =head3 C<on_success> | ||||
288 | |||||
289 | my $bulk = $es->bulk_helper( | ||||
290 | on_success => sub { | ||||
291 | my ($action,$response,$i) = @_; | ||||
292 | # do something | ||||
293 | }, | ||||
294 | ); | ||||
295 | |||||
296 | The C<on_success> callback is called for every action that has a successful | ||||
297 | response. | ||||
298 | |||||
299 | =head3 C<on_conflict> | ||||
300 | |||||
301 | my $bulk = $es->bulk_helper( | ||||
302 | on_conflict => sub { | ||||
303 | my ($action,$response,$i,$version) = @_; | ||||
304 | # do something | ||||
305 | }, | ||||
306 | ); | ||||
307 | |||||
308 | The C<on_conflict> callback is called for actions that have triggered | ||||
309 | a C<Conflict> error, eg trying to C<create> a document which already | ||||
310 | exists. The C<$version> argument will contain the version number | ||||
311 | of the document currently stored in Elasticsearch (if found). | ||||
312 | |||||
313 | =head3 C<on_error> | ||||
314 | |||||
315 | my $bulk = $es->bulk_helper( | ||||
316 | on_error => sub { | ||||
317 | my ($action,$response,$i) = @_; | ||||
318 | # do something | ||||
319 | }, | ||||
320 | ); | ||||
321 | |||||
322 | The C<on_error> callback is called for any error (unless the C<on_conflict> | ||||
323 | callback has already been called). | ||||
324 | |||||
325 | =head2 Disabling callbacks and autoflush | ||||
326 | |||||
327 | If you want to be in control of flushing, and you just want to receive | ||||
328 | the raw response that Elasticsearch sends instead of using callbacks, | ||||
329 | then you can do so as follows: | ||||
330 | |||||
331 | my $bulk = $es->bulk_helper( | ||||
332 | max_count => 0, | ||||
333 | max_size => 0, | ||||
334 | on_error => undef | ||||
335 | ); | ||||
336 | |||||
337 | $bulk->add_action(...); | ||||
338 | $response = $bulk->flush; | ||||
339 | |||||
340 | =head1 CREATE, INDEX, UPDATE, DELETE | ||||
341 | |||||
342 | =head2 C<add_action()> | ||||
343 | |||||
344 | $bulk->add_action( | ||||
345 | create => { ...params... }, | ||||
346 | index => { ...params... }, | ||||
347 | update => { ...params... }, | ||||
348 | delete => { ...params... } | ||||
349 | ); | ||||
350 | |||||
351 | The C<add_action()> method allows you to add multiple C<create>, C<index>, | ||||
352 | C<update> and C<delete> actions to the queue. The first value is the action | ||||
353 | type, and the second value is the parameters that describe that action. | ||||
354 | See the individual helper methods below for details. | ||||
355 | |||||
356 | B<Note:> Parameters like C<index> or C<type> can be specified as C<index> or as | ||||
357 | C<_index>, so the following two lines are equivalent: | ||||
358 | |||||
359 | index => { index => 'index', type => 'type', id => 1, source => {...}}, | ||||
360 | index => { _index => 'index', _type => 'type', _id => 1, _source => {...}}, | ||||
361 | |||||
362 | B<Note:> The C<index> and C<type> parameters can be specified in the | ||||
363 | params for any action, but if not specified, will default to the C<index> | ||||
364 | and C<type> values specified in L</new()>. These are required parameters: | ||||
365 | they must be specified either in L</new()> or in every action. | ||||
366 | |||||
367 | =head2 C<create()> | ||||
368 | |||||
369 | $bulk->create( | ||||
370 | { index => 'custom_index', source => { doc body }}, | ||||
371 | { type => 'custom_type', id => 1, source => { doc body }}, | ||||
372 | ... | ||||
373 | ); | ||||
374 | |||||
375 | The C<create()> helper method allows you to add multiple C<create> actions. | ||||
376 | It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/create()> | ||||
377 | except that the document body should be passed as the C<source> or C<_source> | ||||
378 | parameter, instead of as C<body>. | ||||
379 | |||||
380 | =head2 C<create_docs()> | ||||
381 | |||||
382 | $bulk->create_docs( | ||||
383 | { doc body }, | ||||
384 | { doc body }, | ||||
385 | ... | ||||
386 | ); | ||||
387 | |||||
388 | The C<create_docs()> helper is a shorter form of L</create()> which can be used | ||||
389 | when you are using the default C<index> and C<type> as set in L</new()> | ||||
390 | and you are not specifying a custom C<id> per document. In this case, | ||||
391 | you can just pass the individual document bodies. | ||||
392 | |||||
393 | =head2 C<index()> | ||||
394 | |||||
395 | $bulk->index( | ||||
396 | { index => 'custom_index', source => { doc body }}, | ||||
397 | { type => 'custom_type', id => 1, source => { doc body }}, | ||||
398 | ... | ||||
399 | ); | ||||
400 | |||||
401 | The C<index()> helper method allows you to add multiple C<index> actions. | ||||
402 | It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/index()> | ||||
403 | except that the document body should be passed as the C<source> or C<_source> | ||||
404 | parameter, instead of as C<body>. | ||||
405 | |||||
406 | =head2 C<delete()> | ||||
407 | |||||
408 | $bulk->delete( | ||||
409 | { index => 'custom_index', id => 1}, | ||||
410 | { type => 'custom_type', id => 2}, | ||||
411 | ... | ||||
412 | ); | ||||
413 | |||||
414 | The C<delete()> helper method allows you to add multiple C<delete> actions. | ||||
415 | It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/delete()>. | ||||
416 | |||||
417 | =head2 C<delete_ids()> | ||||
418 | |||||
419 | $bulk->delete_ids(1,2,3...) | ||||
420 | |||||
421 | The C<delete_ids()> helper method can be used when all of the documents you | ||||
422 | want to delete have the default C<index> and C<type> as set in L</new()>. | ||||
423 | In this case, all you have to do is to pass in a list of IDs. | ||||
424 | |||||
425 | =head2 C<update()> | ||||
426 | |||||
427 | $bulk->update( | ||||
428 | { id => 1, | ||||
429 | doc => { partial doc }, | ||||
430 | doc_as_upsert => 1 | ||||
431 | }, | ||||
432 | { id => 2, | ||||
433 | lang => 'mvel', | ||||
434 | script => { script }, | ||||
435 | upsert => { upsert doc } | ||||
436 | }, | ||||
437 | ... | ||||
438 | ); | ||||
439 | |||||
440 | The C<update()> helper method allows you to add multiple C<update> actions. | ||||
441 | It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/update()>. | ||||
442 | An update can either use a I<partial doc> which gets merged with an existing | ||||
443 | doc (example 1 above), or can use a C<script> to update an existing doc | ||||
444 | (example 2 above). More information on C<script> can be found here: L<Search::Elasticsearch::Client::2_0::Direct/update()>. | ||||
445 | |||||
446 | =head1 REINDEXING DOCUMENTS | ||||
447 | |||||
448 | A common use case for bulk indexing is to reindex a whole index when | ||||
449 | changing the type mappings or analysis chain. This typically | ||||
450 | combines bulk indexing with L<scrolled searches|Search::Elasticsearch::Scroll>: | ||||
451 | the scrolled search pulls all of the data from the source index, and | ||||
452 | the bulk indexer indexes the data into the new index. | ||||
453 | |||||
454 | =head2 C<reindex()> | ||||
455 | |||||
456 | $bulk->reindex( | ||||
457 | source => $source, # required | ||||
458 | transform => \&transform, # optional | ||||
459 | version_type => 'external|internal', # optional | ||||
460 | ); | ||||
461 | |||||
462 | The C<reindex()> method requires a C<$source> parameter, which provides | ||||
463 | the source for the documents which are to be reindexed. | ||||
464 | |||||
465 | =head2 Reindexing from another index | ||||
466 | |||||
467 | If the C<source> argument is a HASH ref, then the hash is passed to | ||||
468 | L<Search::Elasticsearch::Scroll/new()> to create a new scrolled search. | ||||
469 | |||||
470 | my $bulk = $es->bulk_helper( | ||||
471 | index => 'new_index', | ||||
472 | verbose => 1 | ||||
473 | ); | ||||
474 | |||||
475 | $bulk->reindex( | ||||
476 | source => { | ||||
477 | index => 'old_index', | ||||
478 | size => 500, # default | ||||
479 | search_type => 'scan' # default | ||||
480 | } | ||||
481 | ); | ||||
482 | |||||
483 | If a default C<index> or C<type> has been specified in the call to | ||||
484 | L</new()>, then it will replace the C<index> and C<type> values for | ||||
485 | the docs returned from the scrolled search. In the example above, | ||||
486 | all docs will be retrieved from C<"old_index"> and will be bulk indexed | ||||
487 | into C<"new_index">. | ||||
488 | |||||
489 | =head2 Reindexing from a generic source | ||||
490 | |||||
491 | The C<source> parameter also accepts a coderef or an anonymous sub, | ||||
492 | which should return one or more new documents every time it is executed. | ||||
493 | This allows you to pass any iterator, wrapped in an anonymous sub: | ||||
494 | |||||
495 | my $iter = get_iterator_from_somewhere(); | ||||
496 | |||||
497 | $bulk->reindex( | ||||
498 | source => sub { $iter->next } | ||||
499 | ); | ||||
500 | |||||
501 | =head2 Transforming docs on the fly | ||||
502 | |||||
503 | The C<transform> parameter allows you to change documents on the fly, | ||||
504 | using a callback. The callback receives the document as the only argument, | ||||
505 | and should return the updated document, or C<undef> if the document should | ||||
506 | not be indexed: | ||||
507 | |||||
508 | $bulk->reindex( | ||||
509 | source => { index => 'old_index' }, | ||||
510 | transform => sub { | ||||
511 | my $doc = shift; | ||||
512 | |||||
513 | # don't index doc marked as valid:false | ||||
514 | return undef unless $doc->{_source}{valid}; | ||||
515 | |||||
516 | # convert $tag to @tags | ||||
517 | $doc->{_source}{tags} = [ delete $doc->{_source}{tag}]; | ||||
518 | return $doc | ||||
519 | } | ||||
520 | ); | ||||
521 | |||||
522 | =head2 Reindexing from another cluster | ||||
523 | |||||
524 | By default, L</reindex()> expects the source and destination indices | ||||
525 | to be in the same cluster. To pull data from one cluster and index it into | ||||
526 | another, you can use two separate C<$es> objects: | ||||
527 | |||||
528 | $es_target = Search::Elasticsearch->new( nodes => 'localhost:9200' ); | ||||
529 | $es_source = Search::Elasticsearch->new( nodes => 'search1:9200' ); | ||||
530 | |||||
531 | my $bulk = $es_target->bulk_helper( | ||||
532 | verbose => 1 | ||||
533 | ) | ||||
534 | -> reindex( | ||||
535 | source => { | ||||
536 | es => $es_source, | ||||
537 | index => 'my_index' | ||||
538 | } | ||||
539 | ); | ||||
540 | |||||
541 | =head2 Parents and routing | ||||
542 | |||||
543 | If you are using parent-child relationships or custom C<routing> values, | ||||
544 | and you want to preserve these when you reindex your documents, then | ||||
545 | you will need to request these values specifically, as follows: | ||||
546 | |||||
547 | $bulk->reindex( | ||||
548 | source => { | ||||
549 | index => 'old_index', | ||||
550 | fields => ['_source','_parent','_routing'] | ||||
551 | } | ||||
552 | ); | ||||
553 | |||||
554 | =head2 Working with version numbers | ||||
555 | |||||
556 | Every document in Elasticsearch has a current C<version> number, which | ||||
557 | is used for L<optimistic concurrency control|http://en.wikipedia.org/wiki/Optimistic_concurrency_control>, | ||||
558 | that is, to ensure that you don't overwrite changes that have been made | ||||
559 | by another process. | ||||
560 | |||||
561 | All CRUD operations accept a C<version> parameter and a C<version_type> | ||||
562 | parameter which tells Elasticsearch that the change should only be made | ||||
563 | if the current document corresponds to these parameters. The | ||||
564 | C<version_type> parameter can have the following values: | ||||
565 | |||||
566 | =over | ||||
567 | |||||
568 | =item * C<internal> | ||||
569 | |||||
570 | Use Elasticsearch version numbers. Documents are only changed if the | ||||
571 | document in Elasticsearch has the B<same> C<version> number that is | ||||
572 | specified in the CRUD operation. After the change, the new | ||||
573 | version number is C<version+1>. | ||||
574 | |||||
575 | =item * C<external> | ||||
576 | |||||
577 | Use an external versioning system, such as timestamps or version numbers | ||||
578 | from an external database. Documents are only changed if the document | ||||
579 | in Elasticsearch has a B<lower> C<version> number than the one | ||||
580 | specified in the CRUD operation. After the change, the new version | ||||
581 | number is C<version>. | ||||
582 | |||||
583 | =back | ||||
584 | |||||
585 | If you would like to reindex documents from one index to another, preserving | ||||
586 | the C<version> numbers from the original index, then you need the following: | ||||
587 | |||||
588 | $bulk->reindex( | ||||
589 | source => { | ||||
590 | index => 'old_index', | ||||
591 | version => 1, # retrieve version numbers in search | ||||
592 | }, | ||||
593 | version_type => 'external' # use these "external" version numbers | ||||
594 | ); | ||||
595 | |||||
596 | =head1 AUTHOR | ||||
597 | |||||
598 | Clinton Gormley <drtech@cpan.org> | ||||
599 | |||||
600 | =head1 COPYRIGHT AND LICENSE | ||||
601 | |||||
602 | This software is Copyright (c) 2016 by Elasticsearch BV. | ||||
603 | |||||
604 | This is free software, licensed under: | ||||
605 | |||||
606 | The Apache License, Version 2.0, January 2004 | ||||
607 | |||||
608 | =cut | ||||
609 | |||||
610 | 1 | 12µs | 1 | 5.87ms | __END__ # spent 5.87ms making 1 call to B::Hooks::EndOfScope::XS::__ANON__[/opt/flows/lib/lib/perl5/B/Hooks/EndOfScope/XS.pm:17] |