#!/usr/bin/perl
#
# Load the flows currently held by pmacct into a per-day Elasticsearch index.
#
# Each run:
#   1. takes a lock file so only one instance works at a time,
#   2. reads the time of the previous run from a Storable state file,
#   3. makes sure the index "flow-YYYY-MM-DD" exists (creating it with its
#      mapping if necessary),
#   4. dumps the flows from the pmacct pipe as JSON, derives bits/s and
#      packets/s over the time elapsed since the previous run and indexes
#      one document per flow,
#   5. clears the pmacct counters and records the time of this run.
#
# The helpers ts(), retrieve_flows() and clear_flows() are defined further
# down in this file, as is the final unlock of the lock file.

use strict;
use warnings;
use Search::Elasticsearch;
use JSON;
use Storable qw(lock_retrieve lock_store nstore store_fd nstore_fd freeze thaw dclone store retrieve);
use LockFile::Simple qw(lock trylock unlock);
use Data::Dumper;
use POSIX qw(strftime);
use Try::Tiny;

my $statefile = '/opt/pmacct/cache/es.stor';
my $lockfile  = '/opt/pmacct/cache/flows_to_es';
my $pipe      = "/tmp/nfacctd-full.pipe";

lock($lockfile) || die "Cannot lock $lockfile";

# No state file yet: ts() creates an empty one and ends this run.
unless (-e $statefile) { ts($statefile) }

my $state = lock_retrieve($statefile) || {};

# Sample the clock once so the elapsed time, the index name and the document
# timestamp all describe the same instant (two separate calls could straddle
# a second or a midnight boundary).
my $now  = time;
my $last = defined $state->{lastupdated} ? $state->{lastupdated} : 0;

# Seconds since the previous run; used as the divisor for the rates below.
# Two runs within the same second would otherwise divide by zero.
my $duration = $now - $last;
$duration = 1 if $duration < 1;

$state->{lastupdated} = $now;

my $es = Search::Elasticsearch->new( nodes => [ '127.0.0.1:9200' ] );

my @local_now = localtime($now);
my $esdate    = strftime("%Y-%m-%d", @local_now);
my $timestamp = strftime("%Y-%m-%d %H:%M:%S.000", @local_now);
my $index     = "flow-$esdate";

# Create today's index with its mapping if it does not exist yet.  The
# existence check lives inside the try block as well, so that an exception
# from it is reported and the lock released instead of being left behind.
try {
    unless ($es->indices->exists(index => $index)) {
        $es->indices->create(
            index => $index,
            body  => {
                mappings => {
                    'flowdata' => {
                        '_timestamp' => {
                            'enabled' => 'true',
                            'store'   => 'true',
                            'format'  => 'date_hour_minute_second_fraction',
                            'index'   => 'analyzed'
                        },
                        'properties' => {
                            'stats' => {
                                'type'       => 'object',
                                'properties' => {
                                    'ip_src' => {
                                        'enabled' => 'true',
                                        'store'   => 'true',
                                        'type'    => 'ip',
                                        'index'   => 'analyzed',
                                    },
                                    'ip_dst' => {
                                        'enabled' => 'true',
                                        'store'   => 'true',
                                        'type'    => 'ip',
                                        'index'   => 'analyzed',
                                    },
                                },
                            },
                        },
                    }
                }
            }
        );
    }
}
catch {
    print "$_\n";
    print "Create mapping for index flow-$esdate failed\n";
    unlock($lockfile);
    exit(1);
};

my @flows = retrieve_flows($pipe);

# Indexing failures are counted and reported once after the loop rather than
# once per flow, so a dead node does not flood the output.
my $failed = 0;
my $first_error;

FLOW:
foreach my $line (@flows) {
    chomp $line;

    # try/catch blocks are closures, so loop control has to happen out here:
    # let try hand back the decoded value (or undef) and decide afterwards.
    my $obj = try { decode_json($line) } catch { undef };

    # Anything that is not a JSON object cannot carry the counters read below.
    unless (ref($obj) eq 'HASH') {
        print "Error, Malformed JSON\n";
        next FLOW;
    }

    # Average rates over the interval since the previous run.  Missing
    # counters are treated as zero.
    my $bps = (($obj->{bytes} || 0) * 8) / $duration;
    my $pps = ($obj->{packets} || 0) / $duration;

    try {
        $es->index(
            index => $index,
            type  => 'flowdata',
            body  => {
                '_timestamp' => $timestamp,
                'bps'        => $bps,
                'pps'        => $pps,
                'stats'      => $obj
            }
        );
    }
    catch {
        $first_error = $_ if !$failed;
        $failed++;
    };
}

if ($failed) {
    warn "$failed of " . scalar(@flows)
        . " flow records could not be indexed; first error: $first_error\n";
}

clear_flows($pipe);

# Persist the time of this run; without it the next run would compute its
# rates against a stale (or missing) timestamp.  A failure here is reported
# but must not stop the script before the lock is released.
try {
    lock_store($state, $statefile) or die "$!\n";
}
catch {
    warn "Can't update $statefile: $_";
};
# End of the main script: release the lock taken at start-up.
unlock($lockfile);

# ts($file)
#
# Create the state file $file containing an empty hash and end the program.
# Called when no state file exists yet.
#
# The lock held by the caller is released on both the success and the
# failure path, because the process terminates here and nothing else would
# remove the lock file (no autoclean is configured for LockFile::Simple).
#
# Dies with "Can't create $file: ..." if the file cannot be written.
sub ts {
    my $file = shift;
    my $x    = {};

    # Capture the outcome first: unlock() below may overwrite $! and $@.
    my $stored = eval { lock_store($x, $file) };
    my $error  = $@ || "$!";

    unlock($lockfile);

    die "Can't create $file: $error" unless $stored;
    exit;
}

# _run_pmacct(@args)
#
# Run the pmacct client with @args and return the lines it prints on
# standard output.  The command is started with the list form of open, so
# the arguments are passed to the program as-is and never seen by a shell.
# Failures (program cannot be started, non-zero exit status) are reported
# with warn; whatever output was read is still returned.
sub _run_pmacct {
    my @args = @_;
    my $bin  = '/usr/local/bin/pmacct';

    my $pid = open(my $out, '-|', $bin, @args);
    unless ($pid) {
        warn "Cannot run $bin: $!\n";
        return;
    }

    my @lines = <$out>;

    # close() on a pipe is false both for I/O errors ($! set) and for a
    # non-zero exit status of the child ($! is 0, status in $?).
    unless (close $out) {
        warn $!
            ? "Error closing pipe from $bin: $!\n"
            : "$bin @args exited with status " . ($? >> 8) . "\n";
    }

    return @lines;
}

# retrieve_flows($pipe)
#
# Ask pmacct for the contents of the table behind $pipe as JSON
# (-p $pipe -l -O json -s) and return the output, one list element per line.
sub retrieve_flows {
    my $pipe  = shift;
    my @flows = _run_pmacct('-p', $pipe, '-l', '-O', 'json', '-s');
    return @flows;
}

# clear_flows($pipe)
#
# Run pmacct with -e against $pipe (-l -p $pipe -e).  The client's output is
# captured, not printed, and returned as a single string.
sub clear_flows {
    my $pipe  = shift;
    my $flows = join '', _run_pmacct('-l', '-p', $pipe, '-e');
    return $flows;
}