Filename | /data/flows/bin/flows_to_es.pl |
Statements | Executed 92 statements in 32.5ms |
Calls | P | F | Exclusive Time |
Inclusive Time |
Subroutine |
---|---|---|---|---|---|
4 | 3 | 1 | 16.6ms | 16.6ms | CORE:backtick (opcode) | main::
1 | 1 | 1 | 6.30ms | 8.78ms | BEGIN@9 | main::
1 | 1 | 1 | 5.02ms | 22.8ms | BEGIN@7 | main::
1 | 1 | 1 | 3.31ms | 3.67ms | BEGIN@124 | main::
1 | 1 | 1 | 3.30ms | 17.3ms | BEGIN@5 | main::
11 | 11 | 8 | 2.77ms | 81.8ms | with | Moo::
1 | 1 | 1 | 2.36ms | 2.39ms | BEGIN@4 | main::
1 | 1 | 1 | 2.33ms | 6.45ms | BEGIN@123 | main::
1 | 1 | 1 | 2.16ms | 3.44ms | BEGIN@207 | main::
2 | 2 | 1 | 1.38ms | 1.65ms | before | Moo::Role::
1 | 1 | 1 | 1.31ms | 2.90ms | BEGIN@10 | main::
1 | 1 | 1 | 1.03ms | 10.3ms | BEGIN@8 | main::
69 | 69 | 9 | 949µs | 15.9ms | has | Moo::Role::
1 | 1 | 1 | 921µs | 4.81ms | BEGIN@11 | main::
1 | 1 | 1 | 769µs | 34.2ms | BEGIN@147 | main::
312 | 24 | 13 | 615µs | 615µs | can (xsub) | UNIVERSAL::
1 | 1 | 1 | 386µs | 424µs | BEGIN@3 | main::
1 | 1 | 1 | 337µs | 337µs | CORE:unlink (opcode) | main::
32 | 22 | 17 | 304µs | 304µs | VERSION (xsub) | UNIVERSAL::
5 | 5 | 2 | 102µs | 17.8ms | has | Moo::
1 | 1 | 1 | 83µs | 38.7ms | load_storable_file | main::
9 | 9 | 9 | 76µs | 93µs | requires | Moo::Role::
1 | 1 | 1 | 64µs | 64µs | return_mapping | main::
1 | 1 | 1 | 60µs | 6.46ms | try {...} | Search::Elasticsearch::Transport::
2 | 2 | 1 | 59µs | 6.48ms | clear_flows | main::
1 | 1 | 1 | 56µs | 5.92ms | retrieve_flows | main::
1 | 1 | 1 | 53µs | 4.35ms | retrieve_flow_duration | main::
1 | 1 | 1 | 50µs | 183ms | create_es_bulk | main::
1 | 1 | 1 | 44µs | 1.91ms | try {...} | Search::Elasticsearch::Role::Cxn::
16 | 3 | 2 | 31µs | 31µs | isa (xsub) | UNIVERSAL::
2 | 2 | 1 | 30µs | 30µs | CORE:ftis (opcode) | main::
2 | 2 | 1 | 26µs | 39µs | return_estimestamp | main::
1 | 1 | 1 | 24µs | 92µs | try {...} | Search::Elasticsearch::Role::Client::Direct::
2 | 2 | 1 | 24µs | 65µs | return_esdate | main::
24 | 1 | 1 | 22µs | 22µs | SvREADONLY (xsub) | Internals::
3 | 3 | 3 | 22µs | 8.37ms | with | Moo::Role::
1 | 1 | 1 | 12µs | 49µs | BEGIN@125 | main::
2 | 2 | 2 | 11µs | 14µs | around | Moo::Role::
2 | 1 | 1 | 10µs | 2.49ms | try {...} | Module::Implementation::
1 | 1 | 1 | 10µs | 1.28ms | BEGIN@230 | main::
1 | 1 | 1 | 9µs | 42µs | BEGIN@381 | main::
1 | 1 | 1 | 8µs | 48µs | BEGIN@235 | main::
1 | 1 | 1 | 8µs | 15.9ms | create_bulk | main::
6 | 3 | 3 | 7µs | 7µs | downgrade (xsub) | utf8::
1 | 1 | 1 | 5µs | 5µs | (bool (xsub) | version::
1 | 1 | 1 | 5µs | 5µs | (cmp (xsub) | version::
1 | 1 | 1 | 4µs | 4µs | CORE:pack (opcode) | main::
1 | 1 | 1 | 2µs | 2µs | CORE:match (opcode) | main::
1 | 1 | 1 | 1µs | 1µs | encode (xsub) | utf8::
0 | 0 | 0 | 0s | 0s | RUNTIME | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:159] | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:162] | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:393] | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:395] | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:58] | main::
0 | 0 | 0 | 0s | 0s | __ANON__[flows_to_es.pl:61] | main::
0 | 0 | 0 | 0s | 0s | create_es_index | main::
0 | 0 | 0 | 0s | 0s | send_to_influxdb | main::
0 | 0 | 0 | 0s | 0s | transform_tags | main::
Line | State ments |
Time on line |
Calls | Time in subs |
Code |
---|---|---|---|---|---|
0 | 2 | 11µs | Profile data that couldn't be associated with a specific line: # spent 7µs making 1 call to Class::XSAccessor::END
# spent 4µs making 1 call to JSON::XS::DESTROY | ||
1 | #!/usr/bin/perl | ||||
2 | |||||
3 | 2 | 365µs | 2 | 440µs | # spent 424µs (386+38) within main::BEGIN@3 which was called:
# once (386µs+38µs) by main::RUNTIME at line 3 # spent 424µs making 1 call to main::BEGIN@3
# spent 16µs making 1 call to strict::import |
4 | 2 | 2.27ms | 2 | 2.40ms | # spent 2.39ms (2.36+26µs) within main::BEGIN@4 which was called:
# once (2.36ms+26µs) by main::RUNTIME at line 4 # spent 2.39ms making 1 call to main::BEGIN@4
# spent 7µs making 1 call to warnings::import |
5 | 2 | 95µs | 2 | 18.0ms | # spent 17.3ms (3.30+14.0) within main::BEGIN@5 which was called:
# once (3.30ms+14.0ms) by main::RUNTIME at line 5 # spent 17.3ms making 1 call to main::BEGIN@5
# spent 610µs making 1 call to local::lib::import |
6 | |||||
7 | 2 | 145µs | 2 | 23.5ms | # spent 22.8ms (5.02+17.8) within main::BEGIN@7 which was called:
# once (5.02ms+17.8ms) by main::RUNTIME at line 7 # spent 22.8ms making 1 call to main::BEGIN@7
# spent 716µs making 1 call to IO::Socket::import |
8 | 2 | 132µs | 2 | 10.3ms | # spent 10.3ms (1.03+9.25) within main::BEGIN@8 which was called:
# once (1.03ms+9.25ms) by main::RUNTIME at line 8 # spent 10.3ms making 1 call to main::BEGIN@8
# spent 58µs making 1 call to Exporter::import |
9 | 2 | 139µs | 2 | 8.81ms | # spent 8.78ms (6.30+2.49) within main::BEGIN@9 which was called:
# once (6.30ms+2.49ms) by main::RUNTIME at line 9 # spent 8.78ms making 1 call to main::BEGIN@9
# spent 29µs making 1 call to Exporter::import |
10 | 2 | 161µs | 2 | 2.94ms | # spent 2.90ms (1.31+1.58) within main::BEGIN@10 which was called:
# once (1.31ms+1.58ms) by main::RUNTIME at line 10 # spent 2.90ms making 1 call to main::BEGIN@10
# spent 41µs making 1 call to Exporter::import |
11 | 2 | 731µs | 2 | 4.84ms | # spent 4.81ms (921µs+3.89) within main::BEGIN@11 which was called:
# once (921µs+3.89ms) by main::RUNTIME at line 11 # spent 4.81ms making 1 call to main::BEGIN@11
# spent 36µs making 1 call to Exporter::import |
12 | |||||
13 | # | ||||
14 | # This uses the pmacct pipe at $pipe to pull in, process, and store netflow | ||||
15 | # data into InfluxDB. Also a few subs exist for reading cached Storable files | ||||
16 | # for adding more info into InfluxDB. This could easily be expanded to store | ||||
17 | # into ElasticSearch or any other DB as well. This is designed to run fast from | ||||
18 | # cron at every X minutes depending on the resolution you want/need. | ||||
19 | # | ||||
20 | # Also important to note that this uses the InfluxDB UDP "Line Protocol" not | ||||
21 | # the graphite input. The database to insert into is determined by the InfluxDB | ||||
22 | # configuration and each port will go into a different database. | ||||
23 | # | ||||
24 | |||||
25 | # TODO: | ||||
26 | # Finish the transform_tags sub so we can re-use this and just pass multiple | ||||
27 | # hashes into it whenever we need to add/re-write tags into data. | ||||
28 | # Add support for ElasticSearch? Should be trivial, just another sub that takes | ||||
29 | # both %values and %tags. | ||||
30 | |||||
31 | # Global shit | ||||
32 | 1 | 1µs | my $pipe = '/opt/flows/cache/full.pipe'; | ||
33 | 1 | 300ns | my $influxdb_host = '127.0.0.1'; | ||
34 | 1 | 300ns | my $influxdb_port = '9999'; | ||
35 | 1 | 300ns | my $storage_name = 'netflow'; | ||
36 | 1 | 300ns | my $mapping_name = 'netflowdata'; | ||
37 | 1 | 2µs | my @es_nodes = [ '127.0.0.1:9200' ]; | ||
38 | 1 | 200ns | my $influx_enabled = 0; | ||
39 | 1 | 6µs | 1 | 298µs | my $sock = IO::Socket::INET->new( Proto => 'udp', PeerPort => $influxdb_port, PeerAddr => $influxdb_host) or die "Could not create socket $!\n"; # spent 298µs making 1 call to IO::Socket::INET::new |
40 | # | ||||
41 | |||||
42 | 1 | 3µs | 1 | 4.35ms | my $duration = retrieve_flow_duration($pipe); # spent 4.35ms making 1 call to main::retrieve_flow_duration |
43 | 1 | 5µs | 1 | 5.92ms | my @flows = retrieve_flows($pipe); # spent 5.92ms making 1 call to main::retrieve_flows |
44 | 1 | 14µs | 1 | 3.02ms | clear_flows($pipe); # spent 3.02ms making 1 call to main::clear_flows |
45 | |||||
46 | 1 | 7µs | 1 | 38.7ms | my $librenms = load_storable_file('/opt/flows/cache/librenms.stor'); # spent 38.7ms making 1 call to main::load_storable_file |
47 | |||||
48 | 1 | 7µs | 1 | 183ms | my $bulk = create_es_bulk($storage_name); # spent 183ms making 1 call to main::create_es_bulk |
49 | |||||
50 | 1 | 3µs | 1 | 23µs | my $estime = return_estimestamp(); # spent 23µs making 1 call to main::return_estimestamp |
51 | 1 | 2µs | 1 | 11µs | my $esdate = return_esdate(); # spent 11µs making 1 call to main::return_esdate |
52 | |||||
53 | 1 | 1µs | foreach my $line (@flows) { | ||
54 | chomp($line); | ||||
55 | my $obj; | ||||
56 | try { | ||||
57 | $obj = decode_json($line); | ||||
58 | } catch { | ||||
59 | print "Error, Malformed JSON: $line\n"; | ||||
60 | next; | ||||
61 | }; | ||||
62 | my %values; | ||||
63 | $values{pps} = eval { round((($obj->{packets}) / $duration)) } || 0; | ||||
64 | # 368 bits assumes Ethernet Header, IFG, and 2 dot1q tags | ||||
65 | $values{bps} = eval { round((($obj->{bytes} * 8) + ($values{pps} * 368)) / $duration) } || 0; | ||||
66 | $values{bytes} = $obj->{bytes}; | ||||
67 | $values{packets} = $obj->{packets}; | ||||
68 | $values{avg_size} = eval{ round(($obj->{bytes}) / $values{packets}) } || 0; | ||||
69 | if ($obj->{as_path}) { | ||||
70 | my @aspath = split(' ', $obj->{as_path}); | ||||
71 | my $i = 1; | ||||
72 | foreach my $as (@aspath) { | ||||
73 | $values{as_path}{"as_path_" . $i} = $as; | ||||
74 | $i++; | ||||
75 | } | ||||
76 | } | ||||
77 | |||||
78 | my %librenms_info; | ||||
79 | if ($librenms->{devices}{$obj->{tag}}) { | ||||
80 | my $device = $librenms->{devices}{$obj->{tag}}; | ||||
81 | $librenms_info{device_name} = $device->{info}{hostname} || ''; | ||||
82 | $librenms_info{device_location} = $device->{info}{location} || ''; | ||||
83 | $librenms_info{iface_in_descr} = $device->{ports_by_ifIndex}{$obj->{in_iface}}{port_descr_descr} || ''; | ||||
84 | $librenms_info{iface_out_descr} = $device->{ports_by_ifIndex}{$obj->{out_iface}}{port_descr_descr} || ''; | ||||
85 | $librenms_info{iface_in_type} = $device->{ports_by_ifIndex}{$obj->{in_iface}}{port_descr_type} || ''; | ||||
86 | $librenms_info{iface_out_type} = $device->{ports_by_ifIndex}{$obj->{out_iface}}{port_descr_type} || ''; | ||||
87 | } | ||||
88 | |||||
89 | $bulk->index ({ | ||||
90 | index => "$storage_name-$esdate", | ||||
91 | type => "$mapping_name", | ||||
92 | source => { | ||||
93 | '_timestamp' => $estime, | ||||
94 | 'bps' => $values{bps}, | ||||
95 | 'pps' => $values{pps}, | ||||
96 | 'avg_size' => $values{avg_size}, | ||||
97 | 'as_path' => \%{$values{as_path}}, | ||||
98 | 'flow_stats' => $obj, | ||||
99 | 'librenms_info' => \%librenms_info, | ||||
100 | }, | ||||
101 | }); | ||||
102 | |||||
103 | if ($influx_enabled == 1) { | ||||
104 | my %tags; | ||||
105 | foreach my $key (keys %{$obj}) { | ||||
106 | next if $key =~ /bytes/; | ||||
107 | next if $key =~ /packets/; | ||||
108 | next if $obj->{$key} =~ /^$/; | ||||
109 | next if $obj->{$key} =~ /^0$/; | ||||
110 | $tags{$key} = $obj->{$key}; | ||||
111 | } | ||||
112 | send_to_influxdb($sock, 'netflow', \%values, \%tags) || print "Error sending to InfluxDB\n"; | ||||
113 | } | ||||
114 | } | ||||
115 | |||||
116 | 1 | 3µs | 1 | 14µs | $bulk->flush(); # spent 14µs making 1 call to Search::Elasticsearch::Bulk::flush |
117 | 1 | 8.82ms | 1 | 3.46ms | clear_flows($pipe); # spent 3.46ms making 1 call to main::clear_flows |
118 | |||||
119 | # | ||||
120 | # Copies the file, locks, retreives, then deletes. Avoids any concurrency issues. | ||||
121 | # | ||||
122 | # spent 38.7ms (83µs+38.6) within main::load_storable_file which was called:
# once (83µs+38.6ms) by main::RUNTIME at line 46 | ||||
123 | 2 | 182µs | 2 | 10.2ms | # spent 6.45ms (2.33+4.12) within main::BEGIN@123 which was called:
# once (2.33ms+4.12ms) by main::RUNTIME at line 123 # spent 6.45ms making 1 call to main::BEGIN@123
# spent 3.73ms making 1 call to Exporter::import |
124 | 2 | 144µs | 2 | 3.71ms | # spent 3.67ms (3.31+362µs) within main::BEGIN@124 which was called:
# once (3.31ms+362µs) by main::RUNTIME at line 124 # spent 3.67ms making 1 call to main::BEGIN@124
# spent 40µs making 1 call to Exporter::import |
125 | 2 | 162µs | 2 | 86µs | # spent 49µs (12+37) within main::BEGIN@125 which was called:
# once (12µs+37µs) by main::RUNTIME at line 125 # spent 49µs making 1 call to main::BEGIN@125
# spent 37µs making 1 call to Exporter::import |
126 | 1 | 1µs | my $file = shift; | ||
127 | 1 | 400ns | my $thawed; | ||
128 | 1 | 21µs | 1 | 10µs | if (-e $file) { # spent 10µs making 1 call to main::CORE:ftis |
129 | 1 | 5µs | 1 | 109µs | my $dir = dirname($file); # spent 109µs making 1 call to File::Basename::dirname |
130 | 1 | 14µs | 1 | 2.63ms | copy($file, "$dir/$$\.tmp") || return 0; # spent 2.63ms making 1 call to File::Copy::copy |
131 | 1 | 39µs | 1 | 20µs | if (-e "$dir/$$\.tmp") { # spent 20µs making 1 call to main::CORE:ftis |
132 | 1 | 10µs | 1 | 35.5ms | $thawed = lock_retrieve("$dir/$$\.tmp"); # spent 35.5ms making 1 call to Storable::lock_retrieve |
133 | 1 | 351µs | 1 | 337µs | unlink("$dir/$$\.tmp"); # spent 337µs making 1 call to main::CORE:unlink |
134 | 1 | 2µs | if (!$thawed) { | ||
135 | return 0; | ||||
136 | } | ||||
137 | } else { | ||||
138 | return 0; | ||||
139 | } | ||||
140 | } else { | ||||
141 | return 0; | ||||
142 | } | ||||
143 | 1 | 6µs | return $thawed; | ||
144 | } | ||||
145 | |||||
146 | # spent 183ms (50µs+183) within main::create_es_bulk which was called:
# once (50µs+183ms) by main::RUNTIME at line 48 | ||||
147 | 2 | 431µs | 1 | 34.2ms | # spent 34.2ms (769µs+33.5) within main::BEGIN@147 which was called:
# once (769µs+33.5ms) by main::RUNTIME at line 147 # spent 34.2ms making 1 call to main::BEGIN@147 |
148 | 1 | 2µs | my $dbname = shift; | ||
149 | |||||
150 | |||||
151 | 1 | 11µs | 1 | 155ms | my $es = Search::Elasticsearch->new(nodes => @es_nodes); # spent 155ms making 1 call to Search::Elasticsearch::new |
152 | 1 | 5µs | 1 | 54µs | my $esdate = return_esdate(); # spent 54µs making 1 call to main::return_esdate |
153 | 1 | 4µs | 1 | 16µs | my $estime = return_estimestamp(); # spent 16µs making 1 call to main::return_estimestamp |
154 | 1 | 3µs | 1 | 64µs | my ($mapping, $datatype) = return_mapping($dbname); # spent 64µs making 1 call to main::return_mapping |
155 | |||||
156 | 1 | 6µs | 2 | 7.02ms | if (!($es->indices->exists(index => "$dbname-$esdate"))) { # spent 6.72ms making 1 call to Search::Elasticsearch::Role::Client::Direct::__ANON__[/opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Client/Direct.pm:102]
# spent 298µs making 1 call to Search::Elasticsearch::Role::Client::Direct::Main::indices |
157 | try { | ||||
158 | $es->indices->create( index => "$dbname-$esdate", body => { mappings => $mapping, } ); | ||||
159 | } catch { | ||||
160 | print "Create mapping for index $dbname-$esdate failed\n"; | ||||
161 | return 0; | ||||
162 | }; | ||||
163 | } | ||||
164 | 1 | 8µs | 1 | 15.9ms | return create_bulk($es, "$dbname-$esdate", 'datatype'); # spent 15.9ms making 1 call to main::create_bulk |
165 | } | ||||
166 | |||||
167 | # I think this covers us for most use-cases: | ||||
168 | # | ||||
169 | # $tags = ( | ||||
170 | # key => { | ||||
171 | # operation => 'replace' | ||||
172 | # value => { | ||||
173 | # 100 => 'somename', | ||||
174 | # }, | ||||
175 | # key2 => { | ||||
176 | # operation => 'add', | ||||
177 | # newtag => 'key3' | ||||
178 | # value => { | ||||
179 | # 101 => 'Transit' | ||||
180 | # }, | ||||
181 | # }, | ||||
182 | # ); | ||||
183 | |||||
184 | # | ||||
185 | # Danger, this doesn't do what you think. | ||||
186 | # | ||||
187 | sub transform_tags { | ||||
188 | my $tags = shift; | ||||
189 | my $input_transform = shift; | ||||
190 | |||||
191 | return $tags unless $input_transform; | ||||
192 | |||||
193 | foreach my $key (keys %{$input_transform}) { | ||||
194 | next unless $tags->{$key}; | ||||
195 | if ($input_transform->{$key}{operation} =~ /replace/i) { | ||||
196 | $tags->{$key} = $input_transform->{$key}{value} | ||||
197 | } elsif ($input_transform->{$key}{operation} =~ /add/i) { | ||||
198 | $tags->{$input_transform->{$key}{addkey}} = $input_transform->{$key}{addvalue}; | ||||
199 | } else { | ||||
200 | next; | ||||
201 | } | ||||
202 | } | ||||
203 | return $tags; | ||||
204 | } | ||||
205 | |||||
206 | sub send_to_influxdb { | ||||
207 | 2 | 254µs | 2 | 3.47ms | # spent 3.44ms (2.16+1.29) within main::BEGIN@207 which was called:
# once (2.16ms+1.29ms) by main::RUNTIME at line 207 # spent 3.44ms making 1 call to main::BEGIN@207
# spent 20µs making 1 call to InfluxDB::LineProtocol::import |
208 | my $sock = shift; | ||||
209 | my $metric = shift; | ||||
210 | my $values = shift; | ||||
211 | my $tags = shift; | ||||
212 | my $timestamp = shift || time * 1000000000; | ||||
213 | my $influxline = data2line($metric, $values, $tags, $timestamp) . "\n" || return 0; | ||||
214 | #print $influxline; | ||||
215 | $sock->send($influxline) || return 0; | ||||
216 | return 1; | ||||
217 | } | ||||
218 | |||||
219 | ### Start ES | ||||
220 | |||||
221 | # spent 15.9ms (8µs+15.9) within main::create_bulk which was called:
# once (8µs+15.9ms) by main::create_es_bulk at line 164 | ||||
222 | 1 | 500ns | my $es = shift; | ||
223 | 1 | 400ns | my $index = shift; | ||
224 | 1 | 900ns | my $mapping = shift; | ||
225 | |||||
226 | 1 | 6µs | 1 | 15.9ms | return $es->bulk_helper('index' => $index, 'type' => $mapping) || return 0; # spent 15.9ms making 1 call to Search::Elasticsearch::Role::Client::Direct::Main::bulk_helper |
227 | } | ||||
228 | |||||
229 | sub return_esdate { | ||||
230 | 2 | 69µs | 2 | 2.55ms | # spent 1.28ms (10µs+1.27) within main::BEGIN@230 which was called:
# once (10µs+1.27ms) by main::RUNTIME at line 230 # spent 1.28ms making 1 call to main::BEGIN@230
# spent 1.27ms making 1 call to POSIX::import |
231 | 2 | 66µs | 2 | 40µs | return strftime("%Y-%m-%d-%H", gmtime); # spent 40µs making 2 calls to POSIX::strftime, avg 20µs/call |
232 | } | ||||
233 | |||||
234 | sub return_estimestamp { | ||||
235 | 2 | 323µs | 2 | 88µs | # spent 48µs (8+40) within main::BEGIN@235 which was called:
# once (8µs+40µs) by main::RUNTIME at line 235 # spent 48µs making 1 call to main::BEGIN@235
# spent 40µs making 1 call to POSIX::import |
236 | 2 | 39µs | 2 | 13µs | return strftime("%Y-%m-%d %H:%M:%S.000", localtime); # spent 13µs making 2 calls to POSIX::strftime, avg 6µs/call |
237 | } | ||||
238 | |||||
239 | # spent 64µs within main::return_mapping which was called:
# once (64µs+0s) by main::create_es_bulk at line 154 | ||||
240 | 1 | 1µs | my $dbname = shift; | ||
241 | 1 | 49µs | my %mapping = ( | ||
242 | '_default_' => { | ||||
243 | '_timestamp' => { | ||||
244 | enabled => 'true', | ||||
245 | store => 'true', | ||||
246 | format => 'date_hour_minute_second_fraction', | ||||
247 | index => 'analyzed', | ||||
248 | }, | ||||
249 | dynamic_templates => { | ||||
250 | as_path => { | ||||
251 | path_match => 'as_path.as_path_*', | ||||
252 | mapping => { | ||||
253 | type => 'integer', | ||||
254 | store => 'true', | ||||
255 | }, | ||||
256 | }, | ||||
257 | }, | ||||
258 | properties => { | ||||
259 | pps => { | ||||
260 | type => 'long', | ||||
261 | store => 'true', | ||||
262 | }, | ||||
263 | bps => { | ||||
264 | type => 'long', | ||||
265 | store => 'true', | ||||
266 | }, | ||||
267 | avg_size => { | ||||
268 | type => 'long', | ||||
269 | store => 'true', | ||||
270 | }, | ||||
271 | bytes => { | ||||
272 | type => 'long', | ||||
273 | store => 'true', | ||||
274 | }, | ||||
275 | flow_stats => { | ||||
276 | type => 'object', | ||||
277 | properties => { | ||||
278 | bytes => { | ||||
279 | type => 'long', | ||||
280 | store => 'true', | ||||
281 | }, | ||||
282 | packets => { | ||||
283 | type => 'long', | ||||
284 | store => 'true', | ||||
285 | }, | ||||
286 | mac_dst => { | ||||
287 | type => 'string', | ||||
288 | index => =>'not_analyzed', | ||||
289 | }, | ||||
290 | ip_proto => { | ||||
291 | type => 'string', | ||||
292 | index => 'not_analyzed', | ||||
293 | }, | ||||
294 | ip_src => { | ||||
295 | type => 'ip', | ||||
296 | }, | ||||
297 | ip_dst => { | ||||
298 | type => 'ip', | ||||
299 | }, | ||||
300 | iface_in => { | ||||
301 | type => 'long', | ||||
302 | index => 'not_analyzed', | ||||
303 | }, | ||||
304 | iface_out => { | ||||
305 | type => 'long', | ||||
306 | store => 'true', | ||||
307 | }, | ||||
308 | port_src => { | ||||
309 | type => 'integer', | ||||
310 | store => 'true', | ||||
311 | }, | ||||
312 | port_dst => { | ||||
313 | type => 'integer', | ||||
314 | store => 'true', | ||||
315 | }, | ||||
316 | tos => { | ||||
317 | type => 'integer', | ||||
318 | store => 'true', | ||||
319 | }, | ||||
320 | tcp_flags => { | ||||
321 | type => 'integer', | ||||
322 | store => 'true' | ||||
323 | }, | ||||
324 | as_path => { | ||||
325 | type => 'string', | ||||
326 | index => 'not_analyzed', | ||||
327 | }, | ||||
328 | local_pref => { | ||||
329 | type => 'integer', | ||||
330 | store => 'true', | ||||
331 | }, | ||||
332 | med => { | ||||
333 | type => 'integer', | ||||
334 | store => 'true', | ||||
335 | }, | ||||
336 | tag => { | ||||
337 | type => 'integer', | ||||
338 | store => 'true', | ||||
339 | }, | ||||
340 | tag2 => { | ||||
341 | type => 'integer', | ||||
342 | store => 'true', | ||||
343 | }, | ||||
344 | }, | ||||
345 | }, | ||||
346 | librenms_info => { | ||||
347 | type => 'object', | ||||
348 | properties => { | ||||
349 | iface_in_descr => { | ||||
350 | type => 'string', | ||||
351 | index => 'not_analyzed', | ||||
352 | }, | ||||
353 | iface_in_type => { | ||||
354 | type => 'string', | ||||
355 | index => 'not_analyzed', | ||||
356 | }, | ||||
357 | iface_out_descr => { | ||||
358 | type => 'string', | ||||
359 | index => 'not_analyzed', | ||||
360 | }, | ||||
361 | iface_out_type => { | ||||
362 | type => 'string', | ||||
363 | index => 'not_analyzed', | ||||
364 | }, | ||||
365 | device_name => { | ||||
366 | type => 'string', | ||||
367 | index => 'not_analyzed', | ||||
368 | }, | ||||
369 | device_location => { | ||||
370 | type => 'string', | ||||
371 | index => 'not_analyzed', | ||||
372 | }, | ||||
373 | } | ||||
374 | } | ||||
375 | }, | ||||
376 | }); | ||||
377 | 1 | 14µs | return \%{$mapping{$dbname}}; | ||
378 | } | ||||
379 | |||||
380 | sub create_es_index { | ||||
381 | 2 | 596µs | 2 | 74µs | # spent 42µs (9+32) within main::BEGIN@381 which was called:
# once (9µs+32µs) by main::RUNTIME at line 381 # spent 42µs making 1 call to main::BEGIN@381
# spent 32µs making 1 call to Exporter::import |
382 | my $es = shift; | ||||
383 | my $index = shift; | ||||
384 | my $mapping = shift; | ||||
385 | |||||
386 | return 1 if $es->indices->exists(index => $index); | ||||
387 | try { | ||||
388 | $es->index->create(index => $index, | ||||
389 | body => { | ||||
390 | mappings => $mapping, | ||||
391 | } | ||||
392 | ); | ||||
393 | } catch { | ||||
394 | return 0; | ||||
395 | }; | ||||
396 | return 1; | ||||
397 | } | ||||
398 | |||||
399 | ### End ES | ||||
400 | |||||
401 | # | ||||
402 | sub clear_flows { | ||||
403 | 2 | 3µs | my $pipe = shift; | ||
404 | 2 | 6.46ms | 2 | 6.42ms | my $cleared = `pmacct -p $pipe -l -e`; # spent 6.42ms making 2 calls to main::CORE:backtick, avg 3.21ms/call |
405 | 2 | 28µs | return 1; | ||
406 | } | ||||
407 | |||||
408 | # | ||||
409 | # spent 5.92ms (56µs+5.87) within main::retrieve_flows which was called:
# once (56µs+5.87ms) by main::RUNTIME at line 43 | ||||
410 | 1 | 500ns | my $pipe = shift; | ||
411 | 1 | 5.91ms | 1 | 5.87ms | my @flows = `pmacct -p $pipe -l -O json -s`; # spent 5.87ms making 1 call to main::CORE:backtick |
412 | 1 | 22µs | return @flows; | ||
413 | } | ||||
414 | |||||
415 | # | ||||
416 | # This could probably be done better. | ||||
417 | # | ||||
418 | # spent 4.35ms (53µs+4.30) within main::retrieve_flow_duration which was called:
# once (53µs+4.30ms) by main::RUNTIME at line 42 | ||||
419 | 1 | 300ns | my $pipe = shift; | ||
420 | 1 | 4.34ms | 1 | 4.30ms | my $duration = `pmacct -p $pipe -i`; # spent 4.30ms making 1 call to main::CORE:backtick |
421 | 1 | 1µs | chomp($duration); | ||
422 | 1 | 12µs | 1 | 2µs | if ($duration =~ /never/i) { # spent 2µs making 1 call to main::CORE:match |
423 | $duration = 3600; | ||||
424 | } elsif ($duration > 3600) { | ||||
425 | $duration = 3600; | ||||
426 | } | ||||
427 | 1 | 8µs | return $duration; | ||
428 | } | ||||
# spent 22µs within Internals::SvREADONLY which was called 24 times, avg 908ns/call:
# 24 times (22µs+0s) by constant::import at line 136 of constant.pm, avg 908ns/call | |||||
# spent 304µs within UNIVERSAL::VERSION which was called 32 times, avg 10µs/call:
# 11 times (92µs+0s) by strictures::VERSION at line 23 of strictures.pm, avg 8µs/call
# once (19µs+0s) by Search::Elasticsearch::Serializer::JSON::BEGIN@4 at line 4 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Serializer/JSON.pm
# once (14µs+0s) by Search::Elasticsearch::Cxn::HTTPTiny::BEGIN@8 at line 8 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Cxn/HTTPTiny.pm
# once (12µs+0s) by Search::Elasticsearch::Role::Client::Direct::BEGIN@7 at line 7 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Role/Client/Direct.pm
# once (11µs+0s) by Search::Elasticsearch::BEGIN@3 at line 3 of /opt/flows/lib/lib/perl5/Search/Elasticsearch.pm
# once (11µs+0s) by IO::Socket::BEGIN@12 at line 12 of IO/Socket.pm
# once (11µs+0s) by B::Hooks::EndOfScope::BEGIN@23 at line 26 of /opt/flows/lib/lib/perl5/B/Hooks/EndOfScope.pm
# once (11µs+0s) by B::Hooks::EndOfScope::XS::BEGIN@9 at line 9 of /opt/flows/lib/lib/perl5/B/Hooks/EndOfScope/XS.pm
# once (11µs+0s) by B::Hooks::EndOfScope::BEGIN@16 at line 16 of /opt/flows/lib/lib/perl5/B/Hooks/EndOfScope.pm
# once (11µs+0s) by Sub::Exporter::BEGIN@11 at line 11 of /opt/flows/lib/lib/perl5/Sub/Exporter.pm
# once (11µs+0s) by Data::OptList::BEGIN@8 at line 8 of /opt/flows/lib/lib/perl5/Data/OptList.pm
# once (10µs+0s) by Module::Implementation::BEGIN@12 at line 12 of Module/Implementation.pm
# once (10µs+0s) by Encode::BEGIN@12 at line 12 of Encode.pm
# once (10µs+0s) by B::Hooks::EndOfScope::XS::BEGIN@10 at line 13 of /opt/flows/lib/lib/perl5/B/Hooks/EndOfScope/XS.pm
# once (10µs+0s) by Search::Elasticsearch::Logger::LogAny::BEGIN@8 at line 8 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Logger/LogAny.pm
# once (10µs+0s) by Package::Stash::BEGIN@15 at line 15 of Package/Stash.pm
# once (9µs+0s) by Sub::Exporter::BEGIN@12 at line 12 of /opt/flows/lib/lib/perl5/Sub/Exporter.pm
# once (9µs+0s) by Try::Tiny::BEGIN@12 at line 12 of Try/Tiny.pm
# once (8µs+0s) by Sub::Exporter::BEGIN@13 at line 13 of /opt/flows/lib/lib/perl5/Sub/Exporter.pm
# once (6µs+0s) by Method::Generate::Accessor::BEGIN@11 at line 17 of Method/Generate/Accessor.pm
# once (5µs+0s) by HTTP::Tiny::_agent at line 580 of /opt/flows/lib/lib/perl5/HTTP/Tiny.pm
# once (4µs+0s) by Method::Generate::Accessor::BEGIN@11 at line 21 of Method/Generate/Accessor.pm | |||||
# spent 615µs within UNIVERSAL::can which was called 312 times, avg 2µs/call:
# 64 times (123µs+0s) by Method::Generate::Accessor::_generate_xs at line 624 of Method/Generate/Accessor.pm, avg 2µs/call
# 48 times (62µs+0s) by Role::Tiny::_install_does at line 395 of Role/Tiny.pm, avg 1µs/call
# 29 times (57µs+0s) by Moo::Role::_inhale_if_moose at line 118 of Moo/Role.pm, avg 2µs/call
# 29 times (55µs+0s) by Sub::Exporter::default_generator at line 411 of /opt/flows/lib/lib/perl5/Sub/Exporter.pm, avg 2µs/call
# 18 times (25µs+0s) by Role::Tiny::_check_requires at line 301 of Role/Tiny.pm, avg 1µs/call
# 16 times (42µs+0s) by Role::Tiny::_install_does at line 393 of Role/Tiny.pm, avg 3µs/call
# 14 times (26µs+0s) by Moo::Role::import at line 73 of Moo/Role.pm, avg 2µs/call
# 10 times (44µs+0s) by Method::Generate::Constructor::generate_method at line 98 of Method/Generate/Constructor.pm, avg 4µs/call
# 10 times (33µs+0s) by Method::Generate::Constructor::generate_method at line 89 of Method/Generate/Constructor.pm, avg 3µs/call
# 10 times (29µs+0s) by Method::Generate::Constructor::generate_method at line 104 of Method/Generate/Constructor.pm, avg 3µs/call
# 10 times (24µs+0s) by Moo::_constructor_maker_for at line 160 of Moo.pm, avg 2µs/call
# 10 times (15µs+0s) by Moo::_accessor_maker_for at line 129 of Moo.pm, avg 1µs/call
# 10 times (10µs+0s) by Moo::_constructor_maker_for at line 162 of Moo.pm, avg 950ns/call
# 8 times (9µs+0s) by Role::Tiny::_install_does at line 397 of Role/Tiny.pm, avg 1µs/call
# 4 times (9µs+0s) by Role::Tiny::apply_roles_to_package at line 223 of Role/Tiny.pm, avg 2µs/call
# 4 times (8µs+0s) by Moo::_Utils::_install_modifier at line 31 of Moo/_Utils.pm, avg 2µs/call
# 4 times (6µs+0s) by Class::Method::Modifiers::install_modifier at line 44 of Class/Method/Modifiers.pm, avg 2µs/call
# 3 times (10µs+0s) by Moo::Object::new at line 13 of Moo/Object.pm, avg 3µs/call
# 3 times (9µs+0s) by Moo::Object::new at line 22 of Moo/Object.pm, avg 3µs/call
# 2 times (5µs+0s) by Log::Any::Manager::_require_dynamic at line 181 of /opt/flows/lib/lib/perl5/Log/Any/Manager.pm, avg 3µs/call
# 2 times (4µs+0s) by HTTP::Tiny::Handle::_get_tid at line 1595 of /opt/flows/lib/lib/perl5/HTTP/Tiny.pm, avg 2µs/call
# 2 times (3µs+0s) by Log::Any::Adapter::Util::require_dynamic at line 182 of /opt/flows/lib/lib/perl5/Log/Any/Adapter/Util.pm, avg 2µs/call
# once (4µs+0s) by attributes::import at line 59 of attributes.pm
# once (4µs+0s) by HTTP::Tiny::Handle::timeout at line 1024 of /opt/flows/lib/lib/perl5/HTTP/Tiny.pm | |||||
# spent 31µs within UNIVERSAL::isa which was called 16 times, avg 2µs/call:
# 14 times (28µs+0s) by base::import at line 73 of base.pm, avg 2µs/call
# once (2µs+0s) by File::Path::mkpath at line 67 of File/Path.pm
# once (1µs+0s) by File::Path::mkpath at line 75 of File/Path.pm | |||||
sub main::CORE:backtick; # opcode | |||||
sub main::CORE:ftis; # opcode | |||||
# spent 2µs within main::CORE:match which was called:
# once (2µs+0s) by main::retrieve_flow_duration at line 422 | |||||
# spent 4µs within main::CORE:pack which was called:
# once (4µs+0s) by main::BEGIN@7 at line 315 of IO/Socket.pm | |||||
# spent 337µs within main::CORE:unlink which was called:
# once (337µs+0s) by main::load_storable_file at line 133 | |||||
# spent 7µs within utf8::downgrade which was called 6 times, avg 1µs/call:
# 2 times (3µs+0s) by URI::_generic::path at line 46 of URI/_generic.pm, avg 1µs/call
# 2 times (2µs+0s) by URI::_uric_escape at line 93 of URI.pm, avg 1µs/call
# 2 times (2µs+0s) by HTTP::Tiny::Handle::write at line 1114 of /opt/flows/lib/lib/perl5/HTTP/Tiny.pm, avg 750ns/call | |||||
# spent 1µs within utf8::encode which was called:
# once (1µs+0s) by Search::Elasticsearch::Util::API::Path::path_handler at line 31 of /opt/flows/lib/lib/perl5/Search/Elasticsearch/Util/API/Path.pm | |||||
# spent 5µs within version::(bool which was called:
# once (5µs+0s) by local::lib::BEGIN@11 at line 59 of Config.pm | |||||
# spent 5µs within version::(cmp which was called:
# once (5µs+0s) by local::lib::BEGIN@11 at line 62 of Config.pm |