# Licensed AGPL-3.0
#
# $Log$
# Revision 1.3  2025/07/01 06:48:03  snw
# Updates
#
# Revision 1.2  2025/06/30 02:18:44  snw
# Updates
#
# Revision 1.1  2025/06/28 23:54:11  snw
# Add new OO module
#
#

package Pandia;

use strict;
use warnings;

use HTTP::Tiny;
use HTML::TreeBuilder;
use Fcntl qw(:flock);
use LWP::Simple qw(get);
use Config::IniFiles;
use Thread::Pool;
use HTTP::Date;
use POSIX qw(strftime);
use DBI;             # DBI->connect is used below but was never imported
use threads::shared; # needed for the :shared attribute and lock()

my $indices_waiting : shared;

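# do_index: worker-side routine run for each queued URL. Connects to MySQL,
# issues a HEAD request, extracts the page title and text, and stores the
# result in url_fulltext. With $reindex set, it instead compares the page's
# Last-Modified header against last_indexed_dt and only refreshes stale
# entries. Always decrements the shared $indices_waiting counter on exit.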
sub do_index {
    my ($url, $domain, $dsn, $dbuser, $dbpass, $reindex) = @_;

    print "pandia: thread connecting to MySQL database...";
    my $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 0, PrintError => 1});
    if(not $dbh) {
        print "[FAIL]\n";
        goto nodb_cleanup;
    }
    print "[OK]\n";

    my $http = HTTP::Tiny->new(agent => "pandia-crawler/0.0.1", timeout => 60);
    my $tree = HTML::TreeBuilder->new();
    my $tries;

    print "pandia: HEAD $url\n";
    my $head = $http->head($url);

    if(not $head->{success}) {
        print "pandia: HEAD fail $url\n";

        my $sthh = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sthh->execute($url);
        $sthh->finish();

        $dbh->disconnect(); # close the handle before taking the no-db exit path
        goto nodb_cleanup;
    }
    else {
        print "pandia: HEAD OK $url\n";
    }

proc_head:
    my $headers = $head->{headers};
    my $content_type = $headers->{'content-type'};
    my $last_modified;
    my $last_modified_sys;

    if ($reindex == 1) {
        print "pandia: REINDEX $url\n";
        my $last_modified_t = $headers->{'last-modified'};
        $last_modified_sys = str2time($last_modified_t);

        if($last_modified_sys) {
            print "pandia: GET_LAST_INDEX_DT $url\n";
            my $sth = $dbh->prepare("SELECT last_indexed_dt FROM url_fulltext WHERE url=?");
            $sth->execute($url);
            print "pandia: GOT_LAST_INDEX_DT $url\n";

            if($sth->rows < 1) {
                print "pandia: page not indexed\n";
                $sth->finish();
                $dbh->disconnect(); # avoid leaking the handle on this early exit
                goto nodb_cleanup;
            }

            my $hashref = $sth->fetchrow_hashref();
            my $last_indexed = str2time($hashref->{last_indexed_dt});

            if($last_modified_sys > $last_indexed) {
                print "pandia: $url has been modified since the last time it was indexed\n";
                my $sth = $dbh->prepare("DELETE FROM url_fulltext WHERE url=?");
                $sth->execute($url);
                print "pandia: INDEXDELETE $url\n";
            }
            else {
                print "pandia: $url is still up-to-date in the index\n";
                goto cleanup;
            }
        }
        else {
            print "pandia: no modify info; skipping $url\n";
            $dbh->disconnect(); # avoid leaking the handle on this early exit
            goto nodb_cleanup;
        }
    }
    else {
        print "pandia: INDEX $url\n";
        $last_modified = strftime("%Y-%m-%d %H:%M", localtime);
    }

    my $title = "";
    my $fulltext = "";
    my $fullhtml = "";
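
    # NOTE: the fetch-and-parse step does not survive in this source. The
    # following is a minimal sketch of what has to happen here, assuming
    # HTTP::Tiny's get() and HTML::TreeBuilder's parse()/eof():
    my $response = $http->get($url);
    if(not $response->{success}) {
        print "pandia: http GET failure; skipping $url\n";
        goto cleanup;
    }
    $fullhtml = $response->{content};
    $tree->parse($fullhtml);
    $tree->eof();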

    my $title_elem = $tree->look_down('_tag', 'title');
    $title = $title_elem ? $title_elem->as_text : ""; # guard: not every page has a <title>
    $title =~ s/[^\x00-\x7F]//g;

    print "pandia: processing $url [$title]\n";

    $fulltext = $tree->as_text;
    $fulltext =~ s/[^\x00-\x7F]//g;

    my $sth = $dbh->prepare("INSERT INTO url_fulltext(url, url_domain, page_title, body, body_html) VALUES (?, ?, ?, ?, ?)");
    $tries = 0;
    while(1) {
        print "pandia: INSERTINDEX $url\n";
        $sth->execute($url, $domain, $title, $fulltext, $fullhtml);
        if($DBI::err) {
            # assumed retry shape (the loop body is elided in this source):
            # give up after five attempts, otherwise back off and try again
            if($tries > 5) {
                print "pandia: giving up on INSERT for $url\n";
                last;
            }
            $tries = $tries + 1;
            sleep(2);
        }
        else {
            last;
        }
    }
    $sth->finish();

cleanup:
    my $sthuc = $dbh->prepare("UPDATE crawl_queue SET analyzed=1 WHERE url=?");
    $tries = 0;
    while(1) {
        $sthuc->execute($url);
        if($DBI::err) {
            # same assumed retry shape as the INSERT loop above
            if($tries > 5) {
                print "pandia: giving up on crawl_queue update for $url\n";
                last;
            }
            $tries = $tries + 1;
            sleep(2);
        }
        else {
            last;
        }
    }
    $sthuc->finish();

    $dbh->disconnect();

nodb_cleanup:
    lock($indices_waiting);
    $indices_waiting = $indices_waiting - 1;
}

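# blacklist_add: add a domain to the blacklist, then purge anything already
# queued or indexed for any blacklisted domain.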
sub blacklist_add {
    my ($self, $domain) = @_;

    print "pandia: connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    die "pandia: failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    print "pandia: blacklisting domain $domain...";
    my $sth = $dbh->prepare("INSERT INTO blacklist (url_domain) VALUES (?)");
    $sth->execute($domain);
    print "[OK]\n";

    print "pandia: removing blacklisted items from crawl queue...";
    $sth = $dbh->prepare("DELETE crawl_queue FROM crawl_queue JOIN blacklist ON crawl_queue.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    print "pandia: removing blacklisted items from index...";
    $sth = $dbh->prepare("DELETE url_fulltext FROM url_fulltext JOIN blacklist ON url_fulltext.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    $sth->finish();
    $dbh->disconnect();
}

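# blacklist_remove: drop a domain from the blacklist.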
sub blacklist_remove {
    my ($self, $domain) = @_;

    print "pandia: connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    die "pandia: failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    my $sth = $dbh->prepare("DELETE FROM blacklist WHERE url_domain=?");
    $sth->execute($domain);

    $sth->finish();
    $dbh->disconnect();
}

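# index_serial: index every un-analyzed URL in the crawl queue, one at a
# time, on the calling thread.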
sub index_serial {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=0");
    $sth->execute();

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}

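# index_one: index a single URL from the crawl queue.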
sub index_one {
    my ($self, $url) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url=? LIMIT 1");
    $sth->execute($url);

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($url, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}

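# index_domain: index every queued URL belonging to the given domain.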
sub index_domain {
    my ($self, $domain) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url_domain=?");
    $sth->execute($domain);

    while (my $hashref = $sth->fetchrow_hashref()) {
        do_index($hashref->{url}, $domain, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $sth->finish();
    $dbh->disconnect();
}

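# run_index_batch: fan a batch of un-analyzed URLs out to the Thread::Pool
# workers, then poll the shared $indices_waiting counter until it drains or
# the 60-second timeout elapses.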
sub run_index_batch {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT * FROM crawl_queue WHERE analyzed=0 LIMIT ?");
    $sth->execute($self->{index_workers});

    $indices_waiting = $sth->rows;

    if($indices_waiting == 0) {
        print "pandia: nothing to index\n";
        goto done;
    }

    my $tmpi = 0;
    while (my $hashref = $sth->fetchrow_hashref()) {
        $tmpi = $tmpi + 1;
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    print "pandia: $indices_waiting total pages to be processed\n";

done:
    $sth->finish();
    $dbh->disconnect();

    my $start_time = time();
    while($indices_waiting > 0) {
        my $end_time = time();
        my $time_diff = $end_time - $start_time;

        if($time_diff > 60) {
            print "pandia: timing out\n";
            last;
        }
        print "pandia: $indices_waiting URLs still in-process [$time_diff seconds elapsed]\n";
        sleep(10);
    }
    $self->{index_pool}->shutdown;
}

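# run_reindex_batch: like run_index_batch, but picks already-analyzed pages
# at random and dispatches them with the reindex flag set, so only pages
# modified since their last indexing get refreshed.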
sub run_reindex_batch {
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=1 ORDER BY RAND() LIMIT ?");
    $sth->execute($self->{index_workers});

    $indices_waiting = $sth->rows;

    if($indices_waiting == 0) {
        print "pandia: nothing to reindex\n";
        goto done;
    }

    my $tmpi = 0;
    while (my $hashref = $sth->fetchrow_hashref()) {
        $tmpi = $tmpi + 1;
        print "pandia: sending $hashref->{url} to worker thread\n";
        $self->{index_pool}->job($hashref->{url}, $hashref->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 1);
    }

    print "pandia: $indices_waiting total pages to be processed\n";

done:
    $sth->finish();
    $dbh->disconnect();

    my $start_time = time();
    while($indices_waiting > 0) {
        my $end_time = time();
        my $time_diff = $end_time - $start_time;

        if($time_diff > 60) {
            print "pandia: timing out\n";
            last;
        }
        print "pandia: $indices_waiting URLs still in-process [$time_diff seconds elapsed]\n";
        sleep(10);
    }
    $self->{index_pool}->shutdown;
}

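# new: constructor. Reads database settings and worker counts for the given
# profile from /etc/pandia.ini and builds the Thread::Pool of index workers.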
sub new {
    my ($class, $args) = @_;

    my $cfg = Config::IniFiles->new(-file => "/etc/pandia.ini");

    my $thost = $cfg->val($args->{profile}, 'dbhost');
    my $tname = $cfg->val($args->{profile}, 'dbname');
    my $tuser = $cfg->val($args->{profile}, 'dbuser');
    my $tpass = $cfg->val($args->{profile}, 'dbpass');
    my $tindex_workers = $cfg->val($args->{profile}, 'index_workers');
    my $tcrawl_workers = $cfg->val($args->{profile}, 'crawl_workers');

    $indices_waiting = $tindex_workers;

    my $tdsn = "DBI:mysql:database=$tname;host=$thost;port=3306;mysql_connect_timeout=5;";

    my $self = bless {
        profile       => $args->{profile},
        dbhost        => $thost,
        dbname        => $tname,
        dbuser        => $tuser,
        dbpass        => $tpass,
        dsn           => $tdsn,
        index_workers => $tindex_workers,
        crawl_workers => $tcrawl_workers,
        index_pool    => Thread::Pool->new(
            {
                workers => $tindex_workers,
                do      => \&do_index
            }
        )
    }, $class;

    return $self;
}

1;
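
# Usage sketch (assumes /etc/pandia.ini has a profile section, e.g. [default],
# with dbhost, dbname, dbuser, dbpass, index_workers, and crawl_workers):
#
#   use Pandia;
#
#   my $pandia = Pandia->new({profile => 'default'});
#   $pandia->index_one('https://example.com/');  # index one queued URL inline
#   $pandia->run_index_batch();                  # or fan a batch out to workers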