#!/usr/bin/env perl
#
# $Id: Pandia.pm,v 1.4 2025/07/01 19:20:47 snw Exp $
# Copyright (C) 2025 Coherent Logic Development LLC
#
# Author: Serena Willis <snw@coherent-logic.com>
#
# Licensed AGPL-3.0
#
# $Log: Pandia.pm,v $
# Revision 1.4 2025/07/01 19:20:47 snw
# Stop crawling and indexing URLs containing page fragments
#
# Revision 1.3 2025/07/01 06:48:03 snw
# Updates
#
# Revision 1.2 2025/06/30 02:18:44 snw
# Updates
#
# Revision 1.1 2025/06/28 23:54:11 snw
# Add new OO module
#
#
package Pandia;
use strict;
use warnings;
use HTTP::Tiny;
use HTML::TreeBuilder;
use URI;
use DBI;
use WWW::RobotRules;
use Fcntl qw(:flock);
use LWP::Simple qw(get);
use Config::IniFiles;
use Thread::Pool;
use HTTP::Date;
use POSIX qw(strftime);
use threads;
use threads::shared;    # required for the ':shared' attribute and lock() below

# Count of URLs dispatched to worker threads that have not yet
# finished; shared across all Thread::Pool workers.
my $indices_waiting : shared;
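
# do_index: worker-thread entry point. Fetches a single URL, extracts
# its title, text, and HTML, and stores the result in url_fulltext.
# Each invocation runs in its own thread, so it opens and closes its
# own DBI handle. $reindex == 1 means "refresh an existing entry only
# if the page's Last-Modified header is newer than last_indexed_dt".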
sub do_index {
    my ($url, $domain, $dsn, $dbuser, $dbpass, $reindex) = @_;

    # URLs containing page fragments are never indexed; bail out early
    # (the in-flight counter is still decremented at nodb_cleanup).
    if (index($url, '#') != -1) {
        print "pandia: URL contains a fragment; skipping\n";
        goto nodb_cleanup;
    }

    print "pandia: thread connecting to MySQL database...";
    my $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 0, PrintError => 1});
    if (not $dbh) {
        print "[FAIL]\n";
        goto nodb_cleanup;
    }
    print "[OK]\n";

    my $http = HTTP::Tiny->new(agent => "pandia-crawler/0.0.1", timeout => 60);
    my $tree = HTML::TreeBuilder->new();
    my $tries;

    # Probe with a HEAD request before committing to a full GET; a URL
    # that fails HEAD is dropped from the crawl queue.
    print "pandia: HEAD $url\n";
    my $head = $http->head($url);
    if (not $head->{success}) {
        print "pandia: HEAD fail $url\n";
        my $sthh = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sthh->execute($url);
        $sthh->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }
    else {
        print "pandia: HEAD OK $url\n";
    }

    my $headers = $head->{headers};
    my $content_type = $headers->{'content-type'} // '';
    my $last_modified;
    my $last_modified_sys;

    if ($reindex == 1) {
        print "pandia: REINDEX $url\n";
        my $last_modified_t = $headers->{'last-modified'};
        $last_modified_sys = str2time($last_modified_t);

        if ($last_modified_sys) {
            print "pandia: GET_LAST_INDEX_DT $url\n";
            my $sth = $dbh->prepare("SELECT last_indexed_dt FROM url_fulltext WHERE url=?");
            $sth->execute($url);
            print "pandia: GOT_LAST_INDEX_DT $url\n";

            if ($sth->rows < 1) {
                print "pandia: page not indexed\n";
                $sth->finish();
                $dbh->disconnect();
                goto nodb_cleanup;
            }

            my $hashref = $sth->fetchrow_hashref();
            my $last_indexed = str2time($hashref->{last_indexed_dt});

            if ($last_modified_sys > $last_indexed) {
                # Stale entry: drop the old fulltext row and fall through
                # to re-fetch and re-insert the page below.
                print "pandia: $url has been modified since the last time it was indexed\n";
                my $sthd = $dbh->prepare("DELETE FROM url_fulltext WHERE url=?");
                $sthd->execute($url);
                $sthd->finish();
                print "pandia: INDEXDELETE $url\n";
            }
            else {
                print "pandia: $url is still up-to-date in the index\n";
                goto cleanup;
            }
        }
        else {
            print "pandia: no modify info; skipping $url\n";
            $dbh->disconnect();
            goto nodb_cleanup;
        }
    }
    else {
        print "pandia: INDEX $url\n";
        $last_modified = strftime("%Y-%m-%d %H:%M", localtime);
    }

    my $title = "";
    my $fulltext = "";
    my $fullhtml = "";

    # Only plain text and HTML are indexable. Match on the media-type
    # prefix: the header may carry a charset suffix such as
    # "text/html; charset=UTF-8".
    if (substr($content_type, 0, 10) ne 'text/plain' && substr($content_type, 0, 9) ne 'text/html') {
        print "pandia: content type $content_type not indexable; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $response = $http->get($url);
    if (not $response->{success}) {
        print "pandia: http failure; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $pagedata = $response->{content};
    $tree->parse($pagedata);

    # Not every page has a <title> element; guard before calling as_text.
    my $title_elem = $tree->look_down('_tag', 'title');
    $title = $title_elem ? $title_elem->as_text : '';
    $title =~ s/[^\x00-\x7F]//g;
    print "pandia: processing $url [$title]\n";

    $fulltext = $tree->as_text;
    $fulltext =~ s/[^\x00-\x7F]//g;
    $fullhtml = $tree->as_HTML;
    $fullhtml =~ s/[^\x00-\x7F]//g;

    my $sth = $dbh->prepare("SELECT url FROM url_fulltext WHERE url=?");
    $sth->execute($url);
    if ($sth->rows > 0) {
        print "pandia: we already have the full text of $url recorded\n";
        $sth->finish();
        goto cleanup;
    }

    $sth = $dbh->prepare("INSERT INTO url_fulltext(url, url_domain, page_title, body, body_html) VALUES (?, ?, ?, ?, ?)");
    $tries = 0;
    while (1) {
        print "pandia: INSERTINDEX $url\n";
        $sth->execute($url, $domain, $title, $fulltext, $fullhtml);
        if ($DBI::err) {
            if ($tries > 5) {
                print "pandia: giving up inserting fulltext on $url\n";
                last;
            }
            $tries = $tries + 1;
            print "pandia: error inserting fulltext on $url; retrying\n";
            next;
        }
        else {
            last;
        }
    }
    $sth->finish();

    print "pandia: $url has been processed\n";

  cleanup:
    my $sthuc = $dbh->prepare("UPDATE crawl_queue SET analyzed=1 WHERE url=?");
    $tries = 0;
    while (1) {
        $sthuc->execute($url);
        if ($DBI::err) {
            $tries = $tries + 1;
            if ($tries > 2) {
                print "pandia: giving up updating crawl_queue for $url\n";
                last;
            }
            print "pandia: DBI deadlock; retrying crawl queue update\n";
            next;
        }
        else {
            last;
        }
    }
    $sthuc->finish();
    $dbh->disconnect();

  nodb_cleanup:
    # Always decrement the shared in-flight counter, under lock, so the
    # batch dispatchers do not wait on work that has already finished.
    lock($indices_waiting);
    $indices_waiting = $indices_waiting - 1;
}
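
# blacklist_add: add a domain to the blacklist, then purge anything
# already queued or indexed for that domain.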
sub blacklist_add {
    my ($self, $domain) = @_;

    print "pandia: connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    die "pandia: failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    print "pandia: blacklisting domain $domain...";
    my $sth = $dbh->prepare("INSERT INTO blacklist (url_domain) VALUES (?)");
    $sth->execute($domain);
    print "[OK]\n";

    print "pandia: removing blacklisted items from crawl queue...";
    $sth = $dbh->prepare("DELETE crawl_queue FROM crawl_queue JOIN blacklist ON crawl_queue.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    print "pandia: removing blacklisted items from index...";
    $sth = $dbh->prepare("DELETE url_fulltext FROM url_fulltext JOIN blacklist ON url_fulltext.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    $sth->finish();
    $dbh->disconnect();
}
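
# blacklist_remove: delete a domain from the blacklist so it may be
# crawled and indexed again.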
sub blacklist_remove {
    my ($self, $domain) = @_;

    print "pandia: connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    die "pandia: failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    my $sth = $dbh->prepare("DELETE FROM blacklist WHERE url_domain=?");
    $sth->execute($domain);
    $sth->finish();
    $dbh->disconnect();
}
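
# index_serial: index every unanalyzed URL in the crawl queue, one at a
# time, in the calling thread (no worker pool).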
sub index_serial {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=0");
    $sth->execute();

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}
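
# index_one: index a single queued URL, in the calling thread.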
sub index_one {
    my ($self, $url) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url=? LIMIT 1");
    $sth->execute($url);

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($url, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}
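
# index_domain: index every queued URL belonging to one domain, in the
# calling thread.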
sub index_domain {
    my ($self, $domain) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url_domain=?");
    $sth->execute($domain);

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($hashref->{url}, $domain, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}
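
# run_index_batch: pull up to index_workers unanalyzed URLs from the
# crawl queue, dispatch each one to the Thread::Pool, then wait (with a
# timeout) for the shared in-flight counter to drain.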
sub run_index_batch {
    my ($self) = @_;

    # open my $file, ">", "/tmp/pandia_indexer.lock" or die $!;
    # flock $file, LOCK_EX|LOCK_NB or die "Unable to lock file $!";

    print "pandia: creating $self->{index_workers} indexer threads\n";

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT * FROM crawl_queue WHERE analyzed=0 LIMIT ?");
    $sth->execute($self->{index_workers});

    $indices_waiting = $sth->rows;
    if ($indices_waiting == 0) {
        print "pandia: nothing to index\n";
        goto done;
    }

    while (my $hashref = $sth->fetchrow_hashref()) {
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }
    print "pandia: $indices_waiting total pages to be processed\n";

  done:
    $sth->finish();
    $dbh->disconnect();

    # Poll the shared counter; give up after roughly a minute so a hung
    # worker cannot stall the batch forever.
    my $start_time = time();
    while ($indices_waiting > 0) {
        my $time_diff = time() - $start_time;
        if ($time_diff > 60) {
            print "pandia: timing out\n";
            last;
        }
        print "pandia: $indices_waiting URLs still in-process [$time_diff seconds elapsed]\n";
        sleep(10);
    }

    $self->{index_pool}->shutdown;
}
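
# run_reindex_batch: like run_index_batch, but takes a random sample of
# already-analyzed URLs and re-checks each one against its Last-Modified
# header (the $reindex flag to do_index is set to 1).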
sub run_reindex_batch {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=1 ORDER BY RAND() LIMIT ?");
    $sth->execute($self->{index_workers});

    $indices_waiting = $sth->rows;
    if ($indices_waiting == 0) {
        print "pandia: nothing to reindex\n";
        goto done;
    }

    while (my $hashref = $sth->fetchrow_hashref()) {
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 1);
    }
    print "pandia: $indices_waiting total pages to be processed\n";

  done:
    $sth->finish();
    $dbh->disconnect();

    # Poll the shared counter; give up after roughly a minute so a hung
    # worker cannot stall the batch forever.
    my $start_time = time();
    while ($indices_waiting > 0) {
        my $time_diff = time() - $start_time;
        if ($time_diff > 60) {
            print "pandia: timing out\n";
            last;
        }
        print "pandia: $indices_waiting URLs still in-process [$time_diff seconds elapsed]\n";
        sleep(10);
    }

    $self->{index_pool}->shutdown;
}
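
# new: construct a Pandia instance from a named profile section in
# /etc/pandia.ini. A profile is expected to look roughly like the
# sketch below (key names taken from the val() calls that follow; the
# values are illustrative assumptions, not shipped defaults):
#
#   [default]
#   dbhost        = localhost
#   dbname        = pandia
#   dbuser        = pandia
#   dbpass        = secret
#   index_workers = 4
#   crawl_workers = 4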
sub new {
    my ($class, $args) = @_;

    my $cfg = Config::IniFiles->new(-file => "/etc/pandia.ini")
        or die "pandia: could not read /etc/pandia.ini\n";

    my $thost = $cfg->val($args->{profile}, 'dbhost');
    my $tname = $cfg->val($args->{profile}, 'dbname');
    my $tuser = $cfg->val($args->{profile}, 'dbuser');
    my $tpass = $cfg->val($args->{profile}, 'dbpass');
    my $tindex_workers = $cfg->val($args->{profile}, 'index_workers');
    my $tcrawl_workers = $cfg->val($args->{profile}, 'crawl_workers');

    $indices_waiting = $tindex_workers;

    my $tdsn = "DBI:mysql:database=$tname;host=$thost;port=3306;mysql_connect_timeout=5;";

    my $self = bless {
        profile => $args->{profile},
        dbhost => $thost,
        dbname => $tname,
        dbuser => $tuser,
        dbpass => $tpass,
        dsn => $tdsn,
        index_workers => $tindex_workers,
        crawl_workers => $tcrawl_workers,
        index_pool => Thread::Pool->new(
            {
                workers => $tindex_workers,
                do => \&do_index
            }
        )
    }, $class;

    return $self;
}
1;
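
# Example usage (a minimal sketch; the 'default' profile name is an
# illustrative assumption, any section present in /etc/pandia.ini works):
#
#   use Pandia;
#
#   my $pandia = Pandia->new({profile => 'default'});
#
#   # dispatch one batch of queued URLs to the indexer pool
#   $pandia->run_index_batch;
#
#   # re-check a random sample of already-indexed URLs
#   $pandia->run_reindex_batch;
#
#   # domain-level maintenance
#   $pandia->blacklist_add('example.com');
#   $pandia->blacklist_remove('example.com');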