Annotation of freem/src/transact.c, revision 1.2
1.1 snw 1: /*
2: * *
3: * * *
4: * * *
5: * ***************
6: * * * * *
7: * * MUMPS *
8: * * * * *
9: * ***************
10: * * *
11: * * *
12: * *
13: *
14: * transact.c
15: * FreeM transaction processing support
16: *
17: *
18: * Author: Serena Willis <jpw@coherent-logic.com>
19: * Copyright (C) 1998 MUG Deutschland
20: * Copyright (C) 2020 Coherent Logic Development LLC
21: *
22: *
23: * This file is part of FreeM.
24: *
25: * FreeM is free software: you can redistribute it and/or modify
26: * it under the terms of the GNU Affero Public License as published by
27: * the Free Software Foundation, either version 3 of the License, or
28: * (at your option) any later version.
29: *
30: * FreeM is distributed in the hope that it will be useful,
31: * but WITHOUT ANY WARRANTY; without even the implied warranty of
32: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33: * GNU Affero Public License for more details.
34: *
35: * You should have received a copy of the GNU Affero Public License
36: * along with FreeM. If not, see <https://www.gnu.org/licenses/>.
37: *
38: **/
39:
40: #include <string.h>
41: #include <stddef.h>
42: #include <stdio.h>
43: #include <stdlib.h>
44: #include <unistd.h>
45: #include <errno.h>
46:
47: #include "mpsdef.h"
48: #include "transact.h"
49: #include "iftab.h"
50: #include "journal.h"
51: #include "shmmgr.h"
52: #include "mref.h"
53: #include "tp_check.h"
54:
55: #define FALSE 0
56: #define TRUE 1
57:
#if !defined(__OpenBSD__) && !defined(__APPLE__)
/*
 * Argument union passed as the fourth parameter of semctl() (tp_init uses
 * the val member with SETVAL).  It is declared here on every platform
 * except OpenBSD and macOS -- presumably because those systems' headers
 * already provide it; confirm before adding further platforms.
 */
union semun {
    int              val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
                                (Linux-specific) */
};
#endif
67:
/* Logging routine defined outside this file; called here as m_log (1, text). */
void m_log (int, const char *);

/* Identifier of the transaction-processing semaphore set, obtained by semget() in tp_init. */
int semid_tp;
/* Set to TRUE by tp_tcommit while it applies the recorded operations, FALSE otherwise. */
int tp_committing = FALSE;
/* Current transaction nesting level; 0 means no transaction is active. */
int tp_level = 0;
/* Per-level transaction state, indexed by tp_level (levels start at 1; slot 0 is not used by tp_tstart). */
tp_transaction transactions[TP_MAX_NEST];
74:
75:
76: void tp_init(void)
77: {
78: union semun arg;
79: char err[255];
80: key_t tp_sk;
81:
82: tp_sk = ftok (config_file, 4);
83:
84: if (first_process) {
85:
86: semid_tp = semget (tp_sk, 1, 0666 | IPC_CREAT);
87: if (semid_tp == -1) {
88: fprintf (stderr, "tp_init: failed to create transaction processing semaphore [errno %d]\r\n", errno);
89: exit (1);
90: }
91:
92: arg.val = 1;
93: if (semctl (semid_tp, 0, SETVAL, arg) == -1) {
94: fprintf (stderr, "tp_init: failed to initialize transaction processing semaphore\r\n");
95: exit (1);
96: }
97:
98: }
99: else {
100:
101: semid_tp = semget (tp_sk, 1, 0);
102: if (semid_tp == -1) {
103: fprintf (stderr, "tp_init: could not attach to transaction processing semaphore [errno %d]\r\n", errno);
104: exit (1);
105: }
106:
107: }
108:
109: return;
110:
111: }
112:
113: short tp_get_sem(void)
114: {
115: int tries;
116: struct sembuf s = {0, -1, 0};
117:
118: char msgbuf[100];
119:
120: snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore", pid);
121: m_log (1, msgbuf);
122:
123:
124: /* our process already owns the semaphore */
125: if (shm_config->hdr->tp_owner == pid) {
126:
127: snprintf (msgbuf, 99, "tp_get_sem: process %d increments transaction processing semaphore counter", pid);
128: m_log (1, msgbuf);
129:
130:
131: if (first_process == TRUE) {
132: fprintf (stderr, "tp_get_sem: daemon process increments critical section counter\r\n");
133: }
134:
135:
136: shm_config->hdr->tp_semctr++;
137:
138: return TRUE;
139: }
140:
141: if (first_process == TRUE) {
142: fprintf (stderr, "tp_get_sem: daemon process enters critical section\r\n");
143: }
144:
145:
146: for (tries = 0; tries < 10; tries++) {
147:
148: if (semop (semid_tp, &s, 1) != -1) {
149: shm_config->hdr->tp_owner = pid;
150: shm_config->hdr->tp_semctr = 1;
151:
152: snprintf (msgbuf, 99, "tp_get_sem: process %d takes transaction processing semaphore", pid);
153: m_log (1, msgbuf);
154:
155:
156: if (first_process == TRUE) {
157: fprintf (stderr, "tp_get_sem: daemon process takes transaction processing semaphore\r\n");
158: }
159:
160: return TRUE;
161: }
162:
163: snprintf (msgbuf, 99, "tp_get_sem: process %d attempting to acquire transaction processing semaphore (tries = %d)", pid, tries);
164: m_log (1, msgbuf);
165:
166:
167: sleep (1);
168:
169: }
170:
171: return FALSE;
172:
173: }
174:
175: void tp_release_sem(void)
176: {
177:
178: char msgbuf[100];
179:
180: if (shm_config->hdr->tp_semctr == 1) {
181:
182: struct sembuf s = {0, 1, 0};
183:
184: if (first_process == TRUE) {
185: fprintf (stderr, "tp_release_sem: daemon process leaves critical section\r\n");
186: }
187:
188:
189: shm_config->hdr->tp_semctr = 0;
190: shm_config->hdr->tp_owner = 0;
191:
192: if (first_process == TRUE) {
193: fprintf (stderr, "tp_release_sem: daemon process relinquishes transaction processing semaphore\r\n");
194: }
195:
196:
197: snprintf (msgbuf, 99, "tp_get_sem: process %d releases transaction processing semaphore", pid);
198: m_log (1, msgbuf);
199:
200:
201: semop (semid_tp, &s, 1);
202:
203: }
204: else {
205:
206: if (first_process == TRUE) {
207: fprintf (stderr, "tp_release_sem: daemon process decrements critical section counter\r\n");
208: }
209:
210: snprintf (msgbuf, 99, "tp_get_sem: process %d decrements transaction processing semaphore counter", pid);
211: m_log (1, msgbuf);
212:
213: shm_config->hdr->tp_semctr--;
214: }
215:
216:
217: }
218:
219: int tp_tstart(char *tp_id, short serial, short restartable, char **sym_save)
220: {
221: if (tp_level == TP_MAX_NEST) {
222: char m[256];
223:
224: snprintf (m, 256, "Attempt to exceed TP_MAX_NEST. Transaction aborted.\r\n\201");
225: write_m (m);
226:
227: return FALSE;
228: }
229:
230: if (((serial == TRUE) && (tp_get_sem () == TRUE)) ||
231: (serial == FALSE)) {
232:
233: tp_level++;
234:
235: jnl_ent_write (JNLA_TSTART, "", "");
236:
237: strcpy (transactions[tp_level].tp_id, tp_id);
238:
239: transactions[tp_level].serial = serial;
240: transactions[tp_level].restartable = restartable;
241:
242: transactions[tp_level].opcount = 0;
243:
244: return TRUE;
245:
246: }
247: else {
248: fprintf (stderr, "tp_tstart: could not get transaction processing semaphore\r\n");
249: exit (1);
250: }
251:
252:
253: }
254:
255: int tp_add_op(short islock, short action, char *key, char *data)
256: {
257: int oc;
258: freem_ref_t *gr;
259:
260: gr = (freem_ref_t *) malloc (sizeof (freem_ref_t));
261: NULLPTRCHK(gr,"tp_add_op");
262:
263: mref_init (gr, MREF_RT_GLOBAL, "");
264: internal_to_mref (gr, key);
265:
266: if (transactions[tp_level].opcount == TP_MAX_OPS) {
267: char m[256];
268:
269: snprintf (m, 256, "Attempt to exceed TP_MAX_OPS at transaction level %d. Rolling back.\r\n\201", tp_level);
270: write_m (m);
271:
272: free (gr);
273:
274: tp_trollback (1);
275: tp_cleanup (1);
276:
277: if (transactions[tp_level].serial == TRUE) {
278: tp_release_sem();
279: }
280:
281: return FALSE;
282: }
283:
284: /* update transaction-in-flight symbol table */
285: switch (action) {
286:
287: case lock_inc:
288: case lock_dec:
289: case lock_old:
290: case set_sym:
291: iftab_insert (action, key, data, tp_level);
292: break;
293:
294: case kill_sym:
295: iftab_kill (key);
296: break;
297:
298: }
299:
300: if (transactions[tp_level].serial == TRUE) {
301: /* mark the global for checkpointing */
302: cptab_insert (tp_level, gr->name);
303: }
304:
305: free (gr);
306:
307: transactions[tp_level].opcount = transactions[tp_level].opcount + 1;
308:
309: oc = transactions[tp_level].opcount;
310:
311: transactions[tp_level].ops[oc].is_lock = islock;
312: transactions[tp_level].ops[oc].action = action;
313:
1.2 ! snw 314: stcpy ((char *) &transactions[tp_level].ops[oc].key, key);
! 315: stcpy ((char *) &transactions[tp_level].ops[oc].data, data);
1.1 snw 316:
317:
318: return TRUE;
319: }
320:
321: int tp_tcommit(void)
322: {
323: register int i;
324: short is_serial = transactions[tp_level].serial;
325:
326: tp_committing = TRUE;
327:
328: if (is_serial) {
329: /* checkpoint all globals involved in transaction */
330: cptab_precommit (tp_level);
331: }
332:
333: for(i = 1; i <= transactions[tp_level].opcount; i++) {
334:
335: if (transactions[tp_level].ops[i].is_lock == FALSE) {
336: global (transactions[tp_level].ops[i].action, transactions[tp_level].ops[i].key, transactions[tp_level].ops[i].data);
337:
338: if (merr () > OK) goto commit_error;
339:
340: }
341:
342: }
343:
344: jnl_ent_write (JNLA_TCOMMIT, "\201", "\201");
345:
346: if (is_serial) {
347: cptab_postcommit (tp_level);
348: }
349:
350: goto commit_done;
351:
352: commit_error:
353:
354: tp_trollback (1);
355:
356: commit_done:
357:
358: tp_cleanup (1);
359:
360: tp_committing = FALSE;
361:
362: if (is_serial) {
363: tp_release_sem ();
364: }
365:
366: return TRUE;
367: }
368:
369: int tp_cleanup(int levels)
370: {
371: register int i;
372:
373: for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
374: iftab_pop_tlevel (i);
375: }
376:
377: tp_level = ((tp_level - levels) >= 0) ? tp_level - levels : 0;
378:
379: return TRUE;
380: }
381:
382: int tp_trollback(int levels)
383: {
384: register int i;
385: register int j;
386:
387: // for (i = 0; i < levels; i++) {
388: for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
389:
390: for (j = 1; j <= transactions[i].opcount; j++) {
391:
392: if (transactions[i].ops[j].is_lock == TRUE) {
393: locktab_decrement (transactions[i].ops[j].key, -1);
394: }
395:
396: }
397:
398: if (transactions[i].serial == TRUE) {
399: cptab_rollback (i);
400: }
401:
402: }
403:
404:
405: return TRUE;
406: }
407:
/*
 * tp_trestart
 *
 * Placeholder: performs no work and always returns 0.
 */
int tp_trestart(void)
{
    const int result = 0;

    return result;
}
412:
413: void tp_tdump(void)
414: {
415:
416: int i, j;
417:
418: char tkey[256];
419: char tdata[256];
420: char tact[256];
421:
422: set_io (UNIX);
423:
424: if (tp_level == 0) {
425: printf("No transaction is active.\n");
426:
427: return;
428: }
429:
430: for(i = 1; i <= tp_level; i++) {
431:
432: if(i == tp_level) {
433: printf(" $TLEVEL %d*\n", i);
434: }
435: else {
436: printf(" $TLEVEL %d\n", i);
437: }
438:
439: printf(" Operations for Transaction ID: %s [%s%s]\n",
440: transactions[i].tp_id,
441: ((transactions[i].restartable == TRUE) ? "RESTARTABLE" : "NON-RESTARTABLE"),
442: ((transactions[i].serial == TRUE) ? " SERIAL" : " BATCH"));
443:
444: printf ("\n %-10s%-15s%-15s\n", "OP. NO.", "ACTION", "KEY/DATA");
445: printf (" %-10s%-15s%-15s\n", "-------", "------", "--------");
446:
447:
448: for(j = 1; j <= transactions[i].opcount; j++) {
449: stcpy (tkey, transactions[i].ops[j].key);
450: stcnv_m2c (tkey);
451: stcpy (tdata, transactions[i].ops[j].data);
452: stcnv_m2c (tdata);
453:
454: tp_get_op_name (tact, transactions[i].ops[j].action);
455:
456: if (transactions[i].ops[j].action == set_sym) {
457: printf (" %-10d%-15s%s=%s\n", j, tact, tkey, tdata);
458: }
459: else {
460: printf (" %-10d%-15s%s\n", j, tact, tkey);
461: }
462:
463: }
464:
465: cptab_dump (i);
466:
467: }
468:
469:
470: set_io (MUMPS);
471: }
472:
473: void tp_get_op_name(char *buf, const short action)
474: {
475: switch (action) {
476:
477: case set_sym:
478: strcpy (buf, "SET");
479: break;
480:
481: case killone:
482: case kill_sym:
483: case kill_all:
484: case killexcl:
485: strcpy (buf, "KILL");
486: break;
487:
488: case new_sym:
489: case new_all:
490: case newexcl:
491: strcpy (buf, "NEW");
492: break;
493:
494: case get_sym:
495: strcpy (buf, "GET");
496: break;
497:
498: case dat:
499: strcpy (buf, "$DATA");
500: break;
501:
502: case fra_order:
503: strcpy (buf, "$ORDER");
504: break;
505:
506: case fra_query:
507: case bigquery:
508: strcpy (buf, "$QUERY");
509: break;
510:
511: case getinc:
512: strcpy (buf, "$INCREMENT");
513: break;
514:
515: case getnext:
516: strcpy (buf, "$NEXT");
517: break;
518:
519: case lock_inc:
520: strcpy (buf, "LOCK (INCR)");
521: break;
522:
523: case lock_old:
524: strcpy (buf, "LOCK (TRAD)");
525: break;
526:
527: }
528:
529: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>