File:  [Coherent Logic Development] / freem / src / transact.c
Revision 1.6: download - view: text, annotated - select for diffs
Mon Mar 24 04:13:12 2025 UTC (8 days, 19 hours ago) by snw
Branches: MAIN
CVS tags: v0-62-3, HEAD
Replace action macro dat with fra_dat to avoid symbol conflict on OS/2

/*
 *   $Id: transact.c,v 1.6 2025/03/24 04:13:12 snw Exp $
 *    FreeM transaction processing support
 *
 *  
 *   Author: Serena Willis <snw@coherent-logic.com>
 *    Copyright (C) 1998 MUG Deutschland
 *    Copyright (C) 2020, 2025 Coherent Logic Development LLC
 *
 *
 *   This file is part of FreeM.
 *
 *   FreeM is free software: you can redistribute it and/or modify
 *   it under the terms of the GNU Affero Public License as published by
 *   the Free Software Foundation, either version 3 of the License, or
 *   (at your option) any later version.
 *
 *   FreeM is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU Affero Public License for more details.
 *
 *   You should have received a copy of the GNU Affero Public License
 *   along with FreeM.  If not, see <https://www.gnu.org/licenses/>.
 *
 *   $Log: transact.c,v $
 *   Revision 1.6  2025/03/24 04:13:12  snw
 *   Replace action macro dat with fra_dat to avoid symbol conflict on OS/2
 *
 *   Revision 1.5  2025/03/24 02:54:47  snw
 *   Transaction compat fixes for OS/2
 *
 *   Revision 1.4  2025/03/09 19:50:47  snw
 *   Second phase of REUSE compliance and header reformat
 *
 *
 * SPDX-FileCopyrightText:  (C) 2025 Coherent Logic Development LLC
 * SPDX-License-Identifier: AGPL-3.0-or-later
 **/

#include <string.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>

#include "mpsdef.h"
#include "transact.h"
#include "iftab.h"
#include "journal.h"
#include "shmmgr.h"
#include "mref.h"
#include "tp_check.h"

#define FALSE   0
#define TRUE    1

#if !defined(__OpenBSD__) && !defined(__APPLE__) && !defined(__OS2__)
union semun {
    int              val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
                                (Linux-specific) */
};
#endif

void m_log (int, const char *);

int semid_tp;
int tp_committing = FALSE;
int tp_level = 0;
tp_transaction transactions[TP_MAX_NEST];


void tp_init(void)
{
    union semun arg;
    char err[255];
    key_t tp_sk;

    tp_sk = ftok (config_file, 4);
    
    if (first_process) {

        semid_tp = semget (tp_sk, 1, 0666 | IPC_CREAT);
        if (semid_tp == -1) {
            fprintf (stderr, "tp_init:  failed to create transaction processing semaphore [errno %d]\r\n", errno);
            exit (1);
        }

        arg.val = 1;
        if (semctl (semid_tp, 0, SETVAL, arg) == -1) {
            fprintf (stderr, "tp_init:  failed to initialize transaction processing semaphore\r\n");
            exit (1);
        }

    }
    else {
        
        semid_tp = semget (tp_sk, 1, 0);
        if (semid_tp == -1) {
            fprintf (stderr, "tp_init:  could not attach to transaction processing semaphore [errno %d]\r\n", errno);
            exit (1);
        }

    }

    return;

}

short tp_get_sem(void)
{
    int tries;
    struct sembuf s = {0, -1, 0};

    char msgbuf[100];

    snprintf (msgbuf, 99, "tp_get_sem:  process %d attempting to acquire transaction processing semaphore", pid);
    m_log (1, msgbuf);
    
    
    /* our process already owns the semaphore */
    if (shm_config->hdr->tp_owner == pid) {

        snprintf (msgbuf, 99, "tp_get_sem:  process %d increments transaction processing semaphore counter", pid);
        m_log (1, msgbuf);
    
        
        if (first_process == TRUE) {
            fprintf (stderr, "tp_get_sem:  daemon process increments critical section counter\r\n");
        }


        shm_config->hdr->tp_semctr++;

        return TRUE;
    }

    if (first_process == TRUE) {
        fprintf (stderr, "tp_get_sem:  daemon process enters critical section\r\n");
    }

    
    for (tries = 0; tries < 10; tries++) {

        if (semop (semid_tp, &s, 1) != -1) {
            shm_config->hdr->tp_owner = pid;
            shm_config->hdr->tp_semctr = 1;

            snprintf (msgbuf, 99, "tp_get_sem:  process %d takes transaction processing semaphore", pid);
            m_log (1, msgbuf);
    
            
            if (first_process == TRUE) {
                fprintf (stderr, "tp_get_sem:  daemon process takes transaction processing semaphore\r\n");
            }

            return TRUE;
        }

        snprintf (msgbuf, 99, "tp_get_sem:  process %d attempting to acquire transaction processing semaphore (tries = %d)", pid, tries);
        m_log (1, msgbuf);
    

        sleep (1);

    }

    return FALSE;
    
}

void tp_release_sem(void)
{

    char msgbuf[100];

    if (shm_config->hdr->tp_semctr == 1) {

        struct sembuf s = {0, 1, 0};

        if (first_process == TRUE) {
            fprintf (stderr, "tp_release_sem:  daemon process leaves critical section\r\n");
        }

        
        shm_config->hdr->tp_semctr = 0;
        shm_config->hdr->tp_owner = 0;

        if (first_process == TRUE) {
            fprintf (stderr, "tp_release_sem:  daemon process relinquishes transaction processing semaphore\r\n");
        }


        snprintf (msgbuf, 99, "tp_get_sem:  process %d releases transaction processing semaphore", pid);
        m_log (1, msgbuf);

        
        semop (semid_tp, &s, 1);
        
    }
    else {

        if (first_process == TRUE) {
            fprintf (stderr, "tp_release_sem:  daemon process decrements critical section counter\r\n");
        }
        
        snprintf (msgbuf, 99, "tp_get_sem:  process %d decrements transaction processing semaphore counter", pid);
        m_log (1, msgbuf);
        
        shm_config->hdr->tp_semctr--;
    }

    
}

int tp_tstart(char *tp_id, short serial, short restartable, char **sym_save)
{
    if (tp_level == TP_MAX_NEST) {
        char m[256];

        snprintf (m, 256, "Attempt to exceed TP_MAX_NEST. Transaction aborted.\r\n\201");
        write_m (m);

        return FALSE;
    }

    if (((serial == TRUE) && (tp_get_sem () == TRUE)) ||
        (serial == FALSE)) {

        tp_level++;
        
        jnl_ent_write (JNLA_TSTART, "", "");
    
        strcpy (transactions[tp_level].tp_id, tp_id);
        
        transactions[tp_level].serial = serial;
        transactions[tp_level].restartable = restartable;
        
        transactions[tp_level].opcount = 0;
      
        return TRUE;
        
    }
    else {
        fprintf (stderr, "tp_tstart:  could not get transaction processing semaphore\r\n");
        exit (1);
    }
    

}

int tp_add_op(short islock, short action, char *key, char *data)
{
    int oc;
    freem_ref_t *gr;

    gr = (freem_ref_t *) malloc (sizeof (freem_ref_t));
    NULLPTRCHK(gr,"tp_add_op");
    
    mref_init (gr, MREF_RT_GLOBAL, "");
    internal_to_mref (gr, key);
    
    if (transactions[tp_level].opcount == TP_MAX_OPS) {
        char m[256];

        snprintf (m, 256, "Attempt to exceed TP_MAX_OPS at transaction level %d. Rolling back.\r\n\201", tp_level);
        write_m (m);

        free (gr);

        tp_trollback (1);
        tp_cleanup (1);

        if (transactions[tp_level].serial == TRUE) {
            tp_release_sem();
        }
        
        return FALSE;
    }
    
    /* update transaction-in-flight symbol table */
    switch (action) {

        case lock_inc:
        case lock_dec:
        case lock_old:
        case set_sym:
            iftab_insert (action, key, data, tp_level);
            break;

        case kill_sym:
            iftab_kill (key);
            break;
            
    }

    if (transactions[tp_level].serial == TRUE) {
        /* mark the global for checkpointing */
        cptab_insert (tp_level, gr->name);
    }

    free (gr);

    transactions[tp_level].opcount = transactions[tp_level].opcount + 1;

    oc = transactions[tp_level].opcount;

    transactions[tp_level].ops[oc].is_lock = islock;
    transactions[tp_level].ops[oc].action = action;

    stcpy ((char *) &transactions[tp_level].ops[oc].key, key);
    stcpy ((char *) &transactions[tp_level].ops[oc].data, data);
       
 
    return TRUE;
}

int tp_tcommit(void)
{
    register int i;
    short is_serial = transactions[tp_level].serial;
    
    tp_committing = TRUE;

    if (is_serial) {
        /* checkpoint all globals involved in transaction */
        cptab_precommit (tp_level);
    }
    
    for(i = 1; i <= transactions[tp_level].opcount; i++) {
        
        if (transactions[tp_level].ops[i].is_lock == FALSE) {
            global (transactions[tp_level].ops[i].action, transactions[tp_level].ops[i].key, transactions[tp_level].ops[i].data);

            if (merr () > OK) goto commit_error;
                
        }
        
    }

    jnl_ent_write (JNLA_TCOMMIT, "\201", "\201");

    if (is_serial) {
        cptab_postcommit (tp_level);
    }

    goto commit_done;
    
commit_error:
    
    tp_trollback (1);

commit_done:

    tp_cleanup (1);
    
    tp_committing = FALSE;

    if (is_serial) {
        tp_release_sem ();
    }
    
    return TRUE;
}

int tp_cleanup(int levels)
{
    register int i;

    for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
        iftab_pop_tlevel (i);
    }

    tp_level = ((tp_level - levels) >= 0) ? tp_level - levels : 0;

    return TRUE;
}

int tp_trollback(int levels)
{
    register int i;
    register int j;

//    for (i = 0; i < levels; i++) {
    for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
        
        for (j = 1; j <= transactions[i].opcount; j++) {
            
            if (transactions[i].ops[j].is_lock == TRUE) {
                locktab_decrement (transactions[i].ops[j].key, -1);
            }
            
        }

        if (transactions[i].serial == TRUE) {
            cptab_rollback (i);
        }
        
    }


    return TRUE;
}

int tp_trestart(void)
{
    return 0;
}

void tp_tdump(void)
{

    int i, j;

    char tkey[256];
    char tdata[256];
    char tact[256];
    
    set_io (UNIX);

    if (tp_level == 0) {
        printf("No transaction is active.\n");

        return;
    }

    for(i = 1; i <= tp_level; i++) {

        if(i == tp_level) {
            printf(" $TLEVEL %d*\n", i);
        }
        else {
            printf(" $TLEVEL %d\n", i);
        }

        printf("  Operations for Transaction ID: %s [%s%s]\n",
               transactions[i].tp_id,
               ((transactions[i].restartable == TRUE) ? "RESTARTABLE" : "NON-RESTARTABLE"),
               ((transactions[i].serial == TRUE) ? " SERIAL" : " BATCH"));

        printf ("\n   %-10s%-15s%-15s\n", "OP. NO.", "ACTION", "KEY/DATA");
        printf ("   %-10s%-15s%-15s\n", "-------", "------", "--------");
        
        
        for(j = 1; j <= transactions[i].opcount; j++) {          
            stcpy (tkey, transactions[i].ops[j].key);
            stcnv_m2c (tkey);
            stcpy (tdata, transactions[i].ops[j].data);
            stcnv_m2c (tdata);

            tp_get_op_name (tact, transactions[i].ops[j].action);

            if (transactions[i].ops[j].action == set_sym) {
                printf ("   %-10d%-15s%s=%s\n", j, tact, tkey, tdata);
            }
            else {
                printf ("   %-10d%-15s%s\n", j, tact, tkey); 
            }
            
        }

        cptab_dump (i);
        
    }


    set_io (MUMPS);
}

void tp_get_op_name(char *buf, const short action)
{
    switch (action) {
        
        case set_sym:
            strcpy (buf, "SET");
            break;

        case killone:
        case kill_sym:
        case kill_all:
        case killexcl:
            strcpy (buf, "KILL");
            break;

        case new_sym:
        case new_all:
        case newexcl:
            strcpy (buf, "NEW");
            break;

        case get_sym:
            strcpy (buf, "GET");
            break;

        case fra_dat:
            strcpy (buf, "$DATA");
            break;

        case fra_order:
            strcpy (buf, "$ORDER");
            break;

        case fra_query:
        case bigquery:
            strcpy (buf, "$QUERY");
            break;

        case getinc:
            strcpy (buf, "$INCREMENT");
            break;

        case getnext:
            strcpy (buf, "$NEXT");
            break;

        case lock_inc:
            strcpy (buf, "LOCK (INCR)");
            break;

        case lock_old:
            strcpy (buf, "LOCK (TRAD)");
            break;
            
    }
            
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>