← Index
NYTProf Performance Profile   « line view »
For flows_to_es.pl
  Run on Mon May 9 23:27:59 2016
Reported on Mon May 9 23:28:09 2016

Filename: /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm
Statements: Executed 16 statements in 1.07ms
Subroutines
Calls | P | F | Exclusive Time | Inclusive Time | Subroutine
1 | 1 | 1 | 18µs | 351µs | Search::Elasticsearch::Bulk::BEGIN@3
1 | 1 | 1 | 12µs | 14µs | Search::Elasticsearch::Bulk::flush
1 | 1 | 1 | 11µs | 49µs | Search::Elasticsearch::Bulk::BEGIN@7
1 | 1 | 1 | 10µs | 235µs | Search::Elasticsearch::Bulk::BEGIN@6
1 | 1 | 1 | 10µs | 255µs | Search::Elasticsearch::Bulk::BEGIN@8
1 | 1 | 1 | 7µs | 48µs | Search::Elasticsearch::Bulk::__ANON__[:3]
0 | 0 | 0 | 0s | 0s | Search::Elasticsearch::Bulk::__ANON__[:56]
0 | 0 | 0 | 0s | 0s | Search::Elasticsearch::Bulk::__ANON__[:63]
0 | 0 | 0 | 0s | 0s | Search::Elasticsearch::Bulk::__ANON__[:89]
0 | 0 | 0 | 0s | 0s | Search::Elasticsearch::Bulk::add_action
0 | 0 | 0 | 0s | 0s | Search::Elasticsearch::Bulk::reindex
Call graph for these subroutines as a Graphviz dot language file.
Line | Statements | Time on line | Calls | Time in subs | Code
1package Search::Elasticsearch::Bulk;
21400ns$Search::Elasticsearch::Bulk::VERSION = '2.02';
3459µs3726µs
# spent 48µs (7+41) within Search::Elasticsearch::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm:3] which was called: # once (7µs+41µs) by import::into at line 34 of Import/Into.pm # spent 351µs (18+334) within Search::Elasticsearch::Bulk::BEGIN@3 which was called: # once (18µs+334µs) by Module::Runtime::require_module at line 3
use Moo;
# spent 351µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@3 # spent 334µs making 1 call to Moo::import # spent 41µs making 1 call to strictures::import
411µs15.89mswith 'Search::Elasticsearch::Role::Bulk',
# spent 5.89ms making 1 call to Moo::with
5 'Search::Elasticsearch::Role::Is_Sync';
6238µs2459µs
# spent 235µs (10+224) within Search::Elasticsearch::Bulk::BEGIN@6 which was called: # once (10µs+224µs) by Module::Runtime::require_module at line 6
use Search::Elasticsearch::Util qw(parse_params throw);
7231µs287µs
# spent 49µs (11+38) within Search::Elasticsearch::Bulk::BEGIN@7 which was called: # once (11µs+38µs) by Module::Runtime::require_module at line 7
use Try::Tiny;
# spent 49µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@7 # spent 38µs making 1 call to Exporter::import
82911µs2501µs
# spent 255µs (10+246) within Search::Elasticsearch::Bulk::BEGIN@8 which was called: # once (10µs+246µs) by Module::Runtime::require_module at line 8
use namespace::clean;
# spent 255µs making 1 call to Search::Elasticsearch::Bulk::BEGIN@8 # spent 246µs making 1 call to namespace::clean::import
9
10#===================================
11sub add_action {
12#===================================
13 my $self = shift;
14 my $buffer = $self->_buffer;
15 my $max_size = $self->max_size;
16 my $max_count = $self->max_count;
17 my $max_time = $self->max_time;
18
19 while (@_) {
20 my @json = $self->_encode_action( splice( @_, 0, 2 ) );
21
22 push @$buffer, @json;
23
24 my $size = $self->_buffer_size;
25 $size += length($_) + 1 for @json;
26 $self->_buffer_size($size);
27
28 my $count = $self->_buffer_count( $self->_buffer_count + 1 );
29
30 $self->flush
31 if ( $max_size and $size >= $max_size )
32 || ( $max_count and $count >= $max_count )
33 || ( $max_time and time >= $self->_last_flush + $max_time );
34 }
35 return 1;
36}
37
38#===================================
39
# spent 14µs (12+2) within Search::Elasticsearch::Bulk::flush which was called: # once (12µs+2µs) by main::RUNTIME at line 116 of flows_to_es.pl
sub flush {
40#===================================
411300ns my $self = shift;
4217µs11µs $self->_last_flush(time);
# spent 1µs making 1 call to Search::Elasticsearch::Role::Bulk::_last_flush
43
4418µs1600ns return { items => [] }
# spent 600ns making 1 call to Search::Elasticsearch::Role::Bulk::_buffer_size
45 unless $self->_buffer_size;
46
47 if ( $self->verbose ) {
48 local $| = 1;
49 print ".";
50 }
51 my $buffer = $self->_buffer;
52 my $results = try {
53 my $res = $self->es->bulk( %{ $self->_bulk_args }, body => $buffer );
54 $self->clear_buffer;
55 return $res;
56 }
57 catch {
58 my $error = $_;
59 $self->clear_buffer
60 unless $error->is( 'Cxn', 'NoNodes' );
61
62 die $error;
63 };
64 $self->_report( $buffer, $results );
65 return defined wantarray ? $results : undef;
66}
67
68#===================================
69sub reindex {
70#===================================
71 my ( $self, $params ) = parse_params(@_);
72 my $src = $params->{source}
73 or throw( 'Param', "Missing required param <source>" );
74
75 my $transform = $self->_doc_transformer($params);
76
77 if ( ref $src eq 'HASH' ) {
78 $src = {%$src};
79 my $es = delete $src->{es} || $self->es;
80 my $scroll = $es->scroll_helper(
81 search_type => 'scan',
82 size => 500,
83 %$src
84 );
85
86 $src = sub {
87 $scroll->refill_buffer;
88 $scroll->drain_buffer;
89 };
90
91 print "Reindexing " . $scroll->total . " docs\n"
92 if $self->verbose;
93 }
94
95 while ( my @docs = grep {defined} $src->() ) {
96 $self->index( grep {$_} map { $transform->($_) } @docs );
97 }
98 $self->flush;
99 return 1;
100}
101
10215µs1;
103
104=pod
105
106=encoding UTF-8
107
108=head1 NAME
109
110Search::Elasticsearch::Bulk - A helper module for the Bulk API and for reindexing
111
112=head1 VERSION
113
114version 2.02
115
116=head1 SYNOPSIS
117
118 use Search::Elasticsearch;
119
120 my $es = Search::Elasticsearch->new;
121 my $bulk = $es->bulk_helper(
122 index => 'my_index',
123 type => 'my_type'
124 );
125
126 # Index docs:
127 $bulk->index({ id => 1, source => { foo => 'bar' }});
128 $bulk->add_action( index => { id => 1, source => { foo=> 'bar' }});
129
130 # Create docs:
131 $bulk->create({ id => 1, source => { foo => 'bar' }});
132 $bulk->add_action( create => { id => 1, source => { foo=> 'bar' }});
133 $bulk->create_docs({ foo => 'bar' });
134
135 # Delete docs:
136 $bulk->delete({ id => 1});
137 $bulk->add_action( delete => { id => 1 });
138 $bulk->delete_ids(1,2,3);
139
140 # Update docs:
141 $bulk->update({ id => 1, script => '...' });
142 $bulk->add_action( update => { id => 1, script => '...' });
143
144 # Manual flush
145 $bulk->flush;
146
147 # Reindex docs:
148 my $bulk = $es->bulk_helper(
149 index => 'new_index',
150 verbose => 1
151 );
152
153 $bulk->reindex( source => { index => 'old_index' });
154
155=head1 DESCRIPTION
156
157This module provides a wrapper for the L<Search::Elasticsearch::Client::2_0::Direct/bulk()>
158method which makes it easier to run multiple create, index, update or delete
159actions in a single request. It also provides a simple interface
160for L<reindexing documents|/REINDEXING DOCUMENTS>.
161
162The L<Search::Elasticsearch::Bulk> module acts as a queue, buffering up actions
163until it reaches a maximum count of actions, or a maximum size of JSON request
164body, at which point it issues a C<bulk()> request.
165
166Once you have finished adding actions, call L</flush()> to force the final
167C<bulk()> request on the items left in the queue.
168
169This class does L<Search::Elasticsearch::Role::Bulk> and
170L<Search::Elasticsearch::Role::Is_Sync>.
171
172=head1 CREATING A NEW INSTANCE
173
174=head2 C<new()>
175
176 my $bulk = $es->bulk_helper(
177
178 index => 'default_index', # optional
179 type => 'default_type', # optional
180 %other_bulk_params # optional
181
182 max_count => 1_000, # optional
183 max_size => 1_000_000, # optional
184 max_time => 5, # optional
185
186 verbose => 0 | 1, # optional
187
188 on_success => sub {...}, # optional
189 on_error => sub {...}, # optional
190 on_conflict => sub {...}, # optional
191
192
193 );
194
195The C<new()> method returns a new C<$bulk> object. You must pass your
196Search::Elasticsearch client as the C<es> argument.
197
198The C<index> and C<type> parameters provide default values for
199C<index> and C<type>, which can be overridden in each action.
200You can also pass any other values which are accepted
201by the L<bulk()|Search::Elasticsearch::Client::2_0::Direct/bulk()> method.
202
203See L</flush()> for more information about the other parameters.
204
205=head1 FLUSHING THE BUFFER
206
207=head2 C<flush()>
208
209 $result = $bulk->flush;
210
211The C<flush()> method sends all buffered actions to Elasticsearch using
212a L<bulk()|Search::Elasticsearch::Client::2_0::Direct/bulk()> request.
213
214=head2 Auto-flushing
215
216An automatic L</flush()> is triggered whenever the C<max_count>, C<max_size>,
217or C<max_time> threshold is breached. This causes all actions in the buffer to be
218sent to Elasticsearch.
219
220=over
221
222=item * C<max_count>
223
224The maximum number of actions to allow before triggering a L</flush()>.
225This can be disabled by setting C<max_count> to C<0>. Defaults to
226C<1,000>.
227
228=item * C<max_size>
229
230The maximum size of JSON request body to allow before triggering a
231L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults
232to C<1_000_000> bytes.
233
234=item * C<max_time>
235
236The maximum number of seconds to wait before triggering a flush. Defaults
237to C<0> seconds, which means that it is disabled. B<Note:> This timeout
238is only triggered when new items are added to the queue, not in the background.
239
240=back
241
242=head2 Errors when flushing
243
244There are two types of error which can be thrown when L</flush()>
245is called, either manually or automatically.
246
247=over
248
249=item * Temporary Elasticsearch errors
250
251A C<Cxn> error like a C<NoNodes> error which indicates that your cluster is down.
252These errors do not clear the buffer, as they can be retried later on.
253
254=item * Action errors
255
256Individual actions may fail. For instance, a C<create> action will fail
257if a document with the same C<index>, C<type> and C<id> already exists.
258These action errors are reported via L<callbacks|/Using callbacks>.
259
260=back
261
262=head2 Using callbacks
263
264By default, any I<Action errors> (see above) cause warnings to be
265written to C<STDERR>. However, you can use the C<on_error>, C<on_conflict>
266and C<on_success> callbacks for more fine-grained control.
267
268All callbacks receive the following arguments:
269
270=over
271
272=item C<$action>
273
274The name of the action, ie C<index>, C<create>, C<update> or C<delete>.
275
276=item C<$response>
277
278The response that Elasticsearch returned for this action.
279
280=item C<$i>
281
282The index of the action, ie the first action in the flush request
283will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc.
284
285=back
286
287=head3 C<on_success>
288
289 my $bulk = $es->bulk_helper(
290 on_success => sub {
291 my ($action,$response,$i) = @_;
292 # do something
293 },
294 );
295
296The C<on_success> callback is called for every action that has a successful
297response.
298
299=head3 C<on_conflict>
300
301 my $bulk = $es->bulk_helper(
302 on_conflict => sub {
303 my ($action,$response,$i,$version) = @_;
304 # do something
305 },
306 );
307
308The C<on_conflict> callback is called for actions that have triggered
309a C<Conflict> error, eg trying to C<create> a document which already
310exists. The C<$version> argument will contain the version number
311of the document currently stored in Elasticsearch (if found).
312
313=head3 C<on_error>
314
315 my $bulk = $es->bulk_helper(
316 on_error => sub {
317 my ($action,$response,$i) = @_;
318 # do something
319 },
320 );
321
322The C<on_error> callback is called for any error (unless the C<on_conflict>
323callback has already been called).
324
325=head2 Disabling callbacks and autoflush
326
327If you want to be in control of flushing, and you just want to receive
328the raw response that Elasticsearch sends instead of using callbacks,
329then you can do so as follows:
330
331 my $bulk = $es->bulk_helper(
332 max_count => 0,
333 max_size => 0,
334 on_error => undef
335 );
336
337 $bulk->add_action(....);
338 $response = $bulk->flush;
339
340=head1 CREATE, INDEX, UPDATE, DELETE
341
342=head2 C<add_action()>
343
344 $bulk->add_action(
345 create => { ...params... },
346 index => { ...params... },
347 update => { ...params... },
348 delete => { ...params... }
349 );
350
351The C<add_action()> method allows you to add multiple C<create>, C<index>,
352C<update> and C<delete> actions to the queue. The first value is the action
353type, and the second value is the parameters that describe that action.
354See the individual helper methods below for details.
355
356B<Note:> Parameters like C<index> or C<type> can be specified as C<index> or as
357C<_index>, so the following two lines are equivalent:
358
359 index => { index => 'index', type => 'type', id => 1, source => {...}},
360 index => { _index => 'index', _type => 'type', _id => 1, _source => {...}},
361
362B<Note:> The C<index> and C<type> parameters can be specified in the
363params for any action, but if not specified, will default to the C<index>
364and C<type> values specified in L</new()>. These are required parameters:
365they must be specified either in L</new()> or in every action.
366
367=head2 C<create()>
368
369 $bulk->create(
370 { index => 'custom_index', source => { doc body }},
371 { type => 'custom_type', id => 1, source => { doc body }},
372 ...
373 );
374
375The C<create()> helper method allows you to add multiple C<create> actions.
376It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/create()>
377except that the document body should be passed as the C<source> or C<_source>
378parameter, instead of as C<body>.
379
380=head2 C<create_docs()>
381
382 $bulk->create_docs(
383 { doc body },
384 { doc body },
385 ...
386 );
387
388The C<create_docs()> helper is a shorter form of L</create()> which can be used
389when you are using the default C<index> and C<type> as set in L</new()>
390and you are not specifying a custom C<id> per document. In this case,
391you can just pass the individual document bodies.
392
393=head2 C<index()>
394
395 $bulk->index(
396 { index => 'custom_index', source => { doc body }},
397 { type => 'custom_type', id => 1, source => { doc body }},
398 ...
399 );
400
401The C<index()> helper method allows you to add multiple C<index> actions.
402It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/index()>
403except that the document body should be passed as the C<source> or C<_source>
404parameter, instead of as C<body>.
405
406=head2 C<delete()>
407
408 $bulk->delete(
409 { index => 'custom_index', id => 1},
410 { type => 'custom_type', id => 2},
411 ...
412 );
413
414The C<delete()> helper method allows you to add multiple C<delete> actions.
415It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/delete()>.
416
417=head2 C<delete_ids()>
418
419 $bulk->delete_ids(1,2,3...)
420
421The C<delete_ids()> helper method can be used when all of the documents you
422want to delete have the default C<index> and C<type> as set in L</new()>.
423In this case, all you have to do is to pass in a list of IDs.
424
425=head2 C<update()>
426
427 $bulk->update(
428 { id => 1,
429 doc => { partial doc },
430 doc_as_upsert => 1
431 },
432 { id => 2,
433 lang => 'mvel',
434 script => { script },
435 upsert => { upsert doc }
436 },
437 ...
438 );
439
440The C<update()> helper method allows you to add multiple C<update> actions.
441It accepts the same parameters as L<Search::Elasticsearch::Client::2_0::Direct/update()>.
442An update can either use a I<partial doc> which gets merged with an existing
443doc (example 1 above), or can use a C<script> to update an existing doc
444(example 2 above). More information on C<script> can be found here: L<Search::Elasticsearch::Client::2_0::Direct/update()>.
445
446=head1 REINDEXING DOCUMENTS
447
448A common use case for bulk indexing is to reindex a whole index when
449changing the type mappings or analysis chain. This typically
450combines bulk indexing with L<scrolled searches|Search::Elasticsearch::Scroll>:
451the scrolled search pulls all of the data from the source index, and
452the bulk indexer indexes the data into the new index.
453
454=head2 C<reindex()>
455
456 $bulk->reindex(
457 source => $source, # required
458 transform => \&transform, # optional
459 version_type => 'external|internal', # optional
460 );
461
462The C<reindex()> method requires a C<$source> parameter, which provides
463the source for the documents which are to be reindexed.
464
465=head2 Reindexing from another index
466
467If the C<source> argument is a HASH ref, then the hash is passed to
468L<Search::Elasticsearch::Scroll/new()> to create a new scrolled search.
469
470 my $bulk = $es->bulk_helper(
471 index => 'new_index',
472 verbose => 1
473 );
474
475 $bulk->reindex(
476 source => {
477 index => 'old_index',
478 size => 500, # default
479 search_type => 'scan' # default
480 }
481 );
482
483If a default C<index> or C<type> has been specified in the call to
484L</new()>, then it will replace the C<index> and C<type> values for
485the docs returned from the scrolled search. In the example above,
486all docs will be retrieved from C<"old_index"> and will be bulk indexed
487into C<"new_index">.
488
489=head2 Reindexing from a generic source
490
491The C<source> parameter also accepts a coderef or an anonymous sub,
492which should return one or more new documents every time it is executed.
493This allows you to pass any iterator, wrapped in an anonymous sub:
494
495 my $iter = get_iterator_from_somewhere();
496
497 $bulk->reindex(
498 source => sub { $iter->next }
499 );
500
501=head2 Transforming docs on the fly
502
503The C<transform> parameter allows you to change documents on the fly,
504using a callback. The callback receives the document as the only argument,
505and should return the updated document, or C<undef> if the document should
506not be indexed:
507
508 $bulk->reindex(
509 source => { index => 'old_index' },
510 transform => sub {
511 my $doc = shift;
512
513 # don't index doc marked as valid:false
514 return undef unless $doc->{_source}{valid};
515
516 # convert $tag to @tags
517 $doc->{_source}{tags} = [ delete $doc->{_source}{tag}];
518 return $doc
519 }
520 );
521
522=head2 Reindexing from another cluster
523
524By default, L</reindex()> expects the source and destination indices
525to be in the same cluster. To pull data from one cluster and index it into
526another, you can use two separate C<$es> objects:
527
528 $es_target = Search::Elasticsearch->new( nodes => 'localhost:9200' );
529 $es_source = Search::Elasticsearch->new( nodes => 'search1:9200' );
530
531 my $bulk = $es_target->bulk_helper(
532 verbose => 1
533 )
534 -> reindex(
535 source => {
536 es => $es_source,
537 index => 'my_index'
538 }
539 );
540
541=head2 Parents and routing
542
543If you are using parent-child relationships or custom C<routing> values,
544and you want to preserve these when you reindex your documents, then
545you will need to request these values specifically, as follows:
546
547 $bulk->reindex(
548 source => {
549 index => 'old_index',
550 fields => ['_source','_parent','_routing']
551 }
552 );
553
554=head2 Working with version numbers
555
556Every document in Elasticsearch has a current C<version> number, which
557is used for L<optimistic concurrency control|http://en.wikipedia.org/wiki/Optimistic_concurrency_control>,
558that is, to ensure that you don't overwrite changes that have been made
559by another process.
560
561All CRUD operations accept a C<version> parameter and a C<version_type>
562parameter which tells Elasticsearch that the change should only be made
563if the current document corresponds to these parameters. The
564C<version_type> parameter can have the following values:
565
566=over
567
568=item * C<internal>
569
570Use Elasticsearch version numbers. Documents are only changed if the
571document in Elasticsearch has the B<same> C<version> number that is
572specified in the CRUD operation. After the change, the new
573version number is C<version+1>.
574
575=item * C<external>
576
577Use an external versioning system, such as timestamps or version numbers
578from an external database. Documents are only changed if the document
579in Elasticsearch has a B<lower> C<version> number than the one
580specified in the CRUD operation. After the change, the new version
581number is C<version>.
582
583=back
584
585If you would like to reindex documents from one index to another, preserving
586the C<version> numbers from the original index, then you need the following:
587
588 $bulk->reindex(
589 source => {
590 index => 'old_index',
591 version => 1, # retrieve version numbers in search
592 },
593 version_type => 'external' # use these "external" version numbers
594 );
595
596=head1 AUTHOR
597
598Clinton Gormley <drtech@cpan.org>
599
600=head1 COPYRIGHT AND LICENSE
601
602This software is Copyright (c) 2016 by Elasticsearch BV.
603
604This is free software, licensed under:
605
606 The Apache License, Version 2.0, January 2004
607
608=cut
609
610112µs15.87ms__END__