Annotation of freem/src/transact.c, revision 1.4
1.1 snw 1: /*
1.4 ! snw 2: * $Id$
1.1 snw 3: * FreeM transaction processing support
4: *
5: *
1.3 snw 6: * Author: Serena Willis <snw@coherent-logic.com>
1.1 snw 7: * Copyright (C) 1998 MUG Deutschland
1.4 ! snw 8: * Copyright (C) 2020, 2025 Coherent Logic Development LLC
1.1 snw 9: *
10: *
11: * This file is part of FreeM.
12: *
13: * FreeM is free software: you can redistribute it and/or modify
14: * it under the terms of the GNU Affero Public License as published by
15: * the Free Software Foundation, either version 3 of the License, or
16: * (at your option) any later version.
17: *
18: * FreeM is distributed in the hope that it will be useful,
19: * but WITHOUT ANY WARRANTY; without even the implied warranty of
20: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21: * GNU Affero Public License for more details.
22: *
23: * You should have received a copy of the GNU Affero Public License
24: * along with FreeM. If not, see <https://www.gnu.org/licenses/>.
25: *
1.4 ! snw 26: * $Log$
! 27: *
! 28: * SPDX-FileCopyrightText: (C) 2025 Coherent Logic Development LLC
! 29: * SPDX-License-Identifier: AGPL-3.0-or-later
1.1 snw 30: **/
31:
32: #include <string.h>
33: #include <stddef.h>
34: #include <stdio.h>
35: #include <stdlib.h>
36: #include <unistd.h>
37: #include <errno.h>
38:
39: #include "mpsdef.h"
40: #include "transact.h"
41: #include "iftab.h"
42: #include "journal.h"
43: #include "shmmgr.h"
44: #include "mref.h"
45: #include "tp_check.h"
46:
47: #define FALSE 0
48: #define TRUE 1
49:
50: #if !defined(__OpenBSD__) && !defined(__APPLE__)
51: union semun {
52: int val; /* Value for SETVAL */
53: struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
54: unsigned short *array; /* Array for GETALL, SETALL */
55: struct seminfo *__buf; /* Buffer for IPC_INFO
56: (Linux-specific) */
57: };
58: #endif
59:
60: void m_log (int, const char *);
61:
62: int semid_tp;
63: int tp_committing = FALSE;
64: int tp_level = 0;
65: tp_transaction transactions[TP_MAX_NEST];
66:
67:
68: void tp_init(void)
69: {
70: union semun arg;
71: char err[255];
72: key_t tp_sk;
73:
74: tp_sk = ftok (config_file, 4);
75:
76: if (first_process) {
77:
78: semid_tp = semget (tp_sk, 1, 0666 | IPC_CREAT);
79: if (semid_tp == -1) {
80: fprintf (stderr, "tp_init: failed to create transaction processing semaphore [errno %d]\r\n", errno);
81: exit (1);
82: }
83:
84: arg.val = 1;
85: if (semctl (semid_tp, 0, SETVAL, arg) == -1) {
86: fprintf (stderr, "tp_init: failed to initialize transaction processing semaphore\r\n");
87: exit (1);
88: }
89:
90: }
91: else {
92:
93: semid_tp = semget (tp_sk, 1, 0);
94: if (semid_tp == -1) {
95: fprintf (stderr, "tp_init: could not attach to transaction processing semaphore [errno %d]\r\n", errno);
96: exit (1);
97: }
98:
99: }
100:
101: return;
102:
103: }
104:
105: short tp_get_sem(void)
106: {
107: int tries;
108: struct sembuf s = {0, -1, 0};
109:
110: char msgbuf[100];
111:
112: snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore", pid);
113: m_log (1, msgbuf);
114:
115:
116: /* our process already owns the semaphore */
117: if (shm_config->hdr->tp_owner == pid) {
118:
119: snprintf (msgbuf, 99, "tp_get_sem: process %d increments transaction processing semaphore counter", pid);
120: m_log (1, msgbuf);
121:
122:
123: if (first_process == TRUE) {
124: fprintf (stderr, "tp_get_sem: daemon process increments critical section counter\r\n");
125: }
126:
127:
128: shm_config->hdr->tp_semctr++;
129:
130: return TRUE;
131: }
132:
133: if (first_process == TRUE) {
134: fprintf (stderr, "tp_get_sem: daemon process enters critical section\r\n");
135: }
136:
137:
138: for (tries = 0; tries < 10; tries++) {
139:
140: if (semop (semid_tp, &s, 1) != -1) {
141: shm_config->hdr->tp_owner = pid;
142: shm_config->hdr->tp_semctr = 1;
143:
144: snprintf (msgbuf, 99, "tp_get_sem: process %d takes transaction processing semaphore", pid);
145: m_log (1, msgbuf);
146:
147:
148: if (first_process == TRUE) {
149: fprintf (stderr, "tp_get_sem: daemon process takes transaction processing semaphore\r\n");
150: }
151:
152: return TRUE;
153: }
154:
155: snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore (tries = %d)", pid, tries);
156: m_log (1, msgbuf);
157:
158:
159: sleep (1);
160:
161: }
162:
163: return FALSE;
164:
165: }
166:
167: void tp_release_sem(void)
168: {
169:
170: char msgbuf[100];
171:
172: if (shm_config->hdr->tp_semctr == 1) {
173:
174: struct sembuf s = {0, 1, 0};
175:
176: if (first_process == TRUE) {
177: fprintf (stderr, "tp_release_sem: daemon process leaves critical section\r\n");
178: }
179:
180:
181: shm_config->hdr->tp_semctr = 0;
182: shm_config->hdr->tp_owner = 0;
183:
184: if (first_process == TRUE) {
185: fprintf (stderr, "tp_release_sem: daemon process relinquishes transaction processing semaphore\r\n");
186: }
187:
188:
189: snprintf (msgbuf, 99, "tp_get_sem: process %d releases transaction processing semaphore", pid);
190: m_log (1, msgbuf);
191:
192:
193: semop (semid_tp, &s, 1);
194:
195: }
196: else {
197:
198: if (first_process == TRUE) {
199: fprintf (stderr, "tp_release_sem: daemon process decrements critical section counter\r\n");
200: }
201:
202: snprintf (msgbuf, 99, "tp_get_sem: process %d decrements transaction processing semaphore counter", pid);
203: m_log (1, msgbuf);
204:
205: shm_config->hdr->tp_semctr--;
206: }
207:
208:
209: }
210:
211: int tp_tstart(char *tp_id, short serial, short restartable, char **sym_save)
212: {
213: if (tp_level == TP_MAX_NEST) {
214: char m[256];
215:
216: snprintf (m, 256, "Attempt to exceed TP_MAX_NEST. Transaction aborted.\r\n\201");
217: write_m (m);
218:
219: return FALSE;
220: }
221:
222: if (((serial == TRUE) && (tp_get_sem () == TRUE)) ||
223: (serial == FALSE)) {
224:
225: tp_level++;
226:
227: jnl_ent_write (JNLA_TSTART, "", "");
228:
229: strcpy (transactions[tp_level].tp_id, tp_id);
230:
231: transactions[tp_level].serial = serial;
232: transactions[tp_level].restartable = restartable;
233:
234: transactions[tp_level].opcount = 0;
235:
236: return TRUE;
237:
238: }
239: else {
240: fprintf (stderr, "tp_tstart: could not get transaction processing semaphore\r\n");
241: exit (1);
242: }
243:
244:
245: }
246:
247: int tp_add_op(short islock, short action, char *key, char *data)
248: {
249: int oc;
250: freem_ref_t *gr;
251:
252: gr = (freem_ref_t *) malloc (sizeof (freem_ref_t));
253: NULLPTRCHK(gr,"tp_add_op");
254:
255: mref_init (gr, MREF_RT_GLOBAL, "");
256: internal_to_mref (gr, key);
257:
258: if (transactions[tp_level].opcount == TP_MAX_OPS) {
259: char m[256];
260:
261: snprintf (m, 256, "Attempt to exceed TP_MAX_OPS at transaction level %d. Rolling back.\r\n\201", tp_level);
262: write_m (m);
263:
264: free (gr);
265:
266: tp_trollback (1);
267: tp_cleanup (1);
268:
269: if (transactions[tp_level].serial == TRUE) {
270: tp_release_sem();
271: }
272:
273: return FALSE;
274: }
275:
276: /* update transaction-in-flight symbol table */
277: switch (action) {
278:
279: case lock_inc:
280: case lock_dec:
281: case lock_old:
282: case set_sym:
283: iftab_insert (action, key, data, tp_level);
284: break;
285:
286: case kill_sym:
287: iftab_kill (key);
288: break;
289:
290: }
291:
292: if (transactions[tp_level].serial == TRUE) {
293: /* mark the global for checkpointing */
294: cptab_insert (tp_level, gr->name);
295: }
296:
297: free (gr);
298:
299: transactions[tp_level].opcount = transactions[tp_level].opcount + 1;
300:
301: oc = transactions[tp_level].opcount;
302:
303: transactions[tp_level].ops[oc].is_lock = islock;
304: transactions[tp_level].ops[oc].action = action;
305:
1.2 snw 306: stcpy ((char *) &transactions[tp_level].ops[oc].key, key);
307: stcpy ((char *) &transactions[tp_level].ops[oc].data, data);
1.1 snw 308:
309:
310: return TRUE;
311: }
312:
313: int tp_tcommit(void)
314: {
315: register int i;
316: short is_serial = transactions[tp_level].serial;
317:
318: tp_committing = TRUE;
319:
320: if (is_serial) {
321: /* checkpoint all globals involved in transaction */
322: cptab_precommit (tp_level);
323: }
324:
325: for(i = 1; i <= transactions[tp_level].opcount; i++) {
326:
327: if (transactions[tp_level].ops[i].is_lock == FALSE) {
328: global (transactions[tp_level].ops[i].action, transactions[tp_level].ops[i].key, transactions[tp_level].ops[i].data);
329:
330: if (merr () > OK) goto commit_error;
331:
332: }
333:
334: }
335:
336: jnl_ent_write (JNLA_TCOMMIT, "\201", "\201");
337:
338: if (is_serial) {
339: cptab_postcommit (tp_level);
340: }
341:
342: goto commit_done;
343:
344: commit_error:
345:
346: tp_trollback (1);
347:
348: commit_done:
349:
350: tp_cleanup (1);
351:
352: tp_committing = FALSE;
353:
354: if (is_serial) {
355: tp_release_sem ();
356: }
357:
358: return TRUE;
359: }
360:
361: int tp_cleanup(int levels)
362: {
363: register int i;
364:
365: for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
366: iftab_pop_tlevel (i);
367: }
368:
369: tp_level = ((tp_level - levels) >= 0) ? tp_level - levels : 0;
370:
371: return TRUE;
372: }
373:
374: int tp_trollback(int levels)
375: {
376: register int i;
377: register int j;
378:
379: // for (i = 0; i < levels; i++) {
380: for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
381:
382: for (j = 1; j <= transactions[i].opcount; j++) {
383:
384: if (transactions[i].ops[j].is_lock == TRUE) {
385: locktab_decrement (transactions[i].ops[j].key, -1);
386: }
387:
388: }
389:
390: if (transactions[i].serial == TRUE) {
391: cptab_rollback (i);
392: }
393:
394: }
395:
396:
397: return TRUE;
398: }
399:
400: int tp_trestart(void)
401: {
402: return 0;
403: }
404:
405: void tp_tdump(void)
406: {
407:
408: int i, j;
409:
410: char tkey[256];
411: char tdata[256];
412: char tact[256];
413:
414: set_io (UNIX);
415:
416: if (tp_level == 0) {
417: printf("No transaction is active.\n");
418:
419: return;
420: }
421:
422: for(i = 1; i <= tp_level; i++) {
423:
424: if(i == tp_level) {
425: printf(" $TLEVEL %d*\n", i);
426: }
427: else {
428: printf(" $TLEVEL %d\n", i);
429: }
430:
431: printf(" Operations for Transaction ID: %s [%s%s]\n",
432: transactions[i].tp_id,
433: ((transactions[i].restartable == TRUE) ? "RESTARTABLE" : "NON-RESTARTABLE"),
434: ((transactions[i].serial == TRUE) ? " SERIAL" : " BATCH"));
435:
436: printf ("\n %-10s%-15s%-15s\n", "OP. NO.", "ACTION", "KEY/DATA");
437: printf (" %-10s%-15s%-15s\n", "-------", "------", "--------");
438:
439:
440: for(j = 1; j <= transactions[i].opcount; j++) {
441: stcpy (tkey, transactions[i].ops[j].key);
442: stcnv_m2c (tkey);
443: stcpy (tdata, transactions[i].ops[j].data);
444: stcnv_m2c (tdata);
445:
446: tp_get_op_name (tact, transactions[i].ops[j].action);
447:
448: if (transactions[i].ops[j].action == set_sym) {
449: printf (" %-10d%-15s%s=%s\n", j, tact, tkey, tdata);
450: }
451: else {
452: printf (" %-10d%-15s%s\n", j, tact, tkey);
453: }
454:
455: }
456:
457: cptab_dump (i);
458:
459: }
460:
461:
462: set_io (MUMPS);
463: }
464:
465: void tp_get_op_name(char *buf, const short action)
466: {
467: switch (action) {
468:
469: case set_sym:
470: strcpy (buf, "SET");
471: break;
472:
473: case killone:
474: case kill_sym:
475: case kill_all:
476: case killexcl:
477: strcpy (buf, "KILL");
478: break;
479:
480: case new_sym:
481: case new_all:
482: case newexcl:
483: strcpy (buf, "NEW");
484: break;
485:
486: case get_sym:
487: strcpy (buf, "GET");
488: break;
489:
490: case dat:
491: strcpy (buf, "$DATA");
492: break;
493:
494: case fra_order:
495: strcpy (buf, "$ORDER");
496: break;
497:
498: case fra_query:
499: case bigquery:
500: strcpy (buf, "$QUERY");
501: break;
502:
503: case getinc:
504: strcpy (buf, "$INCREMENT");
505: break;
506:
507: case getnext:
508: strcpy (buf, "$NEXT");
509: break;
510:
511: case lock_inc:
512: strcpy (buf, "LOCK (INCR)");
513: break;
514:
515: case lock_old:
516: strcpy (buf, "LOCK (TRAD)");
517: break;
518:
519: }
520:
521: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>