# consolidate_logs.pl — merge per-fork request log DBs into logs/consolidated.db
  1. #!/usr/bin/env perl
  2. use strict;
  3. use warnings;
  4. use FindBin::libs;
  5. use Trog::SQLite;
  6. use POSIX ":sys_wait_h";
  7. use Time::HiRes qw{usleep};
  8. # Every recorded request is fully finished, so we can treat them as such.
  9. my $cons_dbh = Trog::SQLite::dbh( 'schema/log.schema', "logs/consolidated.db" );
  10. opendir(my $dh, "logs/db");
  11. my @pids;
  12. foreach my $db (readdir($dh)) {
  13. next unless $db =~ m/\.db$/;
  14. die "AAAGH" unless -f "logs/db/$db";
  15. my $dbh = Trog::SQLite::dbh( 'schema/log.schema', "logs/db/$db" );
  16. my $pid = fork();
  17. if (!$pid) {
  18. do_row_migration($dbh);
  19. exit 0;
  20. }
  21. push(@pids, $pid);
  22. }
  23. while (@pids) {
  24. my $pid = shift(@pids);
  25. my $status = waitpid($pid, WNOHANG);
  26. push(@pids, $pid) if $status == 0;
  27. usleep(100);
  28. }
  29. sub do_row_migration {
  30. my ($dbh) = @_;
  31. my $query = "select * from all_requests";
  32. my $sth = $dbh->prepare($query);
  33. $sth->execute();
  34. while (my @rows = @{ $sth->fetchall_arrayref({}, 100000) || [] }) {
  35. my @bind = sort keys(%{$rows[0]});
  36. my @rows_bulk = map { my $subj = $_; map { $subj->{$_} } @bind } @rows;
  37. Trog::SQLite::bulk_insert($cons_dbh, 'all_requests', \@bind, 'IGNORE', @rows_bulk);
  38. # Now that we've migrated the rows from the per-fork DBs, murder these rows
  39. my $binder = join(',', (map { '?' } @rows));
  40. $dbh->do("DELETE FROM requests WHERE uuid IN ($binder)", undef, map { $_->{uuid} } @rows);
  41. }
  42. }