use DBI;
use POSIX;
+use DBD::SQLite;
use Commods;
$VERSION = 1.00;
@ISA = qw(Exporter);
@EXPORT = qw(&db_setocean &db_writer &db_connect $dbh
- &db_filename &db_doall &db_onconflict &db_setdatadir);
+ &db_filename &db_doall &db_onconflict
+ &dbr_filename &dbr_connect &db_connect_core
+ &dumptab_head &dumptab_row_hashref
+ &db_chkcommit &db_check_referential_integrity);
%EXPORT_TAGS = ( );
@EXPORT_OK = qw();
}
+sub dbr_filename ($$) {
+ my ($datadir,$oceanname) = @_;
+ return "$datadir/OCEAN-$oceanname.db";
+}
+sub dbr_connect ($$) {
+ my ($datadir,$ocean) = @_;
+ return db_connect_core(dbr_filename($datadir,$ocean));
+}
+
+sub db_connect_core ($) {
+ my ($fn)= @_;
+ my $opts = { AutoCommit=>0,
+ RaiseError=>1, ShowErrorStatement=>1,
+ sqlite_unicode=>1 };
+
+ # DBI now wants to start a transaction whenever we even say
+ # SELECT. But this doesn't work if the DB is readonly. We can
+ # work around this by setting autocommit, in which case there is
+ # no need for a transaction for read-only db commands. Autocommit
+ # is (obviously) safe with readonly operations. But callers in
+ # yarrg do not specify to us whether they intend to write. So we
+ # decide, by looking at the file mode. And as belt-and-braces we
+ # set sqlite's own readonly flag as well.
+ # http://stackoverflow.com/questions/30082008/attempt-to-write-a-readonly-database-but-im-not
+ # http://stackoverflow.com/questions/35208727/can-sqlite-db-files-be-made-read-only
+ # http://cpansearch.perl.org/src/ISHIGAKI/DBD-SQLite-1.39/Changes
+ # (see entry for 1.38_01)
+ # http://stackoverflow.com/questions/17793672/perl-dbi-treats-setting-sqlite-db-cache-size-as-a-write-operation-when-subclassi
+ # https://rt.cpan.org/Public/Bug/Display.html?id=56444#
+ my $readonly =
+ (access $fn, POSIX::W_OK) ? 0 :
+ ($! == EACCES) ? 1 :
+ ($! == ENOENT) ? 0 :
+ die "$fn access(,W_OK) $!";
+ if ($readonly) {
+ $opts->{sqlite_open_flags} = DBD::SQLite::OPEN_READONLY;
+ $opts->{AutoCommit}=1;
+ }
+
+ my $h= DBI->connect("dbi:SQLite:$fn",'','',$opts)
+ or die "$fn $DBI::errstr ?";
+ return $h;
+ # default timeout is 30s which is plenty
+}
+
our $dbfn;
our $dbh;
-our $datadir= '.';
-sub db_setdatadir ($) {
- $datadir= $_[0];
-}
sub db_setocean ($) {
my ($oceanname) = @_;
- $dbfn= "$datadir/OCEAN-$oceanname.db";
+ $dbfn= dbr_filename('.',$oceanname);
}
sub db_filename () {
return $dbfn;
}
sub db_connect () {
- return if $dbh;
- $dbh= DBI->connect("dbi:SQLite:$dbfn",'','',
- { AutoCommit=>0,
- RaiseError=>1, ShowErrorStatement=>1,
- unicode=>1 })
- or die "$dbfn $DBI::errstr ?";
- # default timeout is 30s which is plenty
+ $dbh= db_connect_core($dbfn);
}
sub db_doall ($) {
}
}
+#---------- table dump helper ----------
+
+sub dumptab_head ($$$) {
+ my ($fh,$w,$cols) = @_;
+ printf $fh "|%-${w}s", $_ foreach @$cols; print $fh "|\n";
+ print $fh "+",('-'x$w) foreach @$cols; print $fh "+\n";
+}
+
+sub dumptab_row_hashref ($$$$) {
+ my ($fh,$w,$cols,$row) = @_;
+ printf $fh "|%-$w.${w}s",
+ (defined $row->{$_} ? $row->{$_} : 'NULL')
+ foreach @$cols;
+ print $fh "\n";
+}
+
+#---------- referential integrity constraints ----------
+
+# SQLite doesn't support foreign key constraints so we do it by steam:
+
+sub nooutput ($) {
+ my ($stmts) = @_;
+ my $ekindcount= 0;
+ my $letxt= '';
+ foreach my $stmt (split /\;/, $stmts) {
+ next unless $stmt =~ /\S/;
+
+ my $etxt= '';
+ $stmt =~ s/^([ \t]*\#.*)$/ $etxt .= $1."\n"; ''; /mge;
+ $etxt= $letxt unless length $etxt;
+ $letxt= $etxt;
+
+ $stmt =~ s/^\s+//; $stmt =~ s/\s+$//;
+ my $sth= $dbh->prepare($stmt);
+ $sth->execute();
+ my $row;
+ my $ecount= 0;
+ my @cols= @{ $sth->{NAME_lc} };
+ my $w= 11;
+ while ($row= $sth->fetchrow_hashref) {
+ if (!$ecount++) {
+ print STDERR "REFERENTIAL INTEGRITY ERROR\n";
+ print STDERR "\n$etxt\n $stmt\n\n";
+ dumptab_head(\*STDERR,$w,\@cols);
+ }
+ if ($ecount>5) { print STDERR "...\n"; last; }
+ dumptab_row_hashref(\*STDERR,$w,\@cols,$row);
+ }
+ next unless $ecount;
+
+ $ekindcount++;
+ print STDERR "\n\n";
+ }
+ die "REFERENTIAL INTEGRITY ERRORS $ekindcount\n"
+ if $ekindcount;
+}
+
+sub db_check_referential_integrity ($) {
+ my ($full) = @_;
+ # non-full is done only for market data updates; it avoids
+ # detecting errors which are essentially missing metadata and
+ # old schemas, etc.
+
+ foreach my $bs (qw(buy sell)) {
+ nooutput(<<END);
+
+ # Every buy/sell must refer to an entry in commods, islands, and stalls:
+ SELECT * FROM $bs LEFT JOIN commods USING (commodid) WHERE commodname IS NULL;
+ SELECT * FROM $bs LEFT JOIN islands USING (islandid) WHERE islandname IS NULL;
+ SELECT * FROM $bs LEFT JOIN stalls USING (stallid, islandid)
+ WHERE stallname IS NULL;
+
+ # Every buy/sell must be part of an upload:
+ SELECT * FROM $bs LEFT JOIN uploads USING (islandid) WHERE timestamp IS NULL;
+
+ # The islandid in stalls must be the same as the islandid in buy/sell:
+ SELECT * FROM $bs JOIN stalls USING (stallid)
+ WHERE $bs.islandid != stalls.islandid;
+
+END
+ }
+
+ nooutput(<<END);
+
+ # Every stall and upload must refer to an island:
+ SELECT * FROM stalls LEFT JOIN islands USING (islandid)
+ WHERE islandname IS NULL;
+ SELECT * FROM uploads LEFT JOIN islands USING (islandid)
+ WHERE islandname IS NULL;
+
+END
+ if ($full) {
+ foreach my $end (qw(aiid biid)) {
+ foreach my $tab (qw(dists routes)) {
+ nooutput(<<END);
+
+ # Every row in dists and routes must refer to two existing rows in islands:
+ SELECT * FROM $tab d LEFT JOIN islands ON d.$end=islandid
+ WHERE islandname IS NULL;
+
+END
+ }
+ }
+ nooutput(<<END);
+
+ # Every pair of islands must have an entry in dists:
+ SELECT * FROM islands ia JOIN islands ib LEFT JOIN dists
+ ON ia.islandid=aiid and ib.islandid=biid
+ WHERE dist IS NULL;
+
+ # Every commod must refers to a commodclass and vice versa:
+ SELECT * FROM commods LEFT JOIN commodclasses USING (commodclassid)
+ WHERE commodclass IS NULL;
+ SELECT * FROM commodclasses LEFT JOIN commods USING (commodclassid)
+ WHERE commodname IS NULL;
+
+ # Ordvals which are not commodclass ordvals are unique:
+ SELECT ordval,count(*),commodname,commodid,posinclass
+ FROM commods
+ WHERE posinclass > 0
+ GROUP BY ordval
+ HAVING count(*) > 1;
+
+ # For every class, posinclass is dense from 1 to maxposinclass,
+ # apart from the commods for which it is zero.
+ SELECT commodclass,commodclassid,posinclass,count(*)
+ FROM commods JOIN commodclasses USING (commodclassid)
+ WHERE posinclass > 0
+ GROUP BY commodclassid,posinclass
+ HAVING count(*) > 1;
+ SELECT commodclass,commodclassid,count(*)
+ FROM commods JOIN commodclasses USING (commodclassid)
+ WHERE posinclass > 0
+ GROUP BY commodclassid
+ HAVING count(*) != maxposinclass;
+ SELECT *
+ FROM commods JOIN commodclasses USING (commodclassid)
+ WHERE posinclass < 0 OR posinclass > maxposinclass;
+
+END
+ }
+}
+
+sub db_chkcommit ($) {
+ my ($full) = @_;
+ db_check_referential_integrity($full);
+ $dbh->commit();
+}
+
1;