← Index
NYTProf Performance Profile   « line view »
For flows_to_es.pl
  Run on Mon May 9 23:27:59 2016
Reported on Mon May 9 23:28:09 2016

Filename/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm
StatementsExecuted 43 statements in 1.86ms
Subroutines
Calls P F Exclusive Time Inclusive Time Subroutine
11117µs341µsSearch::Elasticsearch::Role::Bulk::::BEGIN@3Search::Elasticsearch::Role::Bulk::BEGIN@3
11112µs18µsSearch::Elasticsearch::Role::Bulk::::BUILDARGSSearch::Elasticsearch::Role::Bulk::BUILDARGS
1119µs219µsSearch::Elasticsearch::Role::Bulk::::BEGIN@6Search::Elasticsearch::Role::Bulk::BEGIN@6
1118µs228µsSearch::Elasticsearch::Role::Bulk::::BEGIN@7Search::Elasticsearch::Role::Bulk::BEGIN@7
1116µs42µsSearch::Elasticsearch::Role::Bulk::::__ANON__[:3]Search::Elasticsearch::Role::Bulk::__ANON__[:3]
1112µs2µsSearch::Elasticsearch::Role::Bulk::::__ANON__[:24]Search::Elasticsearch::Role::Bulk::__ANON__[:24]
1112µs2µsSearch::Elasticsearch::Role::Bulk::::__ANON__[:19]Search::Elasticsearch::Role::Bulk::__ANON__[:19]
1111µs1µsSearch::Elasticsearch::Role::Bulk::::_last_flushSearch::Elasticsearch::Role::Bulk::_last_flush (xsub)
111600ns600nsSearch::Elasticsearch::Role::Bulk::::_buffer_sizeSearch::Elasticsearch::Role::Bulk::_buffer_size (xsub)
0000s0sSearch::Elasticsearch::Role::Bulk::::__ANON__[:268]Search::Elasticsearch::Role::Bulk::__ANON__[:268]
0000s0sSearch::Elasticsearch::Role::Bulk::::__ANON__[:51]Search::Elasticsearch::Role::Bulk::__ANON__[:51]
0000s0sSearch::Elasticsearch::Role::Bulk::::_build__serializerSearch::Elasticsearch::Role::Bulk::_build__serializer
0000s0sSearch::Elasticsearch::Role::Bulk::::_build_on_errorSearch::Elasticsearch::Role::Bulk::_build_on_error
0000s0sSearch::Elasticsearch::Role::Bulk::::_doc_transformerSearch::Elasticsearch::Role::Bulk::_doc_transformer
0000s0sSearch::Elasticsearch::Role::Bulk::::_encode_actionSearch::Elasticsearch::Role::Bulk::_encode_action
0000s0sSearch::Elasticsearch::Role::Bulk::::_is_conflict_errorSearch::Elasticsearch::Role::Bulk::_is_conflict_error
0000s0sSearch::Elasticsearch::Role::Bulk::::_reportSearch::Elasticsearch::Role::Bulk::_report
0000s0sSearch::Elasticsearch::Role::Bulk::::clear_bufferSearch::Elasticsearch::Role::Bulk::clear_buffer
0000s0sSearch::Elasticsearch::Role::Bulk::::createSearch::Elasticsearch::Role::Bulk::create
0000s0sSearch::Elasticsearch::Role::Bulk::::create_docsSearch::Elasticsearch::Role::Bulk::create_docs
0000s0sSearch::Elasticsearch::Role::Bulk::::deleteSearch::Elasticsearch::Role::Bulk::delete
0000s0sSearch::Elasticsearch::Role::Bulk::::delete_idsSearch::Elasticsearch::Role::Bulk::delete_ids
0000s0sSearch::Elasticsearch::Role::Bulk::::indexSearch::Elasticsearch::Role::Bulk::index
0000s0sSearch::Elasticsearch::Role::Bulk::::updateSearch::Elasticsearch::Role::Bulk::update
Call graph for these subroutines as a Graphviz dot language file.
Line Statements Time on line Calls Time in subs Code
1package Search::Elasticsearch::Role::Bulk;
21600ns$Search::Elasticsearch::Role::Bulk::VERSION = '2.02';
3458µs3700µs
# spent 341µs (17+324) within Search::Elasticsearch::Role::Bulk::BEGIN@3 which was called: # once (17µs+324µs) by Module::Runtime::require_module at line 3 # spent 42µs (6+36) within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:3] which was called: # once (6µs+36µs) by import::into at line 34 of Import/Into.pm
use Moo::Role;
# spent 341µs making 1 call to Search::Elasticsearch::Role::Bulk::BEGIN@3 # spent 324µs making 1 call to Moo::Role::import # spent 36µs making 1 call to strictures::import
412µs110µsrequires 'add_action', 'flush';
# spent 10µs making 1 call to Moo::Role::requires
5
6236µs2429µs
# spent 219µs (9+210) within Search::Elasticsearch::Role::Bulk::BEGIN@6 which was called: # once (9µs+210µs) by Module::Runtime::require_module at line 6
use Search::Elasticsearch::Util qw(parse_params throw);
721.67ms2448µs
# spent 228µs (8+220) within Search::Elasticsearch::Role::Bulk::BEGIN@7 which was called: # once (8µs+220µs) by Module::Runtime::require_module at line 7
use namespace::clean;
# spent 228µs making 1 call to Search::Elasticsearch::Role::Bulk::BEGIN@7 # spent 220µs making 1 call to namespace::clean::import
8
911µs1158µshas 'es' => ( is => 'ro', required => 1 );
# spent 158µs making 1 call to Moo::Role::has
1011µs1131µshas 'max_count' => ( is => 'rw', default => 1_000 );
# spent 131µs making 1 call to Moo::Role::has
1111µs1118µshas 'max_size' => ( is => 'rw', default => 1_000_000 );
# spent 118µs making 1 call to Moo::Role::has
1211µs1117µshas 'max_time' => ( is => 'rw', default => 0 );
# spent 117µs making 1 call to Moo::Role::has
13
1411µs1111µshas 'on_success' => ( is => 'ro', default => 0 );
# spent 111µs making 1 call to Moo::Role::has
1511µs1171µshas 'on_error' => ( is => 'lazy' );
# spent 171µs making 1 call to Moo::Role::has
1611µs1118µshas 'on_conflict' => ( is => 'ro', default => 0 );
# spent 118µs making 1 call to Moo::Role::has
1711µs1120µshas 'verbose' => ( is => 'rw' );
# spent 120µs making 1 call to Moo::Role::has
18
1927µs1127µs
# spent 2µs within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:19] which was called: # once (2µs+0s) by Search::Elasticsearch::Bulk::new at line 30 of (eval 113)[Sub/Quote.pm:5]
has '_buffer' => ( is => 'ro', default => sub { [] } );
# spent 127µs making 1 call to Moo::Role::has
2012µs1121µshas '_buffer_size' => ( is => 'rw', default => 0 );
# spent 121µs making 1 call to Moo::Role::has
2111µs1118µshas '_buffer_count' => ( is => 'rw', default => 0 );
# spent 118µs making 1 call to Moo::Role::has
2211µs1159µshas '_serializer' => ( is => 'lazy' );
# spent 159µs making 1 call to Moo::Role::has
2311µs1114µshas '_bulk_args' => ( is => 'ro' );
# spent 114µs making 1 call to Moo::Role::has
2427µs1132µs
# spent 2µs within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:24] which was called: # once (2µs+0s) by Search::Elasticsearch::Bulk::new at line 48 of (eval 113)[Sub/Quote.pm:5]
has '_last_flush' => ( is => 'rw', default => sub {time} );
# spent 132µs making 1 call to Moo::Role::has
25
2612µsour %Actions = (
27 'index' => 1,
28 'create' => 1,
29 'update' => 1,
30 'delete' => 1
31);
32
3312µsour @Metadata_Keys = (
34 'index', 'type', 'id', 'fields',
35 'routing', 'parent', 'timestamp', 'ttl',
36 'version', 'version_type'
37);
38
39#===================================
sub _build__serializer {
    # Builder for the lazy `_serializer` attribute: hand back the
    # serializer exposed by the transport of the object held in `es`.
    my $self = shift;
    return $self->es->transport->serializer;
}
41#===================================
42
43#===================================
sub _build_on_error {
#===================================
    # Builder for the lazy `on_error` attribute.  Returns a code ref that
    # warns with the action name followed by the serialized result.  The
    # serializer is looked up once here and captured by the closure; any
    # arguments after the first two are ignored by the handler.
    my ($self) = @_;
    my $encoder = $self->_serializer;

    my $handler = sub {
        my ( $action, $result ) = @_;
        my $message = "Bulk error [$action]: " . $encoder->encode($result);
        warn($message);
    };
    return $handler;
}
53
54#===================================
55
# spent 18µs (12+6) within Search::Elasticsearch::Role::Bulk::BUILDARGS which was called: # once (12µs+6µs) by Search::Elasticsearch::Bulk::new at line 24 of (eval 113)[Sub/Quote.pm:5]
sub BUILDARGS {
56#===================================
5712µs16µs my ( $class, $params ) = parse_params(@_);
# spent 6µs making 1 call to Search::Elasticsearch::Util::parse_params
581200ns my %args;
5911µs for (qw(index type consistency fields refresh replication routing timeout))
60 {
6184µs $args{$_} = $params->{$_}
62 if exists $params->{$_};
63 }
641600ns $params->{_bulk_args} = \%args;
6514µs return $params;
66}
67
68#===================================
sub index {
#===================================
    # Pair every argument with the action name 'index' and forward the
    # flattened (action, params) list to add_action(), returning whatever
    # add_action() returns.
    my $self = shift;
    return $self->add_action( map { ( 'index' => $_ ) } @_ );
}
73
74#===================================
sub create {
#===================================
    # Pair every argument with the action name 'create' and forward the
    # flattened (action, params) list to add_action(), returning whatever
    # add_action() returns.
    my $self = shift;
    return $self->add_action( map { ( 'create' => $_ ) } @_ );
}
79
80#===================================
sub delete {
#===================================
    # Pair every argument with the action name 'delete' and forward the
    # flattened (action, params) list to add_action(), returning whatever
    # add_action() returns.
    my $self = shift;
    return $self->add_action( map { ( 'delete' => $_ ) } @_ );
}
85
86#===================================
sub update {
#===================================
    # Pair every argument with the action name 'update' and forward the
    # flattened (action, params) list to add_action(), returning whatever
    # add_action() returns.
    my $self = shift;
    return $self->add_action( map { ( 'update' => $_ ) } @_ );
}
91
92#===================================
sub create_docs {
#===================================
    # Wrap each argument as the `_source` of a 'create' action and pass
    # the resulting (action, params) pairs to add_action().
    my ( $self, @sources ) = @_;
    my @actions = map { ( 'create' => { _source => $_ } ) } @sources;
    return $self->add_action(@actions);
}
98
99#===================================
sub delete_ids {
#===================================
    # Wrap each argument as the `_id` of a 'delete' action and pass the
    # resulting (action, params) pairs to add_action().
    my ( $self, @ids ) = @_;
    my @actions = map { ( 'delete' => { _id => $_ } ) } @ids;
    return $self->add_action(@actions);
}
105
10612µsour @Update_Params = qw(
107 doc upsert doc_as_upsert fields scripted_upsert
108 script script_id script_file
109 params lang detect_noop
110);
111
112#===================================
sub _encode_action {
#===================================
    # Validate one bulk action and serialize it.
    #
    # Arguments: the action name (must be a key of %Actions) and a hash ref
    # of parameters.  The parameters are shallow-copied, so the caller's
    # hash is never modified.
    #
    # Returns a list of serialized strings: first the metadata entry
    # ( { $action => \%metadata } ), then the source/update body when
    # there is one.
    #
    # Every validation failure is reported via throw() with type 'Param'.
    # NOTE(review): throw() is imported from Search::Elasticsearch::Util
    # and is presumed not to return - confirm.
    my $self   = shift;
    my $action = shift || '';
    my $orig   = shift;

    throw( 'Param', "Unrecognised action <$action>" )
        unless $Actions{$action};

    throw( 'Param', "Missing <params> for action <$action>" )
        unless ref($orig) eq 'HASH';

    my %metadata;
    my $params     = {%$orig};
    my $serializer = $self->_serializer;

    # Move each metadata key into %metadata under its underscore-prefixed
    # name.  The bare spelling wins over the "_"-prefixed one; if both are
    # present the prefixed one stays in $params and is reported below as
    # an unknown param.
    for my $key (@Metadata_Keys) {
        my $found
            = exists $params->{$key}    ? $key
            : exists $params->{"_$key"} ? "_$key"
            :                             next;
        $metadata{"_$key"} = delete $params->{$found};
    }

    # index and type may come from the action itself or from the defaults
    # stored in _bulk_args.
    throw( 'Param', "Missing required param <index>" )
        unless $metadata{_index} || $self->_bulk_args->{index};
    throw( 'Param', "Missing required param <type>" )
        unless $metadata{_type} || $self->_bulk_args->{type};

    my $source;
    if ( $action eq 'update' ) {

        # An update body is assembled from the recognised update params;
        # $source stays undef when none of them were supplied.
        for my $name (@Update_Params) {
            $source->{$name} = delete $params->{$name}
                if exists $params->{$name};
        }
    }
    elsif ( $action ne 'delete' ) {
        $source
            = delete $params->{_source}
            || delete $params->{source}
            || throw(
            'Param',
            "Missing <source> for action <$action>: "
                . $serializer->encode($orig)
            );
    }

    # Anything still left in $params was not recognised.  The error type
    # is passed explicitly, as in every other throw() call in this file;
    # previously the message itself was passed in the type position.
    throw(    'Param',
              "Unknown params <"
            . ( join ',', sort keys %$params )
            . "> in <$action>: "
            . $serializer->encode($orig) )
        if keys %$params;

    # grep drops the body when it is undef/false (delete actions, or
    # updates with no update params).
    return map { $serializer->encode($_) }
        grep {$_} ( { $action => \%metadata }, $source );
}
169
170#===================================
sub _report {
#===================================
    # Dispatch the per-item results of a bulk response to the configured
    # callbacks.
    #
    # Arguments: $buffer (accepted but not used here) and $results, a hash
    # ref whose `items` entry is an array ref of single-pair hashes
    # ( { $action => $result } ).
    #
    # For every item, exactly one of these is invoked (when configured):
    #   on_conflict->( $action, $result, $i, $version )  conflict errors
    #   on_error->( $action, $result, $i )               other errors
    #   on_success->( $action, $result, $i )             no `error` key
    # where $i is the zero-based position of the item in `items`.
    #
    # Side effect: sets $results->{errors} to 1 when the key is absent.
    my ( $self, $buffer, $results ) = @_;
    my $on_success  = $self->on_success;
    my $on_error    = $self->on_error;
    my $on_conflict = $self->on_conflict;

    # assume errors if key not present, bwc
    $results->{errors} = 1 unless exists $results->{errors};

    # Nothing to do unless some callback could actually fire.
    return
        unless $on_success
        || ( $results->{errors} and $on_error || $on_conflict );

    my $j = 0;

    for my $item ( @{ $results->{items} } ) {

        # Claim this item's index up front so that every path through the
        # loop body - including the early `next` after a conflict -
        # advances the counter.  Previously a handled conflict skipped the
        # increment and shifted the index of all following items.
        my $i = $j++;

        my ( $action, $result ) = %$item;
        if ( my $error = $result->{error} ) {
            if ($on_conflict) {
                my ( $is_conflict, $version )
                    = $self->_is_conflict_error($error);
                if ($is_conflict) {
                    $on_conflict->( $action, $result, $i, $version );
                    next;
                }
            }
            $on_error && $on_error->( $action, $result, $i );
        }
        else {
            $on_success && $on_success->( $action, $result, $i );
        }
    }
    return;
}
209
210#===================================
sub _is_conflict_error {
#===================================
    # Decide whether $error describes a conflict.
    #
    # $error is either a hash ref with `type` and `reason` entries or a
    # plain string.  Returns:
    #   ()              not a conflict
    #   (1)             hash form, type document_already_exists_exception
    #   (1, $version)   otherwise; $version is the number captured after
    #                   "version conflict, current [" or undef when the
    #                   text carries no such number
    #
    # Captures are taken from the list-context return of each match, never
    # from $1 after the fact: when a match fails, $1 still holds the
    # capture of an earlier successful match, which would be returned as a
    # bogus version.
    my ( $self, $error ) = @_;

    if ( ref($error) eq 'HASH' ) {

        # Missing entries are treated as empty strings so that the
        # comparisons below do not warn.
        my $type   = defined $error->{type}   ? $error->{type}   : '';
        my $reason = defined $error->{reason} ? $error->{reason} : '';

        return 1 if $type eq 'document_already_exists_exception';
        return unless $type eq 'version_conflict_engine_exception';

        my ($version) = $reason =~ /version.conflict,.current.\[(\d+)\]/;
        return ( 1, $version );
    }

    # With one capture group a successful match yields a one-element list
    # (undef when the first alternative matched); failure yields ().
    my @captured = $error =~ /
        DocumentAlreadyExistsException
        |version.conflict,.current.\[(\d+)\]
        /x;
    return unless @captured;
    return ( 1, $captured[0] );
}
227
228#===================================
sub clear_buffer {
#===================================
    # Empty the buffer array in place (the array reference itself is kept)
    # and reset both the size and the count accessors to zero.
    my ($self) = @_;

    my $buffer = $self->_buffer;
    @$buffer = ();

    $self->_buffer_size(0);
    return $self->_buffer_count(0);
}
236
237#===================================
sub _doc_transformer {
#===================================
    # Build a closure that turns a document hash ref into params suitable
    # for a bulk action.
    #
    # $params may carry `version_type` (copied into every transformed doc
    # as `_version_type` when true) and `transform` (a code ref applied to
    # the result when true).
    #
    # The returned closure takes one hash ref, works on a shallow copy of
    # it, and returns either the copy or whatever `transform` returns.
    my ( $self, $params ) = @_;

    # Keys that survive the filter: every metadata key plus `source`, each
    # in bare and "_"-prefixed form, and the bare `fields`.
    my %allowed = ( fields => 1 );
    for my $key ( @Metadata_Keys, 'source' ) {
        $allowed{$key}    = 1;
        $allowed{"_$key"} = 1;
    }

    # When the bulk args already supply index/type, those keys are
    # stripped from each document as well.
    my $bulk_args = $self->_bulk_args;
    if ( $bulk_args->{index} ) {
        delete @allowed{ 'index', '_index' };
    }
    if ( $bulk_args->{type} ) {
        delete @allowed{ 'type', '_type' };
    }

    my $version_type = $params->{version_type};
    my $transform    = $params->{transform};

    return sub {
        my ($original) = @_;
        my %doc = %$original;

        delete @doc{ grep { !$allowed{$_} } keys %doc };

        # `fields` itself is always removed; routing/parent values found
        # inside it are promoted to top-level keys.
        my $fields = delete $doc{fields};
        if ($fields) {
            for my $name (qw(_routing routing _parent parent)) {
                next unless exists $fields->{$name};
                $doc{$name} = $fields->{$name};
            }
        }

        $doc{_version_type} = $version_type if $version_type;

        return $transform ? $transform->( \%doc ) : \%doc;
    };
}
270
271119µs1;
272
273# ABSTRACT: Provides common functionality to L<Search::Elasticsearch::Bulk> and L<Search::Elasticsearch::Async::Bulk>
274
275126µs1359µs__END__
 
# spent 600ns within Search::Elasticsearch::Role::Bulk::_buffer_size which was called: # once (600ns+0s) by Search::Elasticsearch::Bulk::flush at line 44 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm
sub Search::Elasticsearch::Role::Bulk::_buffer_size; # xsub
# spent 1µs within Search::Elasticsearch::Role::Bulk::_last_flush which was called: # once (1µs+0s) by Search::Elasticsearch::Bulk::flush at line 42 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm
sub Search::Elasticsearch::Role::Bulk::_last_flush; # xsub