#!/usr/bin/env perl
#
# $Id: Pandia.pm,v 1.2 2025/06/30 02:18:44 snw Exp $
# Copyright (C) 2025 Coherent Logic Development LLC
#
# Author: Serena Willis
#
# Licensed AGPL-3.0
#
# $Log: Pandia.pm,v $
# Revision 1.2  2025/06/30 02:18:44  snw
# Updates
#
# Revision 1.1  2025/06/28 23:54:11  snw
# Add new OO module
#
#

package Pandia;

use strict;
use warnings;

use HTTP::Tiny;
use HTML::TreeBuilder;
use URI;
use DBI;
use WWW::RobotRules;
use Fcntl qw(:flock);
use LWP::Simple qw(get);
use Config::IniFiles;
use Thread::Pool;
use HTTP::Date;
use POSIX qw(strftime);

# Explicitly load thread support: the ':shared' attribute below requires
# threads::shared to be loaded.  Previously this worked only if Thread::Pool
# happened to pull these in first.
use threads;
use threads::shared;

# Number of index jobs queued but not yet completed.  Written by the main
# thread when a batch is dispatched and decremented by each worker thread
# as it finishes, so it must be shared and the decrement must be locked.
my $indices_waiting : shared;

# index($url, $domain, $dsn, $dbuser, $dbpass, $reindex)
#
# Worker-thread entry point (dispatched via Thread::Pool, so the clash with
# CORE::index is harmless — it is only ever referenced as \&index).
#
# Fetches $url, extracts its title/text/HTML, and stores the result in the
# url_fulltext table.  When $reindex is 1, the page is only re-fetched and
# re-stored if its Last-Modified header is newer than last_indexed_dt.
# On completion the row in crawl_queue is marked analyzed=1 and the shared
# in-flight counter is decremented.  Returns nothing useful.
sub index {
    my ($url, $domain, $dsn, $dbuser, $dbpass, $reindex) = @_;

    print "pandia: thread connecting to MySQL database...";
    my $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 0, PrintError => 1});
    if (not $dbh) {
        print "[FAIL]\n";
        goto nodb_cleanup;
    }
    print "[OK]\n";

    my $http = HTTP::Tiny->new(agent => "pandia-crawler/0.0.1", timeout => 60);
    my $tree = HTML::TreeBuilder->new();
    my $tries;

    print "pandia: HEAD $url\n";
    my $head = $http->head($url);
    if (not $head->{success}) {
        print "pandia: HEAD fail $url\n";
        # BUGFIX: previously jumped to nodb_cleanup with a live $dbh,
        # leaking the database connection.
        $dbh->disconnect();
        goto nodb_cleanup;
    }
    print "pandia: HEAD OK $url\n";

    my $headers = $head->{headers};
    # Content-Type may be absent entirely; default to '' so the string
    # comparisons below do not warn on undef.
    my $content_type = $headers->{'content-type'} // '';
    my $last_modified;
    my $last_modified_sys;

    if ($reindex == 1) {
        print "pandia: REINDEX $url\n";
        my $last_modified_t = $headers->{'last-modified'};
        $last_modified_sys = str2time($last_modified_t);

        if ($last_modified_sys) {
            print "pandia: GET_LAST_INDEX_DT $url\n";
            my $sth = $dbh->prepare("SELECT last_indexed_dt FROM url_fulltext WHERE url=?");
            $sth->execute($url);
            print "pandia: GOT_LAST_INDEX_DT $url\n";

            if ($sth->rows < 1) {
                print "pandia: page not indexed\n";
                # BUGFIX: disconnect before bailing out (was a connection leak).
                $sth->finish();
                $dbh->disconnect();
                goto nodb_cleanup;
            }

            my $hashref      = $sth->fetchrow_hashref();
            my $last_indexed = str2time($hashref->{last_indexed_dt});

            if ($last_modified_sys > $last_indexed) {
                print "pandia: $url has been modified since the last time it was indexed\n";
                # Use a distinct handle name; the original shadowed $sth here.
                my $del = $dbh->prepare("DELETE FROM url_fulltext WHERE url=?");
                $del->execute($url);
                $del->finish();
                print "pandia: INDEXDELETE $url\n";
            }
            else {
                print "pandia: $url is still up-to-date in the index\n";
                $sth->finish();
                goto cleanup;
            }
            $sth->finish();
        }
        else {
            print "pandia: no modify info; skipping $url\n";
            # BUGFIX: disconnect before bailing out (was a connection leak).
            $dbh->disconnect();
            goto nodb_cleanup;
        }
    }
    else {
        print "pandia: INDEX $url\n";
        # NOTE(review): $last_modified is computed but not used in the INSERT
        # below; presumably last_indexed_dt has a DB-side default — confirm.
        $last_modified = strftime("%Y-%m-%d %H:%M", localtime);
    }

    my $title    = "";
    my $fulltext = "";
    my $fullhtml = "";

    # Only plain text and HTML are indexable; anything else is dropped from
    # the crawl queue.
    if ($content_type ne 'text/plain' && substr($content_type, 0, 9) ne 'text/html') {
        print "pandia: content type $content_type not indexable; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $response = $http->get($url);
    if (not $response->{success}) {
        print "pandia: http failure; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $pagedata = $response->{content};
    $tree->parse($pagedata);

    # BUGFIX: look_down returns undef when the page has no <title>, which
    # previously killed the worker thread with a method-on-undef error.
    my $title_elem = $tree->look_down('_tag', 'title');
    $title = $title_elem ? $title_elem->as_text : "";
    $title =~ s/[^\x00-\x7F]//g;    # strip non-ASCII

    print "pandia: processing $url [$title]\n";

    $fulltext = $tree->as_text;
    $fulltext =~ s/[^\x00-\x7F]//g;
    $fullhtml = $tree->as_HTML;
    $fullhtml =~ s/[^\x00-\x7F]//g;

    my $sth = $dbh->prepare("SELECT url FROM url_fulltext WHERE url=?");
    $sth->execute($url);
    if ($sth->rows > 0) {
        print "pandia: we already have the full text of $url recorded\n";
        $sth->finish();
        goto cleanup;
    }

    $sth = $dbh->prepare("INSERT INTO url_fulltext(url, url_domain, page_title, body, body_html) VALUES (?, ?, ?, ?, ?)");

    # Retry the insert up to 5 times on DBI errors (e.g. deadlocks).
    $tries = 0;
    while (1) {
        print "pandia: INSERTINDEX $url\n";
        $sth->execute($url, $domain, $title, $fulltext, $fullhtml);
        if ($DBI::err) {
            if ($tries > 5) {
                print "pandia: giving up inserting fulltext on $url\n";
                last;
            }
            $tries = $tries + 1;
            print "pandia: error inserting fulltext on $url; retrying\n";
            next;
        }
        last;
    }
    $sth->finish();

    print "pandia: $url has been processed\n";

  cleanup:
    # Mark the URL analyzed whether it was (re)indexed or found up-to-date,
    # retrying a couple of times on deadlock.
    my $sthuc = $dbh->prepare("UPDATE crawl_queue SET analyzed=1 WHERE url=?");
    $tries = 0;
    while (1) {
        $sthuc->execute($url);
        if ($DBI::err) {
            $tries = $tries + 1;
            if ($tries > 2) {
                print "pandia: giving up updating crawl_queue for $url\n";
                last;
            }
            print "pandia: DBI deadlock; retrying crawl queue update\n";
            next;
        }
        last;
    }
    $sthuc->finish();
    $dbh->disconnect();

  nodb_cleanup:
    # BUGFIX: the decrement is a read-modify-write on a shared variable and
    # must be serialized across worker threads.
    {
        lock($indices_waiting);
        $indices_waiting = $indices_waiting - 1;
    }
}

# Pandia->new({ profile => $ini_section })
#
# Reads DB connection settings and worker counts from the named section of
# /etc/pandia.ini and returns a blessed Pandia object with a Thread::Pool
# of index workers already created.  Dies if the config file cannot be read.
sub new {
    my ($class, $args) = @_;

    my $cfg = Config::IniFiles->new(-file => "/etc/pandia.ini")
        or die "pandia: cannot read /etc/pandia.ini: @Config::IniFiles::errors\n";

    my $thost          = $cfg->val($args->{profile}, 'dbhost');
    my $tname          = $cfg->val($args->{profile}, 'dbname');
    my $tuser          = $cfg->val($args->{profile}, 'dbuser');
    my $tpass          = $cfg->val($args->{profile}, 'dbpass');
    my $tindex_workers = $cfg->val($args->{profile}, 'index_workers');
    my $tcrawl_workers = $cfg->val($args->{profile}, 'crawl_workers');

    $indices_waiting = $tindex_workers;

    my $tdsn = "DBI:mysql:database=$tname;host=$thost;port=3306;mysql_connect_timeout=5;";

    my $self = bless {
        profile       => $args->{profile},
        dbhost        => $thost,
        dbname        => $tname,
        dbuser        => $tuser,
        dbpass        => $tpass,
        dsn           => $tdsn,
        index_workers => $tindex_workers,
        crawl_workers => $tcrawl_workers,
        index_pool    => Thread::Pool->new(
            {
                workers => $tindex_workers,
                do      => \&index
            }
        )
    }, $class;

    return $self;
}

# _await_indexers($self)
#
# Private: poll the shared in-flight counter until all dispatched index jobs
# have finished, giving up after 60 seconds.  Shared verbatim by
# run_index_batch and run_reindex_batch (previously duplicated in both).
sub _await_indexers {
    my ($self) = @_;

    my $start_time = time();
    while ($indices_waiting > 0) {
        my $end_time  = time();
        my $time_diff = $end_time - $start_time;

        if ($time_diff > 60) {
            print "pandia: timing out\n";
            last;
        }

        print "pandia: $indices_waiting URLs still in-process [$time_diff seconds elapsed]\n";
        sleep(10);
    }
}

# $pandia->run_index_batch()
#
# Pulls up to index_workers un-analyzed URLs from crawl_queue and dispatches
# each to the worker pool for initial indexing, then waits (up to 60s) for
# the workers to drain before shutting the pool down.
sub run_index_batch {
    my ($self) = @_;

    # open my $file, ">", "/tmp/pandia_indexer.lock" or die $!;
    # flock $file, LOCK_EX|LOCK_NB or die "Unable to lock file $!";

    print "pandia: creating $self->{index_workers} indexer threads\n";

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT * FROM crawl_queue WHERE analyzed=0 LIMIT ?");
    $sth->execute($self->{index_workers});

    # NOTE: rows() for a SELECT is reliable here only because DBD::mysql
    # stores the full result set client-side by default.
    $indices_waiting = $sth->rows;

    if ($indices_waiting == 0) {
        print "pandia: nothing to index\n";
        goto done;
    }

    while (my $hashref = $sth->fetchrow_hashref()) {
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    print "pandia: $indices_waiting total pages to be processed\n";

  done:
    $sth->finish();
    $dbh->disconnect();

    $self->_await_indexers();
    $self->{index_pool}->shutdown;
}

# $pandia->run_reindex_batch()
#
# Picks up to index_workers random already-analyzed URLs and dispatches each
# to the worker pool with the reindex flag set (workers will skip pages whose
# Last-Modified predates their last indexing), then drains the pool as in
# run_index_batch.
sub run_reindex_batch {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=1 ORDER BY RAND() LIMIT ?");
    $sth->execute($self->{index_workers});

    $indices_waiting = $sth->rows;

    if ($indices_waiting == 0) {
        print "pandia: nothing to reindex\n";
        goto done;
    }

    while (my $hashref = $sth->fetchrow_hashref()) {
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 1);
    }

    print "pandia: $indices_waiting total pages to be processed\n";

  done:
    $sth->finish();
    $dbh->disconnect();

    $self->_await_indexers();
    $self->{index_pool}->shutdown;
}

1;