Filename | /opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm |
Statements | Executed 43 statements in 1.86ms |
Calls | P | F | Exclusive Time |
Inclusive Time |
Subroutine |
---|---|---|---|---|---|
1 | 1 | 1 | 17µs | 341µs | BEGIN@3 | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 12µs | 18µs | BUILDARGS | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 9µs | 219µs | BEGIN@6 | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 8µs | 228µs | BEGIN@7 | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 6µs | 42µs | __ANON__[:3] | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 2µs | 2µs | __ANON__[:24] | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 2µs | 2µs | __ANON__[:19] | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 1µs | 1µs | _last_flush (xsub) | Search::Elasticsearch::Role::Bulk::
1 | 1 | 1 | 600ns | 600ns | _buffer_size (xsub) | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | __ANON__[:268] | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | __ANON__[:51] | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _build__serializer | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _build_on_error | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _doc_transformer | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _encode_action | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _is_conflict_error | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | _report | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | clear_buffer | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | create | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | create_docs | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | delete | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | delete_ids | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | index | Search::Elasticsearch::Role::Bulk::
0 | 0 | 0 | 0s | 0s | update | Search::Elasticsearch::Role::Bulk::
Line | State ments |
Time on line |
Calls | Time in subs |
Code |
---|---|---|---|---|---|
1 | package Search::Elasticsearch::Role::Bulk; | ||||
2 | 1 | 600ns | $Search::Elasticsearch::Role::Bulk::VERSION = '2.02'; | ||
3 | 4 | 58µs | 3 | 700µs | # spent 341µs (17+324) within Search::Elasticsearch::Role::Bulk::BEGIN@3 which was called:
# once (17µs+324µs) by Module::Runtime::require_module at line 3
# spent 42µs (6+36) within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:3] which was called:
# once (6µs+36µs) by import::into at line 34 of Import/Into.pm # spent 341µs making 1 call to Search::Elasticsearch::Role::Bulk::BEGIN@3
# spent 324µs making 1 call to Moo::Role::import
# spent 36µs making 1 call to strictures::import |
4 | 1 | 2µs | 1 | 10µs | requires 'add_action', 'flush'; # spent 10µs making 1 call to Moo::Role::requires |
5 | |||||
6 | 2 | 36µs | 2 | 429µs | # spent 219µs (9+210) within Search::Elasticsearch::Role::Bulk::BEGIN@6 which was called:
# once (9µs+210µs) by Module::Runtime::require_module at line 6 # spent 219µs making 1 call to Search::Elasticsearch::Role::Bulk::BEGIN@6
# spent 210µs making 1 call to Sub::Exporter::__ANON__[/opt/flows/lib/lib/perl5/Sub/Exporter.pm:337] |
7 | 2 | 1.67ms | 2 | 448µs | # spent 228µs (8+220) within Search::Elasticsearch::Role::Bulk::BEGIN@7 which was called:
# once (8µs+220µs) by Module::Runtime::require_module at line 7 # spent 228µs making 1 call to Search::Elasticsearch::Role::Bulk::BEGIN@7
# spent 220µs making 1 call to namespace::clean::import |
8 | |||||
9 | 1 | 1µs | 1 | 158µs | has 'es' => ( is => 'ro', required => 1 ); # spent 158µs making 1 call to Moo::Role::has |
10 | 1 | 1µs | 1 | 131µs | has 'max_count' => ( is => 'rw', default => 1_000 ); # spent 131µs making 1 call to Moo::Role::has |
11 | 1 | 1µs | 1 | 118µs | has 'max_size' => ( is => 'rw', default => 1_000_000 ); # spent 118µs making 1 call to Moo::Role::has |
12 | 1 | 1µs | 1 | 117µs | has 'max_time' => ( is => 'rw', default => 0 ); # spent 117µs making 1 call to Moo::Role::has |
13 | |||||
14 | 1 | 1µs | 1 | 111µs | has 'on_success' => ( is => 'ro', default => 0 ); # spent 111µs making 1 call to Moo::Role::has |
15 | 1 | 1µs | 1 | 171µs | has 'on_error' => ( is => 'lazy' ); # spent 171µs making 1 call to Moo::Role::has |
16 | 1 | 1µs | 1 | 118µs | has 'on_conflict' => ( is => 'ro', default => 0 ); # spent 118µs making 1 call to Moo::Role::has |
17 | 1 | 1µs | 1 | 120µs | has 'verbose' => ( is => 'rw' ); # spent 120µs making 1 call to Moo::Role::has |
18 | |||||
19 | 2 | 7µs | 1 | 127µs | # spent 2µs within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:19] which was called:
# once (2µs+0s) by Search::Elasticsearch::Bulk::new at line 30 of (eval 113)[Sub/Quote.pm:5] # spent 127µs making 1 call to Moo::Role::has |
20 | 1 | 2µs | 1 | 121µs | has '_buffer_size' => ( is => 'rw', default => 0 ); # spent 121µs making 1 call to Moo::Role::has |
21 | 1 | 1µs | 1 | 118µs | has '_buffer_count' => ( is => 'rw', default => 0 ); # spent 118µs making 1 call to Moo::Role::has |
22 | 1 | 1µs | 1 | 159µs | has '_serializer' => ( is => 'lazy' ); # spent 159µs making 1 call to Moo::Role::has |
23 | 1 | 1µs | 1 | 114µs | has '_bulk_args' => ( is => 'ro' ); # spent 114µs making 1 call to Moo::Role::has |
24 | 2 | 7µs | 1 | 132µs | # spent 2µs within Search::Elasticsearch::Role::Bulk::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Bulk.pm:24] which was called:
# once (2µs+0s) by Search::Elasticsearch::Bulk::new at line 48 of (eval 113)[Sub/Quote.pm:5] # spent 132µs making 1 call to Moo::Role::has |
25 | |||||
26 | 1 | 2µs | our %Actions = ( | ||
27 | 'index' => 1, | ||||
28 | 'create' => 1, | ||||
29 | 'update' => 1, | ||||
30 | 'delete' => 1 | ||||
31 | ); | ||||
32 | |||||
33 | 1 | 2µs | our @Metadata_Keys = ( | ||
34 | 'index', 'type', 'id', 'fields', | ||||
35 | 'routing', 'parent', 'timestamp', 'ttl', | ||||
36 | 'version', 'version_type' | ||||
37 | ); | ||||
38 | |||||
39 | #=================================== | ||||
40 | sub _build__serializer { shift->es->transport->serializer } | ||||
41 | #=================================== | ||||
42 | |||||
43 | #=================================== | ||||
44 | sub _build_on_error { | ||||
45 | #=================================== | ||||
46 | my $self = shift; | ||||
47 | my $serializer = $self->_serializer; | ||||
48 | return sub { | ||||
49 | my ( $action, $result, $src ) = @_; | ||||
50 | warn( "Bulk error [$action]: " . $serializer->encode($result) ); | ||||
51 | }; | ||||
52 | } | ||||
53 | |||||
54 | #=================================== | ||||
55 | # spent 18µs (12+6) within Search::Elasticsearch::Role::Bulk::BUILDARGS which was called:
# once (12µs+6µs) by Search::Elasticsearch::Bulk::new at line 24 of (eval 113)[Sub/Quote.pm:5] | ||||
56 | #=================================== | ||||
57 | 1 | 2µs | 1 | 6µs | my ( $class, $params ) = parse_params(@_); # spent 6µs making 1 call to Search::Elasticsearch::Util::parse_params |
58 | 1 | 200ns | my %args; | ||
59 | 1 | 1µs | for (qw(index type consistency fields refresh replication routing timeout)) | ||
60 | { | ||||
61 | 8 | 4µs | $args{$_} = $params->{$_} | ||
62 | if exists $params->{$_}; | ||||
63 | } | ||||
64 | 1 | 600ns | $params->{_bulk_args} = \%args; | ||
65 | 1 | 4µs | return $params; | ||
66 | } | ||||
67 | |||||
68 | #=================================== | ||||
69 | sub index { | ||||
70 | #=================================== | ||||
71 | shift->add_action( map { ( 'index' => $_ ) } @_ ); | ||||
72 | } | ||||
73 | |||||
74 | #=================================== | ||||
75 | sub create { | ||||
76 | #=================================== | ||||
77 | shift->add_action( map { ( 'create' => $_ ) } @_ ); | ||||
78 | } | ||||
79 | |||||
80 | #=================================== | ||||
81 | sub delete { | ||||
82 | #=================================== | ||||
83 | shift->add_action( map { ( 'delete' => $_ ) } @_ ); | ||||
84 | } | ||||
85 | |||||
86 | #=================================== | ||||
87 | sub update { | ||||
88 | #=================================== | ||||
89 | shift->add_action( map { ( 'update' => $_ ) } @_ ); | ||||
90 | } | ||||
91 | |||||
92 | #=================================== | ||||
93 | sub create_docs { | ||||
94 | #=================================== | ||||
95 | my $self = shift; | ||||
96 | $self->add_action( map { ( 'create' => { _source => $_ } ) } @_ ); | ||||
97 | } | ||||
98 | |||||
99 | #=================================== | ||||
100 | sub delete_ids { | ||||
101 | #=================================== | ||||
102 | my $self = shift; | ||||
103 | $self->add_action( map { ( 'delete' => { _id => $_ } ) } @_ ); | ||||
104 | } | ||||
105 | |||||
106 | 1 | 2µs | our @Update_Params = qw( | ||
107 | doc upsert doc_as_upsert fields scripted_upsert | ||||
108 | script script_id script_file | ||||
109 | params lang detect_noop | ||||
110 | ); | ||||
111 | |||||
112 | #=================================== | ||||
113 | sub _encode_action { | ||||
114 | #=================================== | ||||
115 | my $self = shift; | ||||
116 | my $action = shift || ''; | ||||
117 | my $orig = shift; | ||||
118 | |||||
119 | throw( 'Param', "Unrecognised action <$action>" ) | ||||
120 | unless $Actions{$action}; | ||||
121 | |||||
122 | throw( 'Param', "Missing <params> for action <$action>" ) | ||||
123 | unless ref($orig) eq 'HASH'; | ||||
124 | |||||
125 | my %metadata; | ||||
126 | my $params = {%$orig}; | ||||
127 | my $serializer = $self->_serializer; | ||||
128 | |||||
129 | for (@Metadata_Keys) { | ||||
130 | my $val | ||||
131 | = exists $params->{$_} ? delete $params->{$_} | ||||
132 | : exists $params->{"_$_"} ? delete $params->{"_$_"} | ||||
133 | : next; | ||||
134 | $metadata{"_$_"} = $val; | ||||
135 | } | ||||
136 | |||||
137 | throw( 'Param', "Missing required param <index>" ) | ||||
138 | unless $metadata{_index} || $self->_bulk_args->{index}; | ||||
139 | throw( 'Param', "Missing required param <type>" ) | ||||
140 | unless $metadata{_type} || $self->_bulk_args->{type}; | ||||
141 | |||||
142 | my $source; | ||||
143 | if ( $action eq 'update' ) { | ||||
144 | for (@Update_Params) { | ||||
145 | $source->{$_} = delete $params->{$_} | ||||
146 | if exists $params->{$_}; | ||||
147 | } | ||||
148 | } | ||||
149 | elsif ( $action ne 'delete' ) { | ||||
150 | $source | ||||
151 | = delete $params->{_source} | ||||
152 | || delete $params->{source} | ||||
153 | || throw( | ||||
154 | 'Param', | ||||
155 | "Missing <source> for action <$action>: " | ||||
156 | . $serializer->encode($orig) | ||||
157 | ); | ||||
158 | } | ||||
159 | |||||
160 | throw( "Unknown params <" | ||||
161 | . ( join ',', sort keys %$params ) | ||||
162 | . "> in <$action>: " | ||||
163 | . $serializer->encode($orig) ) | ||||
164 | if keys %$params; | ||||
165 | |||||
166 | return map { $serializer->encode($_) } | ||||
167 | grep {$_} ( { $action => \%metadata }, $source ); | ||||
168 | } | ||||
169 | |||||
170 | #=================================== | ||||
171 | sub _report { | ||||
172 | #=================================== | ||||
173 | my ( $self, $buffer, $results ) = @_; | ||||
174 | my $on_success = $self->on_success; | ||||
175 | my $on_error = $self->on_error; | ||||
176 | my $on_conflict = $self->on_conflict; | ||||
177 | |||||
178 | # assume errors if key not present, bwc | ||||
179 | $results->{errors} = 1 unless exists $results->{errors}; | ||||
180 | |||||
181 | return | ||||
182 | unless $on_success | ||||
183 | || ( $results->{errors} and $on_error || $on_conflict ); | ||||
184 | |||||
185 | my $serializer = $self->_serializer; | ||||
186 | |||||
187 | my $j = 0; | ||||
188 | |||||
189 | for my $item ( @{ $results->{items} } ) { | ||||
190 | my ( $action, $result ) = %$item; | ||||
191 | my @args = ($action); | ||||
192 | if ( my $error = $result->{error} ) { | ||||
193 | if ($on_conflict) { | ||||
194 | my ( $is_conflict, $version ) | ||||
195 | = $self->_is_conflict_error($error); | ||||
196 | if ($is_conflict) { | ||||
197 | $on_conflict->( $action, $result, $j, $version ); | ||||
198 | next; | ||||
199 | } | ||||
200 | } | ||||
201 | $on_error && $on_error->( $action, $result, $j ); | ||||
202 | } | ||||
203 | else { | ||||
204 | $on_success && $on_success->( $action, $result, $j ); | ||||
205 | } | ||||
206 | $j++; | ||||
207 | } | ||||
208 | } | ||||
209 | |||||
210 | #=================================== | ||||
211 | sub _is_conflict_error { | ||||
212 | #=================================== | ||||
213 | my ( $self, $error ) = @_; | ||||
214 | my $version; | ||||
215 | if ( ref($error) eq 'HASH' ) { | ||||
216 | return 1 if $error->{type} eq 'document_already_exists_exception'; | ||||
217 | return unless $error->{type} eq 'version_conflict_engine_exception'; | ||||
218 | $error->{reason} =~ /version.conflict,.current.\[(\d+)\]/; | ||||
219 | return ( 1, $1 ); | ||||
220 | } | ||||
221 | return unless $error =~ / | ||||
222 | DocumentAlreadyExistsException | ||||
223 | |version.conflict,.current.\[(\d+)\] | ||||
224 | /x; | ||||
225 | return ( 1, $1 ); | ||||
226 | } | ||||
227 | |||||
228 | #=================================== | ||||
229 | sub clear_buffer { | ||||
230 | #=================================== | ||||
231 | my $self = shift; | ||||
232 | @{ $self->_buffer } = (); | ||||
233 | $self->_buffer_size(0); | ||||
234 | $self->_buffer_count(0); | ||||
235 | } | ||||
236 | |||||
237 | #=================================== | ||||
238 | sub _doc_transformer { | ||||
239 | #=================================== | ||||
240 | my ( $self, $params ) = @_; | ||||
241 | |||||
242 | my $bulk_args = $self->_bulk_args; | ||||
243 | my %allowed = map { $_ => 1, "_$_" => 1 } ( @Metadata_Keys, 'source' ); | ||||
244 | $allowed{fields} = 1; | ||||
245 | |||||
246 | delete @allowed{ 'index', '_index' } if $bulk_args->{index}; | ||||
247 | delete @allowed{ 'type', '_type' } if $bulk_args->{type}; | ||||
248 | |||||
249 | my $version_type = $params->{version_type}; | ||||
250 | my $transform = $params->{transform}; | ||||
251 | |||||
252 | return sub { | ||||
253 | my %doc = %{ shift() }; | ||||
254 | for ( keys %doc ) { | ||||
255 | delete $doc{$_} unless $allowed{$_}; | ||||
256 | } | ||||
257 | |||||
258 | if ( my $fields = delete $doc{fields} ) { | ||||
259 | for (qw(_routing routing _parent parent)) { | ||||
260 | $doc{$_} = $fields->{$_} | ||||
261 | if exists $fields->{$_}; | ||||
262 | } | ||||
263 | } | ||||
264 | $doc{_version_type} = $version_type if $version_type; | ||||
265 | |||||
266 | return \%doc unless $transform; | ||||
267 | return $transform->( \%doc ); | ||||
268 | }; | ||||
269 | } | ||||
270 | |||||
271 | 1 | 19µs | 1; | ||
272 | |||||
273 | # ABSTRACT: Provides common functionality to L<Elasticseach::Bulk> and L<Search::Elasticsearch::Async::Bulk> | ||||
274 | |||||
275 | 1 | 26µs | 1 | 359µs | __END__ # spent 359µs making 1 call to B::Hooks::EndOfScope::XS::__ANON__[/opt/flows/lib/lib/perl5/B/Hooks/EndOfScope/XS.pm:17] |
# spent 600ns within Search::Elasticsearch::Role::Bulk::_buffer_size which was called:
# once (600ns+0s) by Search::Elasticsearch::Bulk::flush at line 44 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm | |||||
# spent 1µs within Search::Elasticsearch::Role::Bulk::_last_flush which was called:
# once (1µs+0s) by Search::Elasticsearch::Bulk::flush at line 42 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Bulk.pm |