/*
* $Id: transact.c,v 1.6 2025/03/24 04:13:12 snw Exp $
* FreeM transaction processing support
*
*
* Author: Serena Willis <snw@coherent-logic.com>
* Copyright (C) 1998 MUG Deutschland
* Copyright (C) 2020, 2025 Coherent Logic Development LLC
*
*
* This file is part of FreeM.
*
* FreeM is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* FreeM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero Public License for more details.
*
* You should have received a copy of the GNU Affero Public License
* along with FreeM. If not, see <https://www.gnu.org/licenses/>.
*
* $Log: transact.c,v $
* Revision 1.6 2025/03/24 04:13:12 snw
* Replace action macro dat with fra_dat to avoid symbol conflict on OS/2
*
* Revision 1.5 2025/03/24 02:54:47 snw
* Transaction compat fixes for OS/2
*
* Revision 1.4 2025/03/09 19:50:47 snw
* Second phase of REUSE compliance and header reformat
*
*
* SPDX-FileCopyrightText: (C) 2025 Coherent Logic Development LLC
* SPDX-License-Identifier: AGPL-3.0-or-later
**/
#include <string.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include "mpsdef.h"
#include "transact.h"
#include "iftab.h"
#include "journal.h"
#include "shmmgr.h"
#include "mref.h"
#include "tp_check.h"
#define FALSE 0
#define TRUE 1
#if !defined(__OpenBSD__) && !defined(__APPLE__) && !defined(__OS2__)
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
#endif
void m_log (int, const char *);
int semid_tp;
int tp_committing = FALSE;
int tp_level = 0;
tp_transaction transactions[TP_MAX_NEST];
void tp_init(void)
{
union semun arg;
char err[255];
key_t tp_sk;
tp_sk = ftok (config_file, 4);
if (first_process) {
semid_tp = semget (tp_sk, 1, 0666 | IPC_CREAT);
if (semid_tp == -1) {
fprintf (stderr, "tp_init: failed to create transaction processing semaphore [errno %d]\r\n", errno);
exit (1);
}
arg.val = 1;
if (semctl (semid_tp, 0, SETVAL, arg) == -1) {
fprintf (stderr, "tp_init: failed to initialize transaction processing semaphore\r\n");
exit (1);
}
}
else {
semid_tp = semget (tp_sk, 1, 0);
if (semid_tp == -1) {
fprintf (stderr, "tp_init: could not attach to transaction processing semaphore [errno %d]\r\n", errno);
exit (1);
}
}
return;
}
short tp_get_sem(void)
{
int tries;
struct sembuf s = {0, -1, 0};
char msgbuf[100];
snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore", pid);
m_log (1, msgbuf);
/* our process already owns the semaphore */
if (shm_config->hdr->tp_owner == pid) {
snprintf (msgbuf, 99, "tp_get_sem: process %d increments transaction processing semaphore counter", pid);
m_log (1, msgbuf);
if (first_process == TRUE) {
fprintf (stderr, "tp_get_sem: daemon process increments critical section counter\r\n");
}
shm_config->hdr->tp_semctr++;
return TRUE;
}
if (first_process == TRUE) {
fprintf (stderr, "tp_get_sem: daemon process enters critical section\r\n");
}
for (tries = 0; tries < 10; tries++) {
if (semop (semid_tp, &s, 1) != -1) {
shm_config->hdr->tp_owner = pid;
shm_config->hdr->tp_semctr = 1;
snprintf (msgbuf, 99, "tp_get_sem: process %d takes transaction processing semaphore", pid);
m_log (1, msgbuf);
if (first_process == TRUE) {
fprintf (stderr, "tp_get_sem: daemon process takes transaction processing semaphore\r\n");
}
return TRUE;
}
snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore (tries = %d)", pid, tries);
m_log (1, msgbuf);
sleep (1);
}
return FALSE;
}
void tp_release_sem(void)
{
char msgbuf[100];
if (shm_config->hdr->tp_semctr == 1) {
struct sembuf s = {0, 1, 0};
if (first_process == TRUE) {
fprintf (stderr, "tp_release_sem: daemon process leaves critical section\r\n");
}
shm_config->hdr->tp_semctr = 0;
shm_config->hdr->tp_owner = 0;
if (first_process == TRUE) {
fprintf (stderr, "tp_release_sem: daemon process relinquishes transaction processing semaphore\r\n");
}
snprintf (msgbuf, 99, "tp_get_sem: process %d releases transaction processing semaphore", pid);
m_log (1, msgbuf);
semop (semid_tp, &s, 1);
}
else {
if (first_process == TRUE) {
fprintf (stderr, "tp_release_sem: daemon process decrements critical section counter\r\n");
}
snprintf (msgbuf, 99, "tp_get_sem: process %d decrements transaction processing semaphore counter", pid);
m_log (1, msgbuf);
shm_config->hdr->tp_semctr--;
}
}
int tp_tstart(char *tp_id, short serial, short restartable, char **sym_save)
{
if (tp_level == TP_MAX_NEST) {
char m[256];
snprintf (m, 256, "Attempt to exceed TP_MAX_NEST. Transaction aborted.\r\n\201");
write_m (m);
return FALSE;
}
if (((serial == TRUE) && (tp_get_sem () == TRUE)) ||
(serial == FALSE)) {
tp_level++;
jnl_ent_write (JNLA_TSTART, "", "");
strcpy (transactions[tp_level].tp_id, tp_id);
transactions[tp_level].serial = serial;
transactions[tp_level].restartable = restartable;
transactions[tp_level].opcount = 0;
return TRUE;
}
else {
fprintf (stderr, "tp_tstart: could not get transaction processing semaphore\r\n");
exit (1);
}
}
int tp_add_op(short islock, short action, char *key, char *data)
{
int oc;
freem_ref_t *gr;
gr = (freem_ref_t *) malloc (sizeof (freem_ref_t));
NULLPTRCHK(gr,"tp_add_op");
mref_init (gr, MREF_RT_GLOBAL, "");
internal_to_mref (gr, key);
if (transactions[tp_level].opcount == TP_MAX_OPS) {
char m[256];
snprintf (m, 256, "Attempt to exceed TP_MAX_OPS at transaction level %d. Rolling back.\r\n\201", tp_level);
write_m (m);
free (gr);
tp_trollback (1);
tp_cleanup (1);
if (transactions[tp_level].serial == TRUE) {
tp_release_sem();
}
return FALSE;
}
/* update transaction-in-flight symbol table */
switch (action) {
case lock_inc:
case lock_dec:
case lock_old:
case set_sym:
iftab_insert (action, key, data, tp_level);
break;
case kill_sym:
iftab_kill (key);
break;
}
if (transactions[tp_level].serial == TRUE) {
/* mark the global for checkpointing */
cptab_insert (tp_level, gr->name);
}
free (gr);
transactions[tp_level].opcount = transactions[tp_level].opcount + 1;
oc = transactions[tp_level].opcount;
transactions[tp_level].ops[oc].is_lock = islock;
transactions[tp_level].ops[oc].action = action;
stcpy ((char *) &transactions[tp_level].ops[oc].key, key);
stcpy ((char *) &transactions[tp_level].ops[oc].data, data);
return TRUE;
}
int tp_tcommit(void)
{
register int i;
short is_serial = transactions[tp_level].serial;
tp_committing = TRUE;
if (is_serial) {
/* checkpoint all globals involved in transaction */
cptab_precommit (tp_level);
}
for(i = 1; i <= transactions[tp_level].opcount; i++) {
if (transactions[tp_level].ops[i].is_lock == FALSE) {
global (transactions[tp_level].ops[i].action, transactions[tp_level].ops[i].key, transactions[tp_level].ops[i].data);
if (merr () > OK) goto commit_error;
}
}
jnl_ent_write (JNLA_TCOMMIT, "\201", "\201");
if (is_serial) {
cptab_postcommit (tp_level);
}
goto commit_done;
commit_error:
tp_trollback (1);
commit_done:
tp_cleanup (1);
tp_committing = FALSE;
if (is_serial) {
tp_release_sem ();
}
return TRUE;
}
int tp_cleanup(int levels)
{
register int i;
for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
iftab_pop_tlevel (i);
}
tp_level = ((tp_level - levels) >= 0) ? tp_level - levels : 0;
return TRUE;
}
int tp_trollback(int levels)
{
register int i;
register int j;
// for (i = 0; i < levels; i++) {
for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
for (j = 1; j <= transactions[i].opcount; j++) {
if (transactions[i].ops[j].is_lock == TRUE) {
locktab_decrement (transactions[i].ops[j].key, -1);
}
}
if (transactions[i].serial == TRUE) {
cptab_rollback (i);
}
}
return TRUE;
}
int tp_trestart(void)
{
return 0;
}
void tp_tdump(void)
{
int i, j;
char tkey[256];
char tdata[256];
char tact[256];
set_io (UNIX);
if (tp_level == 0) {
printf("No transaction is active.\n");
return;
}
for(i = 1; i <= tp_level; i++) {
if(i == tp_level) {
printf(" $TLEVEL %d*\n", i);
}
else {
printf(" $TLEVEL %d\n", i);
}
printf(" Operations for Transaction ID: %s [%s%s]\n",
transactions[i].tp_id,
((transactions[i].restartable == TRUE) ? "RESTARTABLE" : "NON-RESTARTABLE"),
((transactions[i].serial == TRUE) ? " SERIAL" : " BATCH"));
printf ("\n %-10s%-15s%-15s\n", "OP. NO.", "ACTION", "KEY/DATA");
printf (" %-10s%-15s%-15s\n", "-------", "------", "--------");
for(j = 1; j <= transactions[i].opcount; j++) {
stcpy (tkey, transactions[i].ops[j].key);
stcnv_m2c (tkey);
stcpy (tdata, transactions[i].ops[j].data);
stcnv_m2c (tdata);
tp_get_op_name (tact, transactions[i].ops[j].action);
if (transactions[i].ops[j].action == set_sym) {
printf (" %-10d%-15s%s=%s\n", j, tact, tkey, tdata);
}
else {
printf (" %-10d%-15s%s\n", j, tact, tkey);
}
}
cptab_dump (i);
}
set_io (MUMPS);
}
void tp_get_op_name(char *buf, const short action)
{
switch (action) {
case set_sym:
strcpy (buf, "SET");
break;
case killone:
case kill_sym:
case kill_all:
case killexcl:
strcpy (buf, "KILL");
break;
case new_sym:
case new_all:
case newexcl:
strcpy (buf, "NEW");
break;
case get_sym:
strcpy (buf, "GET");
break;
case fra_dat:
strcpy (buf, "$DATA");
break;
case fra_order:
strcpy (buf, "$ORDER");
break;
case fra_query:
case bigquery:
strcpy (buf, "$QUERY");
break;
case getinc:
strcpy (buf, "$INCREMENT");
break;
case getnext:
strcpy (buf, "$NEXT");
break;
case lock_inc:
strcpy (buf, "LOCK (INCR)");
break;
case lock_old:
strcpy (buf, "LOCK (TRAD)");
break;
}
}
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>