File:  [Coherent Logic Development] / pandia / Pandia.pm
Revision 1.4: download - view: text, annotated - select for diffs
Tue Jul 1 19:20:47 2025 UTC (2 weeks, 5 days ago) by snw
Branches: MAIN
CVS tags: HEAD
Stop crawling and indexing URLs containing page fragments

#!/usr/bin/env perl

# 
# $Id: Pandia.pm,v 1.4 2025/07/01 19:20:47 snw Exp $
#  Copyright (C) 2025 Coherent Logic Development LLC
#
# Author: Serena Willis <snw@coherent-logic.com>
#
# Licensed AGPL-3.0
#
# $Log: Pandia.pm,v $
# Revision 1.4  2025/07/01 19:20:47  snw
# Stop crawling and indexing URLs containing page fragments
#
# Revision 1.3  2025/07/01 06:48:03  snw
# Updates
#
# Revision 1.2  2025/06/30 02:18:44  snw
# Updates
#
# Revision 1.1  2025/06/28 23:54:11  snw
# Add new OO module
#
#

package Pandia;

use strict;
use warnings;

use threads;
use threads::shared;

use Config::IniFiles;
use DBI;
use Fcntl qw(:flock);
use HTML::TreeBuilder;
use HTTP::Date;
use HTTP::Tiny;
use LWP::Simple qw(get);
use POSIX qw(strftime);
use Thread::Pool;
use URI;
use WWW::RobotRules;

# Number of URLs handed to worker threads that have not yet finished.
# Decremented by do_index() on every exit path and polled by the wait
# loops in run_index_batch()/run_reindex_batch().
# NOTE(review): the :shared attribute requires threads::shared to be
# loaded, and the read-modify-write decrement in do_index is not atomic
# across workers — confirm whether a lock() is needed here.
my $indices_waiting : shared;

sub do_index {
    # Fetch one URL and store its title/text/HTML in the full-text index.
    # Runs inside a Thread::Pool worker, so it opens (and must close) its
    # own database connection.
    #
    #   $url     - URL to fetch and index
    #   $domain  - domain of the URL (stored alongside the full text)
    #   $dsn     - DBI data source name
    #   $dbuser  - database user
    #   $dbpass  - database password
    #   $reindex - when 1, re-index only if the page's Last-Modified is
    #              newer than our last_indexed_dt; when 0, index normally
    #
    # Always decrements the shared $indices_waiting counter before
    # returning so the batch dispatchers' wait loops can make progress.
    my ($url, $domain, $dsn, $dbuser, $dbpass, $reindex) = @_;

    if (index($url, '#') != -1) {
        print "pandia:  URL contains a fragment; skipping\n";
        # BUG FIX: the original returned without decrementing
        # $indices_waiting, which made run_index_batch sit out its full
        # timeout whenever a fragment URL was queued.
        $indices_waiting = $indices_waiting - 1;
        return;
    }

    print "pandia:  thread connecting to MySQL database...";

    my $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 0, PrintError => 1});
    if (not $dbh) {
        print "[FAIL]\n";
        goto nodb_cleanup;
    }
    print "[OK]\n";

    my $http = HTTP::Tiny->new(agent => "pandia-crawler/0.0.1", timeout => 60);
    my $tree = HTML::TreeBuilder->new();
    my $tries;

    print "pandia:  HEAD $url\n";
    my $head = $http->head($url);

    if (not $head->{success}) {
        # Unreachable page: drop it from the crawl queue so we do not
        # retry it forever.
        print "pandia:  HEAD fail $url\n";

        my $sthh = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sthh->execute($url);
        $sthh->finish();
        $dbh->disconnect();    # BUG FIX: handle was leaked on this path
        goto nodb_cleanup;
    }
    else {
        print "pandia:  HEAD OK $url\n";
    }

    my $headers = $head->{headers};
    # BUG FIX: a missing Content-Type header used to produce undef
    # warnings (and a failed 'ne' comparison) further down.
    my $content_type = $headers->{'content-type'} // '';
    my $last_modified_sys;

    if ($reindex == 1) {
        print "pandia:  REINDEX $url\n";
        my $last_modified_t = $headers->{'last-modified'};
        $last_modified_sys = str2time($last_modified_t);

        if ($last_modified_sys) {
            print "pandia:  GET_LAST_INDEX_DT $url\n";
            my $sth = $dbh->prepare("SELECT last_indexed_dt FROM url_fulltext WHERE url=?");
            $sth->execute($url);
            print "pandia:  GOT_LAST_INDEX_DT $url\n";

            if ($sth->rows < 1) {
                print "pandia:  page not indexed\n";
                $sth->finish();
                $dbh->disconnect();    # BUG FIX: handle was leaked here
                goto nodb_cleanup;
            }

            my $hashref = $sth->fetchrow_hashref();
            my $last_indexed = str2time($hashref->{last_indexed_dt});

            if ($last_modified_sys > $last_indexed) {
                # Page changed since we last indexed it: drop the stale
                # index row and fall through to re-fetch it below.
                print "pandia:  $url has been modified since the last time it was indexed\n";
                my $sthd = $dbh->prepare("DELETE FROM url_fulltext WHERE url=?");
                $sthd->execute($url);
                print "pandia:  INDEXDELETE $url\n";
            }
            else {
                print "pandia:  $url is still up-to-date in the index\n";
                goto cleanup;
            }
        }
        else {
            print "pandia:  no modify info; skipping $url\n";
            $dbh->disconnect();    # BUG FIX: handle was leaked here
            goto nodb_cleanup;
        }
    }
    else {
        print "pandia:  INDEX $url\n";
        # (dead store removed: the original set $last_modified here but
        # never used it; the INSERT below relies on the DB default for
        # last_indexed_dt)
    }

    my $title = "";
    my $fulltext = "";
    my $fullhtml = "";

    # Only plain text and HTML are worth full-text indexing.
    if ($content_type ne 'text/plain' && substr($content_type, 0, 9) ne 'text/html') {
        print "pandia:  content type $content_type not indexable; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $response = $http->get($url);

    if (not $response->{success}) {
        print "pandia:  http failure; skipping $url\n";
        my $sth = $dbh->prepare("DELETE FROM crawl_queue WHERE url=?");
        $sth->execute($url);
        $sth->finish();
        $dbh->disconnect();
        goto nodb_cleanup;
    }

    my $pagedata = $response->{content};

    $tree->parse($pagedata);
    # BUG FIX: pages without a <title> element used to crash the worker
    # with a method call on undef.
    my $title_elem = $tree->look_down('_tag', 'title');
    $title = $title_elem ? $title_elem->as_text : '';
    $title =~ s/[^\x00-\x7F]//g;    # strip non-ASCII before storing

    print "pandia:  processing $url [$title]\n";

    $fulltext = $tree->as_text;
    $fulltext =~ s/[^\x00-\x7F]//g;

    $fullhtml = $tree->as_HTML;
    $fullhtml =~ s/[^\x00-\x7F]//g;

    my $sth = $dbh->prepare("SELECT url FROM url_fulltext WHERE url=?");
    $sth->execute($url);

    if ($sth->rows > 0) {
        print "pandia:  we already have the full text of $url recorded\n";
        $sth->finish();
        goto cleanup;
    }

    $sth = $dbh->prepare("INSERT INTO url_fulltext(url, url_domain, page_title, body, body_html) VALUES (?, ?, ?, ?, ?)");
    # Retry the insert a few times; with many workers writing
    # concurrently, transient errors (deadlocks) are possible.
    $tries = 0;    # BUG FIX: was a second 'my $tries' masking the outer one
    while (1) {
        print "pandia:  INSERTINDEX $url\n";
        $sth->execute($url, $domain, $title, $fulltext, $fullhtml);
        if ($DBI::err) {
            if ($tries > 5) {
                print "pandia:  giving up inserting fulltext on $url\n";
                last;
            }
            $tries = $tries + 1;
            print "pandia:  error inserting fulltext on $url; retrying\n";
            next;
        }
        else {
            last;
        }
    }
    $sth->finish();

    print "pandia:  $url has been processed\n";

  cleanup:
    # Mark the queue entry analyzed (retrying on deadlock), then release
    # the database handle.
    my $sthuc = $dbh->prepare("UPDATE crawl_queue SET analyzed=1 WHERE url=?");
    $tries = 0;
    while (1) {
        $sthuc->execute($url);
        if ($DBI::err) {
            $tries = $tries + 1;
            if ($tries > 2) {
                print "pandia:  giving up updating crawl_queue for $url\n";
                last;
            }
            print "pandia:  DBI deadlock; retrying crawl queue update\n";
            next;
        }
        else {
            last;
        }
    }
    $sthuc->finish();
    $dbh->disconnect();

  nodb_cleanup:
    $indices_waiting = $indices_waiting - 1;
}

sub blacklist_add {
    # Add $domain to the blacklist, then purge every queued and indexed
    # entry whose domain is blacklisted.
    #
    #   $domain - the url_domain value to blacklist
    my ($self, $domain) = @_;

    print "pandia:  connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    # BUG FIX: the original double-quoted string printed the literal text
    # 'DBI->errstr()' — method calls do not interpolate; $DBI::errstr
    # carries the actual connect error.
    die "pandia:  failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    print "pandia:  blacklisting domain $domain...";
    my $sth = $dbh->prepare("INSERT INTO blacklist (url_domain) VALUES (?)");
    $sth->execute($domain);
    print "[OK]\n";

    # Multi-table DELETE ... JOIN removes rows for ALL blacklisted
    # domains, not just the one added above.
    print "pandia:  removing blacklisted items from crawl queue...";
    $sth = $dbh->prepare("DELETE crawl_queue FROM crawl_queue JOIN blacklist ON crawl_queue.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    print "pandia:  removing blacklisted items from index...";
    $sth = $dbh->prepare("DELETE url_fulltext FROM url_fulltext JOIN blacklist ON url_fulltext.url_domain=blacklist.url_domain");
    $sth->execute();
    print "[OK]\n";

    $sth->finish();
    $dbh->disconnect();
}

sub blacklist_remove {
    # Remove $domain from the blacklist.  Previously-purged queue/index
    # rows are not restored.
    #
    #   $domain - the url_domain value to un-blacklist
    my ($self, $domain) = @_;

    print "pandia:  connecting to database...";
    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});
    # BUG FIX: the original message contained the literal text
    # 'DBI->errstr()'; interpolate $DBI::errstr instead.
    die "pandia:  failed to connect to MySQL database: $DBI::errstr" unless $dbh;
    print "[OK]\n";

    my $sth = $dbh->prepare("DELETE FROM blacklist WHERE url_domain=?");
    $sth->execute($domain);

    $sth->finish();
    $dbh->disconnect();
}

sub index_serial {
    # Index every un-analyzed URL in the crawl queue sequentially, in
    # the calling thread (no worker pool).
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $queue = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=0");
    $queue->execute();

    while (my $row = $queue->fetchrow_hashref()) {
        do_index($row->{url}, $row->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $queue->finish();
    $dbh->disconnect();
}

sub index_one {
    # Index a single URL, provided it is present in the crawl queue.
    #
    #   $url - the exact URL to look up and index
    my ($self, $url) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $lookup = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url=? LIMIT 1");
    $lookup->execute($url);

    # LIMIT 1 means this runs at most once; a URL not in the queue is a
    # silent no-op.
    while (my $row = $lookup->fetchrow_hashref()) {
        do_index($url, $row->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $lookup->finish();
    $dbh->disconnect();
}

sub index_domain {
    # Sequentially index every queued URL belonging to one domain.
    #
    #   $domain - the url_domain whose queued URLs should be indexed
    my ($self, $domain) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $queue = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE url_domain=?");
    $queue->execute($domain);

    while (my $row = $queue->fetchrow_hashref()) {
        do_index($row->{url}, $domain, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
    }

    $queue->finish();
    $dbh->disconnect();
}

sub run_index_batch {
    # Hand up to index_workers un-analyzed queue entries to the indexer
    # thread pool, then poll the shared $indices_waiting counter until
    # the workers finish or roughly a minute has elapsed.
    my ($self) = @_;

    print "pandia:  creating $self->{index_workers} indexer threads\n";

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT * FROM crawl_queue WHERE analyzed=0 LIMIT ?");
    $sth->execute($self->{index_workers});

    # Each worker decrements this as it completes a URL.
    $indices_waiting = $sth->rows;

    if ($indices_waiting == 0) {
        print "pandia:  nothing to index\n";
    }
    else {
        while (my $row = $sth->fetchrow_hashref()) {
            print "pandia:  sending $row->{url} to worker thread\n";
            $self->{index_pool}->job($row->{url}, $row->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 0);
        }
        print "pandia:  $indices_waiting total pages to be processed\n";
    }

    $sth->finish();
    $dbh->disconnect();

    # Wait for the pool to drain, bailing out after ~60 seconds.
    my $started = time();
    while ($indices_waiting > 0) {
        my $elapsed = time() - $started;

        if ($elapsed > 60) {
            print "pandia:  timing out\n";
            last;
        }
        print "pandia:  $indices_waiting URLs still in-process [$elapsed seconds elapsed]\n";
        sleep(10);
    }

    $self->{index_pool}->shutdown;
}

sub run_reindex_batch {
    # Pick a random sample of already-analyzed queue entries and hand
    # them to the worker pool in re-index mode (reindex flag = 1), then
    # poll $indices_waiting until the workers finish or ~60s pass.
    my ($self) = @_;

    my $dbh = DBI->connect($self->{dsn}, $self->{dbuser}, $self->{dbpass}, {RaiseError => 1, PrintError => 0});

    my $sth = $dbh->prepare("SELECT url, url_domain FROM crawl_queue WHERE analyzed=1 ORDER BY RAND() LIMIT ?");
    $sth->execute($self->{index_workers});

    # Each worker decrements this as it completes a URL.
    $indices_waiting = $sth->rows;

    if ($indices_waiting == 0) {
        print "pandia:  nothing to reindex\n";
    }
    else {
        while (my $row = $sth->fetchrow_hashref()) {
            print "pandia:  sending $row->{url} to worker thread\n";
            $self->{index_pool}->job($row->{url}, $row->{url_domain}, $self->{dsn}, $self->{dbuser}, $self->{dbpass}, 1);
        }
        print "pandia:  $indices_waiting total pages to be processed\n";
    }

    $sth->finish();
    $dbh->disconnect();

    # Wait for the pool to drain, bailing out after ~60 seconds.
    my $started = time();
    while ($indices_waiting > 0) {
        my $elapsed = time() - $started;

        if ($elapsed > 60) {
            print "pandia:  timing out\n";
            last;
        }
        print "pandia:  $indices_waiting URLs still in-process [$elapsed seconds elapsed]\n";
        sleep(10);
    }

    $self->{index_pool}->shutdown;
}

sub new {
    # Construct a Pandia instance from the [$args->{profile}] section of
    # /etc/pandia.ini.  Expected keys: dbhost, dbname, dbuser, dbpass,
    # index_workers, crawl_workers.
    #
    #   $args - hashref with at least { profile => 'section-name' }
    #
    # Returns a blessed hashref carrying the DB credentials, the built
    # DSN, and a Thread::Pool of index_workers do_index workers.
    my ($class, $args) = @_;

    my $cfg = Config::IniFiles->new(-file => "/etc/pandia.ini");
    # BUG FIX: a missing or unreadable config previously died with an
    # obscure "Can't call method \"val\" on an undefined value".
    die "pandia:  cannot read /etc/pandia.ini\n" unless $cfg;

    my $profile = $args->{profile};
    my $thost = $cfg->val($profile, 'dbhost');
    my $tname = $cfg->val($profile, 'dbname');
    my $tuser = $cfg->val($profile, 'dbuser');
    my $tpass = $cfg->val($profile, 'dbpass');
    my $tindex_workers = $cfg->val($profile, 'index_workers');
    my $tcrawl_workers = $cfg->val($profile, 'crawl_workers');

    # Seed the shared in-flight counter; the batch dispatchers reset it
    # before each batch.
    $indices_waiting = $tindex_workers;

    my $tdsn = "DBI:mysql:database=$tname;host=$thost;port=3306;mysql_connect_timeout=5;";

    my $self = bless {
        profile => $profile,
        dbhost => $thost,
        dbname => $tname,
        dbuser => $tuser,
        dbpass => $tpass,
        dsn => $tdsn,
        index_workers => $tindex_workers,
        crawl_workers => $tcrawl_workers,
        index_pool => Thread::Pool->new(
            {
                workers => $tindex_workers,
                do => \&do_index
            }
            )
    }, $class;

    return $self;
}


1;


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>